From 63fd11e118325c6266bd442b41ba4239375ec502 Mon Sep 17 00:00:00 2001 From: MuK IT GmbH Date: Wed, 10 Jul 2019 10:28:27 +0000 Subject: [PATCH] publish muk_security - 11.0 --- muk_security/__init__.py | 2 +- muk_security/__manifest__.py | 4 +- muk_security/models/access.py | 272 +++++------ muk_security/models/ir_rule.py | 78 +-- muk_security/models/locking.py | 502 ++++++++++---------- muk_security/models/res_groups.py | 2 +- muk_security/models/res_users.py | 2 +- muk_security/tests/__init__.py | 2 +- muk_security/tests/test_suspend_security.py | 131 +++-- 9 files changed, 513 insertions(+), 482 deletions(-) diff --git a/muk_security/__init__.py b/muk_security/__init__.py index ba31b8c..31964ee 100644 --- a/muk_security/__init__.py +++ b/muk_security/__init__.py @@ -20,4 +20,4 @@ from . import models def _patch_system(): - from . import base \ No newline at end of file + from . import base diff --git a/muk_security/__manifest__.py b/muk_security/__manifest__.py index c2c9345..661e6a2 100644 --- a/muk_security/__manifest__.py +++ b/muk_security/__manifest__.py @@ -20,7 +20,7 @@ { "name": "MuK Security", "summary": """Security Features""", - "version": "11.0.1.1.7", + "version": "11.0.1.1.8", "category": "Extra Tools", "license": "AGPL-3", "website": "http://www.mukit.at", @@ -54,4 +54,4 @@ "application": False, "installable": True, "post_load": "_patch_system", -} \ No newline at end of file +} diff --git a/muk_security/models/access.py b/muk_security/models/access.py index 192324a..561986b 100644 --- a/muk_security/models/access.py +++ b/muk_security/models/access.py @@ -1,136 +1,136 @@ -################################################################################### -# -# Copyright (C) 2017 MuK IT GmbH -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as -# published by the Free Software Foundation, either version 3 of the -# License, or (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. -# -# You should have received a copy of the GNU Affero General Public License -# along with this program. If not, see . -# -################################################################################### - -import logging - -from odoo import _ -from odoo import models, api, fields -from odoo.exceptions import AccessError - -_logger = logging.getLogger(__name__) - -class BaseModelAccess(models.AbstractModel): - - _name = 'muk_security.access' - _description = "MuK Access Model" - _inherit = 'muk_utils.model' - - #---------------------------------------------------------- - # Database - #---------------------------------------------------------- - - permission_read = fields.Boolean( - compute='_compute_permissions', - search='_search_permission_read', - string="Read Access") - - permission_create = fields.Boolean( - compute='_compute_permissions', - search='_search_permission_create', - string="Create Access") - - permission_write = fields.Boolean( - compute='_compute_permissions', - search='_search_permission_write', - string="Write Access") - - permission_unlink = fields.Boolean( - compute='_compute_permissions', - search='_search_permission_unlink', - string="Delete Access") - - #---------------------------------------------------------- - # Function - #---------------------------------------------------------- - - @api.model - def check_access_rights(self, operation, raise_exception=True): - return super(BaseModelAccess, self).check_access_rights(operation, raise_exception) - - @api.multi - def check_access_rule(self, operation): - return super(BaseModelAccess, self).check_access_rule(operation) - - @api.model - def _apply_ir_rules(self, query, mode='read'): - return super(BaseModelAccess, self)._apply_ir_rules(query, mode) - - @api.model - def check_field_access_rights(self, operation, fields): - return super(BaseModelAccess, self).check_field_access_rights(operation, fields) - - @api.multi - def check_access(self, operation, raise_exception=False): - try: - access_right = self.check_access_rights(operation, raise_exception) - access_rule = self.check_access_rule(operation) == None - access = access_right and access_rule - if not access and raise_exception: - raise AccessError(_("This operation is forbidden!")) - return access - except AccessError: - if raise_exception: - raise AccessError(_("This operation is forbidden!")) - return False - - #---------------------------------------------------------- - # Search - #---------------------------------------------------------- - - @api.model - def _search_permission_read(self, operator, operand): - records = self.search([]).filtered(lambda r: r.check_access('read') == True) - if operator == '=' and operand: - return [('id', 'in', records.mapped('id'))] - return [('id', 'not in', records.mapped('id'))] - - @api.model - def _search_permission_create(self, operator, operand): - records = self.search([]).filtered(lambda r: r.check_access('create') == True) - if operator == '=' and operand: - return [('id', 'in', records.mapped('id'))] - return [('id', 'not in', records.mapped('id'))] - - @api.model - def _search_permission_write(self, operator, operand): - records = self.search([]).filtered(lambda r: r.check_access('write') == True) - if operator == '=' and operand: - return [('id', 'in', records.mapped('id'))] - return [('id', 'not in', records.mapped('id'))] - - @api.model - def _search_permission_unlink(self, operator, operand): - records = self.search([]).filtered(lambda r: r.check_access('unlink') == True) - if operator == '=' and operand: - return [('id', 'in', records.mapped('id'))] - return [('id', 'not in', records.mapped('id'))] - - #---------------------------------------------------------- - # Read, View - #---------------------------------------------------------- - - @api.multi - def _compute_permissions(self): - for record in self: - record.update({ - 'permission_read': record.check_access('read'), - 'permission_create': record.check_access('create'), - 'permission_write': record.check_access('write'), - 'permission_unlink': record.check_access('unlink'), - }) \ No newline at end of file +################################################################################### +# +# Copyright (C) 2017 MuK IT GmbH +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . +# +################################################################################### + +import logging + +from odoo import _ +from odoo import models, api, fields +from odoo.exceptions import AccessError + +_logger = logging.getLogger(__name__) + +class BaseModelAccess(models.AbstractModel): + + _name = 'muk_security.access' + _description = "MuK Access Model" + _inherit = 'muk_utils.model' + + #---------------------------------------------------------- + # Database + #---------------------------------------------------------- + + permission_read = fields.Boolean( + compute='_compute_permissions', + search='_search_permission_read', + string="Read Access") + + permission_create = fields.Boolean( + compute='_compute_permissions', + search='_search_permission_create', + string="Create Access") + + permission_write = fields.Boolean( + compute='_compute_permissions', + search='_search_permission_write', + string="Write Access") + + permission_unlink = fields.Boolean( + compute='_compute_permissions', + search='_search_permission_unlink', + string="Delete Access") + + #---------------------------------------------------------- + # Function + #---------------------------------------------------------- + + @api.model + def check_access_rights(self, operation, raise_exception=True): + return super(BaseModelAccess, self).check_access_rights(operation, raise_exception) + + @api.multi + def check_access_rule(self, operation): + return super(BaseModelAccess, self).check_access_rule(operation) + + @api.model + def _apply_ir_rules(self, query, mode='read'): + return super(BaseModelAccess, self)._apply_ir_rules(query, mode) + + @api.model + def check_field_access_rights(self, operation, fields): + return super(BaseModelAccess, self).check_field_access_rights(operation, fields) + + @api.multi + def check_access(self, operation, raise_exception=False): + try: + access_right = self.check_access_rights(operation, raise_exception) + access_rule = self.check_access_rule(operation) == None + access = access_right and access_rule + if not access and raise_exception: + raise AccessError(_("This operation is forbidden!")) + return access + except AccessError: + if raise_exception: + raise AccessError(_("This operation is forbidden!")) + return False + + #---------------------------------------------------------- + # Search + #---------------------------------------------------------- + + @api.model + def _search_permission_read(self, operator, operand): + records = self.search([]).filtered(lambda r: r.check_access('read') == True) + if operator == '=' and operand: + return [('id', 'in', records.mapped('id'))] + return [('id', 'not in', records.mapped('id'))] + + @api.model + def _search_permission_create(self, operator, operand): + records = self.search([]).filtered(lambda r: r.check_access('create') == True) + if operator == '=' and operand: + return [('id', 'in', records.mapped('id'))] + return [('id', 'not in', records.mapped('id'))] + + @api.model + def _search_permission_write(self, operator, operand): + records = self.search([]).filtered(lambda r: r.check_access('write') == True) + if operator == '=' and operand: + return [('id', 'in', records.mapped('id'))] + return [('id', 'not in', records.mapped('id'))] + + @api.model + def _search_permission_unlink(self, operator, operand): + records = self.search([]).filtered(lambda r: r.check_access('unlink') == True) + if operator == '=' and operand: + return [('id', 'in', records.mapped('id'))] + return [('id', 'not in', records.mapped('id'))] + + #---------------------------------------------------------- + # Read, View + #---------------------------------------------------------- + + @api.multi + def _compute_permissions(self): + for record in self: + record.update({ + 'permission_read': record.check_access('read'), + 'permission_create': record.check_access('create'), + 'permission_write': record.check_access('write'), + 'permission_unlink': record.check_access('unlink'), + }) diff --git a/muk_security/models/ir_rule.py b/muk_security/models/ir_rule.py index 9acc3eb..cc7ff05 100644 --- a/muk_security/models/ir_rule.py +++ b/muk_security/models/ir_rule.py @@ -1,39 +1,39 @@ -################################################################################### -# -# Copyright (C) 2017 MuK IT GmbH -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as -# published by the Free Software Foundation, either version 3 of the -# License, or (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. -# -# You should have received a copy of the GNU Affero General Public License -# along with this program. If not, see . -# -################################################################################### - -import logging - -from odoo import api, fields, models -from odoo import tools, _ -from odoo.exceptions import ValidationError - -from odoo.addons.muk_security.tools import helper - -_logger = logging.getLogger(__name__) - -class ExtendedIrRule(models.Model): - - _inherit = 'ir.rule' - - @api.model - @tools.ormcache('self._uid', 'model_name', 'mode') - def _compute_domain(self, model_name, mode="read"): - if isinstance(self.env.uid, helper.NoSecurityUid): - return None - return super(ExtendedIrRule, self)._compute_domain(model_name, mode=mode) \ No newline at end of file +################################################################################### +# +# Copyright (C) 2017 MuK IT GmbH +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . +# +################################################################################### + +import logging + +from odoo import api, fields, models +from odoo import tools, _ +from odoo.exceptions import ValidationError + +from odoo.addons.muk_security.tools import helper + +_logger = logging.getLogger(__name__) + +class ExtendedIrRule(models.Model): + + _inherit = 'ir.rule' + + @api.model + @tools.ormcache('self._uid', 'model_name', 'mode') + def _compute_domain(self, model_name, mode="read"): + if isinstance(self.env.uid, helper.NoSecurityUid): + return None + return super(ExtendedIrRule, self)._compute_domain(model_name, mode=mode) diff --git a/muk_security/models/locking.py b/muk_security/models/locking.py index 03cc3a3..b1c9ce0 100644 --- a/muk_security/models/locking.py +++ b/muk_security/models/locking.py @@ -1,251 +1,251 @@ -################################################################################### -# -# Copyright (C) 2017 MuK IT GmbH -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as -# published by the Free Software Foundation, either version 3 of the -# License, or (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. -# -# You should have received a copy of the GNU Affero General Public License -# along with this program. If not, see . -# -################################################################################### - -import os -import hashlib -import logging -import itertools - -from odoo import _ -from odoo import models, api, fields -from odoo.exceptions import AccessError - -_logger = logging.getLogger(__name__) - -class BaseModelLocking(models.AbstractModel): - - _name = 'muk_security.locking' - _description = 'MuK Locking Model' - _inherit = 'muk_utils.model' - - #---------------------------------------------------------- - # Database - #---------------------------------------------------------- - - locked = fields.Many2one( - comodel_name='muk_security.lock', - compute='_compute_lock', - string="Locked") - - locked_state = fields.Boolean( - compute='_compute_lock', - string="Locked") - - locked_by = fields.Many2one( - related='locked.locked_by_ref', - comodel_name='res.users', - string="Locked by") - - editor = fields.Boolean( - compute='_compute_editor', - string="Editor") - - #---------------------------------------------------------- - # Functions - #---------------------------------------------------------- - - @api.model - def generate_operation_key(self): - return hashlib.sha1(os.urandom(128)).hexdigest() - - #---------------------------------------------------------- - # Locking - #---------------------------------------------------------- - - @api.multi - def lock(self, user=None, operation=None, *largs, **kwargs): - result = [] - for record in self: - lock = record.is_locked() - if lock and lock.operation and lock.operation == operation: - result.append({ - 'record': record, - 'lock': lock, - 'token': lock.token}) - elif lock and ((lock.operation and lock.operation != operation) or not lock.operation): - raise AccessError(_("The record (%s[%s]) is locked, so it can't be locked again.") % - (record._name, record.id)) - else: - token = hashlib.sha1(os.urandom(128)).hexdigest() - lock = self.env['muk_security.lock'].sudo().create({ - 'locked_by': user and user.name or "System", - 'locked_by_ref': user and user.id or None, - 'lock_ref': record._name + ',' + str(record.id), - 'token': token, - 'operation': operation}) - result.append({ - 'record': record, - 'lock': lock, - 'token': token}) - return result - - @api.multi - def unlock(self, *largs, **kwargs): - for record in self: - locks = self.env['muk_security.lock'].sudo().search( - [('lock_ref', '=', "%s,%s" % (record._name, str(record.id)))]) - locks.sudo().unlink() - return True - - @api.model - def unlock_operation(self, operation, *largs, **kwargs): - locks = self.env['muk_security.lock'].sudo().search([('operation', '=', operation)]) - references = [ - list((k, list(map(lambda rec: rec.lock_ref_id, v)))) - for k, v in itertools.groupby( - locks.sorted(key=lambda rec: rec.lock_ref_model), - lambda rec: rec.lock_ref_model)] - locks.sudo().unlink() - return references - - @api.multi - def is_locked(self, *largs, **kwargs): - self.ensure_one() - lock = self.env['muk_security.lock'].sudo().search( - [('lock_ref', '=', self._name + ',' + str(self.id))], limit=1) - if lock.id: - return lock - return False - - @api.multi - def is_locked_by(self, *largs, **kwargs): - self.ensure_one() - lock = self.env['muk_security.lock'].sudo().search( - [('lock_ref', '=', self._name + ',' + str(self.id))], limit=1) - if lock.id: - return lock.locked_by_ref - return False - - @api.multi - def _checking_lock_user(self, *largs, **kwargs): - for record in self: - lock = record.is_locked() - if lock and lock.locked_by_ref and not lock.locked_by_ref.id != self.env.user.id: - raise AccessError(_("The record (%s[%s]) is locked by a user, so it can't be changes or deleted.") % - (self._name, self.id)) - - @api.multi - def _checking_lock(self, operation=None, *largs, **kwargs): - self._checking_lock_user() - for record in self: - lock = record.is_locked() - if lock and lock.operation and lock.operation != operation: - print(operation, lock.operation) - raise IOError - raise AccessError(_("The record (%s[%s]) is locked, so it can't be changes or deleted.") % - (self._name, self.id)) - - @api.multi - def user_lock(self, *largs, **kwargs): - self.ensure_one() - lock = self.is_locked() - if lock: - if lock.locked_by_ref: - raise AccessError(_("The record is already locked by another user. (%s)") % lock.locked_by_ref.name) - else: - raise AccessError(_("The record is already locked.")) - return self.lock(user=self.env.user) - - @api.multi - def user_unlock(self, *largs, **kwargs): - self.ensure_one() - lock = self.is_locked() - if lock: - if lock.locked_by_ref and lock.locked_by_ref.id == self.env.user.id: - self.unlock() - else: - if lock.locked_by_ref: - raise AccessError(_("The record is already locked by another user. (%s)") % lock.locked_by_ref.name) - else: - raise AccessError(_("The record is already locked.")) - return True - - #---------------------------------------------------------- - # Read, View - #---------------------------------------------------------- - - @api.multi - def _compute_lock(self): - for record in self: - locked = record.is_locked() - record.update({ - 'locked_state': bool(locked), - 'locked': locked}) - - @api.depends('locked') - def _compute_editor(self): - for record in self: - record.editor = record.is_locked_by() == record.env.user - - #---------------------------------------------------------- - # Create, Update, Delete - #---------------------------------------------------------- - - @api.multi - def write(self, vals): - operation = self.generate_operation_key() - vals = self._before_write_operation(vals, operation) - process_operation = self.env.context['operation'] if 'operation' in self.env.context else operation - result = super(BaseModelLocking, self.with_context(operation=process_operation)).write(vals) - for record in self: - record._after_write_record_operation(vals, operation) - result = self._after_write_operation(result, vals, operation) - return result - - @api.multi - def _before_write_operation(self, vals, operation, *largs, **kwargs): - if 'operation' in self.env.context: - self._checking_lock(self.env.context['operation']) - else: - self._checking_lock(operation) - return vals - - @api.multi - def _after_write_record_operation(self, vals, operation, *largs, **kwargs): - return vals - - @api.multi - def _after_write_operation(self, result, vals, operation, *largs, **kwargs): - return result - - @api.multi - def unlink(self): - operation = self.generate_operation_key() - self._before_unlink_operation(operation) - for record in self: - record._before_unlink_record_operation(operation) - process_operation = self.env.context['operation'] if 'operation' in self.env.context else operation - result = super(BaseModelLocking, self.with_context(operation=process_operation)).unlink() - self._after_unlink_operation(result, operation) - return result - - @api.multi - def _before_unlink_operation(self, operation, *largs, **kwargs): - if 'operation' in self.env.context: - self._checking_lock(self.env.context['operation']) - else: - self._checking_lock(operation) - - @api.multi - def _before_unlink_record_operation(self, operation, *largs, **kwargs): - pass - - @api.multi - def _after_unlink_operation(self, result, operation, *largs, **kwargs): - pass \ No newline at end of file +################################################################################### +# +# Copyright (C) 2017 MuK IT GmbH +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . +# +################################################################################### + +import os +import hashlib +import logging +import itertools + +from odoo import _ +from odoo import models, api, fields +from odoo.exceptions import AccessError + +_logger = logging.getLogger(__name__) + +class BaseModelLocking(models.AbstractModel): + + _name = 'muk_security.locking' + _description = 'MuK Locking Model' + _inherit = 'muk_utils.model' + + #---------------------------------------------------------- + # Database + #---------------------------------------------------------- + + locked = fields.Many2one( + comodel_name='muk_security.lock', + compute='_compute_lock', + string="Locked") + + locked_state = fields.Boolean( + compute='_compute_lock', + string="Locked") + + locked_by = fields.Many2one( + related='locked.locked_by_ref', + comodel_name='res.users', + string="Locked by") + + editor = fields.Boolean( + compute='_compute_editor', + string="Editor") + + #---------------------------------------------------------- + # Functions + #---------------------------------------------------------- + + @api.model + def generate_operation_key(self): + return hashlib.sha1(os.urandom(128)).hexdigest() + + #---------------------------------------------------------- + # Locking + #---------------------------------------------------------- + + @api.multi + def lock(self, user=None, operation=None, *largs, **kwargs): + result = [] + for record in self: + lock = record.is_locked() + if lock and lock.operation and lock.operation == operation: + result.append({ + 'record': record, + 'lock': lock, + 'token': lock.token}) + elif lock and ((lock.operation and lock.operation != operation) or not lock.operation): + raise AccessError(_("The record (%s[%s]) is locked, so it can't be locked again.") % + (record._name, record.id)) + else: + token = hashlib.sha1(os.urandom(128)).hexdigest() + lock = self.env['muk_security.lock'].sudo().create({ + 'locked_by': user and user.name or "System", + 'locked_by_ref': user and user.id or None, + 'lock_ref': record._name + ',' + str(record.id), + 'token': token, + 'operation': operation}) + result.append({ + 'record': record, + 'lock': lock, + 'token': token}) + return result + + @api.multi + def unlock(self, *largs, **kwargs): + for record in self: + locks = self.env['muk_security.lock'].sudo().search( + [('lock_ref', '=', "%s,%s" % (record._name, str(record.id)))]) + locks.sudo().unlink() + return True + + @api.model + def unlock_operation(self, operation, *largs, **kwargs): + locks = self.env['muk_security.lock'].sudo().search([('operation', '=', operation)]) + references = [ + list((k, list(map(lambda rec: rec.lock_ref_id, v)))) + for k, v in itertools.groupby( + locks.sorted(key=lambda rec: rec.lock_ref_model), + lambda rec: rec.lock_ref_model)] + locks.sudo().unlink() + return references + + @api.multi + def is_locked(self, *largs, **kwargs): + self.ensure_one() + lock = self.env['muk_security.lock'].sudo().search( + [('lock_ref', '=', self._name + ',' + str(self.id))], limit=1) + if lock.id: + return lock + return False + + @api.multi + def is_locked_by(self, *largs, **kwargs): + self.ensure_one() + lock = self.env['muk_security.lock'].sudo().search( + [('lock_ref', '=', self._name + ',' + str(self.id))], limit=1) + if lock.id: + return lock.locked_by_ref + return False + + @api.multi + def _checking_lock_user(self, *largs, **kwargs): + for record in self: + lock = record.is_locked() + if lock and lock.locked_by_ref and not lock.locked_by_ref.id != self.env.user.id: + raise AccessError(_("The record (%s[%s]) is locked by a user, so it can't be changes or deleted.") % + (self._name, self.id)) + + @api.multi + def _checking_lock(self, operation=None, *largs, **kwargs): + self._checking_lock_user() + for record in self: + lock = record.is_locked() + if lock and lock.operation and lock.operation != operation: + print(operation, lock.operation) + raise IOError + raise AccessError(_("The record (%s[%s]) is locked, so it can't be changes or deleted.") % + (self._name, self.id)) + + @api.multi + def user_lock(self, *largs, **kwargs): + self.ensure_one() + lock = self.is_locked() + if lock: + if lock.locked_by_ref: + raise AccessError(_("The record is already locked by another user. (%s)") % lock.locked_by_ref.name) + else: + raise AccessError(_("The record is already locked.")) + return self.lock(user=self.env.user) + + @api.multi + def user_unlock(self, *largs, **kwargs): + self.ensure_one() + lock = self.is_locked() + if lock: + if lock.locked_by_ref and lock.locked_by_ref.id == self.env.user.id: + self.unlock() + else: + if lock.locked_by_ref: + raise AccessError(_("The record is already locked by another user. (%s)") % lock.locked_by_ref.name) + else: + raise AccessError(_("The record is already locked.")) + return True + + #---------------------------------------------------------- + # Read, View + #---------------------------------------------------------- + + @api.multi + def _compute_lock(self): + for record in self: + locked = record.is_locked() + record.update({ + 'locked_state': bool(locked), + 'locked': locked}) + + @api.depends('locked') + def _compute_editor(self): + for record in self: + record.editor = record.is_locked_by() == record.env.user + + #---------------------------------------------------------- + # Create, Update, Delete + #---------------------------------------------------------- + + @api.multi + def write(self, vals): + operation = self.generate_operation_key() + vals = self._before_write_operation(vals, operation) + process_operation = self.env.context['operation'] if 'operation' in self.env.context else operation + result = super(BaseModelLocking, self.with_context(operation=process_operation)).write(vals) + for record in self: + record._after_write_record_operation(vals, operation) + result = self._after_write_operation(result, vals, operation) + return result + + @api.multi + def _before_write_operation(self, vals, operation, *largs, **kwargs): + if 'operation' in self.env.context: + self._checking_lock(self.env.context['operation']) + else: + self._checking_lock(operation) + return vals + + @api.multi + def _after_write_record_operation(self, vals, operation, *largs, **kwargs): + return vals + + @api.multi + def _after_write_operation(self, result, vals, operation, *largs, **kwargs): + return result + + @api.multi + def unlink(self): + operation = self.generate_operation_key() + self._before_unlink_operation(operation) + for record in self: + record._before_unlink_record_operation(operation) + process_operation = self.env.context['operation'] if 'operation' in self.env.context else operation + result = super(BaseModelLocking, self.with_context(operation=process_operation)).unlink() + self._after_unlink_operation(result, operation) + return result + + @api.multi + def _before_unlink_operation(self, operation, *largs, **kwargs): + if 'operation' in self.env.context: + self._checking_lock(self.env.context['operation']) + else: + self._checking_lock(operation) + + @api.multi + def _before_unlink_record_operation(self, operation, *largs, **kwargs): + pass + + @api.multi + def _after_unlink_operation(self, result, operation, *largs, **kwargs): + pass diff --git a/muk_security/models/res_groups.py b/muk_security/models/res_groups.py index 64d0b1c..6198646 100644 --- a/muk_security/models/res_groups.py +++ b/muk_security/models/res_groups.py @@ -38,4 +38,4 @@ class AccessGroups(models.Model): relation='muk_security_groups_groups_rel', column1='rid', column2='gid', - string='Groups') \ No newline at end of file + string='Groups') diff --git a/muk_security/models/res_users.py b/muk_security/models/res_users.py index 084d0dd..c551937 100644 --- a/muk_security/models/res_users.py +++ b/muk_security/models/res_users.py @@ -64,4 +64,4 @@ class AccessUser(models.Model): return super(helper.NoSecurityUid, id).__int__() return id access_ids = [convert_security_uid(id) for id in ids] - return super(AccessUser, cls)._browse(access_ids, *args, **kwargs) \ No newline at end of file + return super(AccessUser, cls)._browse(access_ids, *args, **kwargs) diff --git a/muk_security/tests/__init__.py b/muk_security/tests/__init__.py index cb6eb81..86a6b32 100644 --- a/muk_security/tests/__init__.py +++ b/muk_security/tests/__init__.py @@ -19,4 +19,4 @@ # ################################################################################### -from . import test_suspend_security \ No newline at end of file +from . import test_suspend_security diff --git a/muk_security/tests/test_suspend_security.py b/muk_security/tests/test_suspend_security.py index 7a40283..55dae73 100644 --- a/muk_security/tests/test_suspend_security.py +++ b/muk_security/tests/test_suspend_security.py @@ -1,50 +1,81 @@ -################################################################################### -# -# Copyright (C) 2017 MuK IT GmbH -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as -# published by the Free Software Foundation, either version 3 of the -# License, or (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. -# -# You should have received a copy of the GNU Affero General Public License -# along with this program. If not, see . -# -################################################################################### - -import os -import base64 -import logging - -from odoo import exceptions -from odoo.tests import common - -_path = os.path.dirname(os.path.dirname(__file__)) -_logger = logging.getLogger(__name__) - -class SuspendSecurityTestCase(common.TransactionCase): - - at_install = False - post_install = True - - def setUp(self): - super(SuspendSecurityTestCase, self).setUp() - - def tearDown(self): - super(SuspendSecurityTestCase, self).tearDown() - - def test_suspend_security(self): - user_id = self.env.ref('base.user_demo').id - with self.assertRaises(exceptions.AccessError): - self.env.ref('base.user_root').sudo(user_id).name = 'test' - self.env.ref('base.user_root').sudo(user_id).suspend_security().name = 'test' - self.assertEqual(self.env.ref('base.user_root').name, 'test') - self.assertEqual(self.env.ref('base.user_root').write_uid.id, user_id) - - def test_normalize(self): - self.env['res.users'].browse(self.env['res.users'].suspend_security().env.uid) \ No newline at end of file +################################################################################### +# +# Copyright (C) 2017 MuK IT GmbH +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . +# +################################################################################### +import os +import base64 +import logging +from odoo import exceptions +from odoo.tests import common +_path = os.path.dirname(os.path.dirname(__file__)) +_logger = logging.getLogger(__name__) +class SuspendSecurityTestCase(common.TransactionCase): + at_install = False + post_install = True + def setUp(self): + super(SuspendSecurityTestCase, self).setUp() + def tearDown(self): + super(SuspendSecurityTestCase, self).tearDown() + def test_suspend_security(self): + user_id = self.env.ref('base.user_demo').id + other_company = self.env['res.company'].create({ + 'name': 'other company', + # without this, a partner is created and mail's constraint on + # notify_email kicks in + 'partner_id': self.env.ref('base.partner_demo').id, + }) + with self.assertRaises(exceptions.AccessError): + self.env.ref('base.user_root').sudo(user_id).name = 'test' + with self.assertRaises(exceptions.AccessError): + other_company.sudo(user_id).name = 'test' + self.env.ref('base.user_root').sudo(user_id).suspend_security().name = 'test' + self.assertEqual(self.env.ref('base.user_root').name, 'test') + self.assertEqual(self.env.ref('base.user_root').write_uid.id, user_id) + # this tests ir.rule + other_company.sudo(user_id).suspend_security().write({'name': 'test'}) + self.assertEqual(other_company.name, 'test') + self.assertEqual(other_company.write_uid.id, user_id) + # this tests if _normalize_args conversion works + self.env['res.users'].browse( + self.env['res.users'].suspend_security().env.uid) + # Test normal search on One2many + partner = self.env['res.partner'].search( + [('user_ids.id', '=', self.env.user.suspend_security().env.uid)], + limit=1) + self.assertEqual(partner, self.env.user.partner_id) + # Test search on One2many without specifing ID (standard Odoo) + partner = self.env['res.partner'].search( + [('user_ids', '=', self.env.uid)], + limit=1) + self.assertEqual(partner, self.env.user.partner_id) + # Test search on One2many without specifing ID (suspend_security) + partner = self.env['res.partner'].search( + [('user_ids', '=', self.env.user.suspend_security().env.uid)], + limit=1) + self.assertEqual(partner, self.env.user.partner_id) + # Test search on One2many without ID with IN (standard Odoo) + partner = self.env['res.partner'].search( + [('user_ids', 'in', self.env.uid)], + limit=1) + self.assertEqual(partner, self.env.user.partner_id) + # Test search on One2many without ID with IN (suspend_security) + partner = self.env['res.partner'].search( + [('user_ids', 'in', self.env.user.suspend_security().env.uid)], + limit=1) + self.assertEqual(partner, self.env.user.partner_id) + def test_normalize(self): + self.env['res.users'].browse(self.env['res.users'].suspend_security().env.uid)