diff --git a/muk_security/__init__.py b/muk_security/__init__.py
index ba31b8c..31964ee 100644
--- a/muk_security/__init__.py
+++ b/muk_security/__init__.py
@@ -20,4 +20,4 @@
from . import models
def _patch_system():
- from . import base
\ No newline at end of file
+ from . import base
diff --git a/muk_security/__manifest__.py b/muk_security/__manifest__.py
index c2c9345..661e6a2 100644
--- a/muk_security/__manifest__.py
+++ b/muk_security/__manifest__.py
@@ -20,7 +20,7 @@
{
"name": "MuK Security",
"summary": """Security Features""",
- "version": "11.0.1.1.7",
+ "version": "11.0.1.1.8",
"category": "Extra Tools",
"license": "AGPL-3",
"website": "http://www.mukit.at",
@@ -54,4 +54,4 @@
"application": False,
"installable": True,
"post_load": "_patch_system",
-}
\ No newline at end of file
+}
diff --git a/muk_security/models/access.py b/muk_security/models/access.py
index 192324a..561986b 100644
--- a/muk_security/models/access.py
+++ b/muk_security/models/access.py
@@ -1,136 +1,136 @@
-###################################################################################
-#
-# Copyright (C) 2017 MuK IT GmbH
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Affero General Public License as
-# published by the Free Software Foundation, either version 3 of the
-# License, or (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU Affero General Public License for more details.
-#
-# You should have received a copy of the GNU Affero General Public License
-# along with this program. If not, see .
-#
-###################################################################################
-
-import logging
-
-from odoo import _
-from odoo import models, api, fields
-from odoo.exceptions import AccessError
-
-_logger = logging.getLogger(__name__)
-
-class BaseModelAccess(models.AbstractModel):
-
- _name = 'muk_security.access'
- _description = "MuK Access Model"
- _inherit = 'muk_utils.model'
-
- #----------------------------------------------------------
- # Database
- #----------------------------------------------------------
-
- permission_read = fields.Boolean(
- compute='_compute_permissions',
- search='_search_permission_read',
- string="Read Access")
-
- permission_create = fields.Boolean(
- compute='_compute_permissions',
- search='_search_permission_create',
- string="Create Access")
-
- permission_write = fields.Boolean(
- compute='_compute_permissions',
- search='_search_permission_write',
- string="Write Access")
-
- permission_unlink = fields.Boolean(
- compute='_compute_permissions',
- search='_search_permission_unlink',
- string="Delete Access")
-
- #----------------------------------------------------------
- # Function
- #----------------------------------------------------------
-
- @api.model
- def check_access_rights(self, operation, raise_exception=True):
- return super(BaseModelAccess, self).check_access_rights(operation, raise_exception)
-
- @api.multi
- def check_access_rule(self, operation):
- return super(BaseModelAccess, self).check_access_rule(operation)
-
- @api.model
- def _apply_ir_rules(self, query, mode='read'):
- return super(BaseModelAccess, self)._apply_ir_rules(query, mode)
-
- @api.model
- def check_field_access_rights(self, operation, fields):
- return super(BaseModelAccess, self).check_field_access_rights(operation, fields)
-
- @api.multi
- def check_access(self, operation, raise_exception=False):
- try:
- access_right = self.check_access_rights(operation, raise_exception)
- access_rule = self.check_access_rule(operation) == None
- access = access_right and access_rule
- if not access and raise_exception:
- raise AccessError(_("This operation is forbidden!"))
- return access
- except AccessError:
- if raise_exception:
- raise AccessError(_("This operation is forbidden!"))
- return False
-
- #----------------------------------------------------------
- # Search
- #----------------------------------------------------------
-
- @api.model
- def _search_permission_read(self, operator, operand):
- records = self.search([]).filtered(lambda r: r.check_access('read') == True)
- if operator == '=' and operand:
- return [('id', 'in', records.mapped('id'))]
- return [('id', 'not in', records.mapped('id'))]
-
- @api.model
- def _search_permission_create(self, operator, operand):
- records = self.search([]).filtered(lambda r: r.check_access('create') == True)
- if operator == '=' and operand:
- return [('id', 'in', records.mapped('id'))]
- return [('id', 'not in', records.mapped('id'))]
-
- @api.model
- def _search_permission_write(self, operator, operand):
- records = self.search([]).filtered(lambda r: r.check_access('write') == True)
- if operator == '=' and operand:
- return [('id', 'in', records.mapped('id'))]
- return [('id', 'not in', records.mapped('id'))]
-
- @api.model
- def _search_permission_unlink(self, operator, operand):
- records = self.search([]).filtered(lambda r: r.check_access('unlink') == True)
- if operator == '=' and operand:
- return [('id', 'in', records.mapped('id'))]
- return [('id', 'not in', records.mapped('id'))]
-
- #----------------------------------------------------------
- # Read, View
- #----------------------------------------------------------
-
- @api.multi
- def _compute_permissions(self):
- for record in self:
- record.update({
- 'permission_read': record.check_access('read'),
- 'permission_create': record.check_access('create'),
- 'permission_write': record.check_access('write'),
- 'permission_unlink': record.check_access('unlink'),
- })
\ No newline at end of file
+###################################################################################
+#
+# Copyright (C) 2017 MuK IT GmbH
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as
+# published by the Free Software Foundation, either version 3 of the
+# License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see .
+#
+###################################################################################
+
+import logging
+
+from odoo import _
+from odoo import models, api, fields
+from odoo.exceptions import AccessError
+
+_logger = logging.getLogger(__name__)
+
+class BaseModelAccess(models.AbstractModel):
+
+ _name = 'muk_security.access'
+ _description = "MuK Access Model"
+ _inherit = 'muk_utils.model'
+
+ #----------------------------------------------------------
+ # Database
+ #----------------------------------------------------------
+
+ permission_read = fields.Boolean(
+ compute='_compute_permissions',
+ search='_search_permission_read',
+ string="Read Access")
+
+ permission_create = fields.Boolean(
+ compute='_compute_permissions',
+ search='_search_permission_create',
+ string="Create Access")
+
+ permission_write = fields.Boolean(
+ compute='_compute_permissions',
+ search='_search_permission_write',
+ string="Write Access")
+
+ permission_unlink = fields.Boolean(
+ compute='_compute_permissions',
+ search='_search_permission_unlink',
+ string="Delete Access")
+
+ #----------------------------------------------------------
+ # Function
+ #----------------------------------------------------------
+
+ @api.model
+ def check_access_rights(self, operation, raise_exception=True):
+ return super(BaseModelAccess, self).check_access_rights(operation, raise_exception)
+
+ @api.multi
+ def check_access_rule(self, operation):
+ return super(BaseModelAccess, self).check_access_rule(operation)
+
+ @api.model
+ def _apply_ir_rules(self, query, mode='read'):
+ return super(BaseModelAccess, self)._apply_ir_rules(query, mode)
+
+ @api.model
+ def check_field_access_rights(self, operation, fields):
+ return super(BaseModelAccess, self).check_field_access_rights(operation, fields)
+
+ @api.multi
+ def check_access(self, operation, raise_exception=False):
+ try:
+ access_right = self.check_access_rights(operation, raise_exception)
+ access_rule = self.check_access_rule(operation) == None
+ access = access_right and access_rule
+ if not access and raise_exception:
+ raise AccessError(_("This operation is forbidden!"))
+ return access
+ except AccessError:
+ if raise_exception:
+ raise AccessError(_("This operation is forbidden!"))
+ return False
+
+ #----------------------------------------------------------
+ # Search
+ #----------------------------------------------------------
+
+ @api.model
+ def _search_permission_read(self, operator, operand):
+ records = self.search([]).filtered(lambda r: r.check_access('read') == True)
+ if operator == '=' and operand:
+ return [('id', 'in', records.mapped('id'))]
+ return [('id', 'not in', records.mapped('id'))]
+
+ @api.model
+ def _search_permission_create(self, operator, operand):
+ records = self.search([]).filtered(lambda r: r.check_access('create') == True)
+ if operator == '=' and operand:
+ return [('id', 'in', records.mapped('id'))]
+ return [('id', 'not in', records.mapped('id'))]
+
+ @api.model
+ def _search_permission_write(self, operator, operand):
+ records = self.search([]).filtered(lambda r: r.check_access('write') == True)
+ if operator == '=' and operand:
+ return [('id', 'in', records.mapped('id'))]
+ return [('id', 'not in', records.mapped('id'))]
+
+ @api.model
+ def _search_permission_unlink(self, operator, operand):
+ records = self.search([]).filtered(lambda r: r.check_access('unlink') == True)
+ if operator == '=' and operand:
+ return [('id', 'in', records.mapped('id'))]
+ return [('id', 'not in', records.mapped('id'))]
+
+ #----------------------------------------------------------
+ # Read, View
+ #----------------------------------------------------------
+
+ @api.multi
+ def _compute_permissions(self):
+ for record in self:
+ record.update({
+ 'permission_read': record.check_access('read'),
+ 'permission_create': record.check_access('create'),
+ 'permission_write': record.check_access('write'),
+ 'permission_unlink': record.check_access('unlink'),
+ })
diff --git a/muk_security/models/ir_rule.py b/muk_security/models/ir_rule.py
index 9acc3eb..cc7ff05 100644
--- a/muk_security/models/ir_rule.py
+++ b/muk_security/models/ir_rule.py
@@ -1,39 +1,39 @@
-###################################################################################
-#
-# Copyright (C) 2017 MuK IT GmbH
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Affero General Public License as
-# published by the Free Software Foundation, either version 3 of the
-# License, or (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU Affero General Public License for more details.
-#
-# You should have received a copy of the GNU Affero General Public License
-# along with this program. If not, see .
-#
-###################################################################################
-
-import logging
-
-from odoo import api, fields, models
-from odoo import tools, _
-from odoo.exceptions import ValidationError
-
-from odoo.addons.muk_security.tools import helper
-
-_logger = logging.getLogger(__name__)
-
-class ExtendedIrRule(models.Model):
-
- _inherit = 'ir.rule'
-
- @api.model
- @tools.ormcache('self._uid', 'model_name', 'mode')
- def _compute_domain(self, model_name, mode="read"):
- if isinstance(self.env.uid, helper.NoSecurityUid):
- return None
- return super(ExtendedIrRule, self)._compute_domain(model_name, mode=mode)
\ No newline at end of file
+###################################################################################
+#
+# Copyright (C) 2017 MuK IT GmbH
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as
+# published by the Free Software Foundation, either version 3 of the
+# License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see .
+#
+###################################################################################
+
+import logging
+
+from odoo import api, fields, models
+from odoo import tools, _
+from odoo.exceptions import ValidationError
+
+from odoo.addons.muk_security.tools import helper
+
+_logger = logging.getLogger(__name__)
+
+class ExtendedIrRule(models.Model):
+
+ _inherit = 'ir.rule'
+
+ @api.model
+ @tools.ormcache('self._uid', 'model_name', 'mode')
+ def _compute_domain(self, model_name, mode="read"):
+ if isinstance(self.env.uid, helper.NoSecurityUid):
+ return None
+ return super(ExtendedIrRule, self)._compute_domain(model_name, mode=mode)
diff --git a/muk_security/models/locking.py b/muk_security/models/locking.py
index 03cc3a3..b1c9ce0 100644
--- a/muk_security/models/locking.py
+++ b/muk_security/models/locking.py
@@ -1,251 +1,251 @@
-###################################################################################
-#
-# Copyright (C) 2017 MuK IT GmbH
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Affero General Public License as
-# published by the Free Software Foundation, either version 3 of the
-# License, or (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU Affero General Public License for more details.
-#
-# You should have received a copy of the GNU Affero General Public License
-# along with this program. If not, see .
-#
-###################################################################################
-
-import os
-import hashlib
-import logging
-import itertools
-
-from odoo import _
-from odoo import models, api, fields
-from odoo.exceptions import AccessError
-
-_logger = logging.getLogger(__name__)
-
-class BaseModelLocking(models.AbstractModel):
-
- _name = 'muk_security.locking'
- _description = 'MuK Locking Model'
- _inherit = 'muk_utils.model'
-
- #----------------------------------------------------------
- # Database
- #----------------------------------------------------------
-
- locked = fields.Many2one(
- comodel_name='muk_security.lock',
- compute='_compute_lock',
- string="Locked")
-
- locked_state = fields.Boolean(
- compute='_compute_lock',
- string="Locked")
-
- locked_by = fields.Many2one(
- related='locked.locked_by_ref',
- comodel_name='res.users',
- string="Locked by")
-
- editor = fields.Boolean(
- compute='_compute_editor',
- string="Editor")
-
- #----------------------------------------------------------
- # Functions
- #----------------------------------------------------------
-
- @api.model
- def generate_operation_key(self):
- return hashlib.sha1(os.urandom(128)).hexdigest()
-
- #----------------------------------------------------------
- # Locking
- #----------------------------------------------------------
-
- @api.multi
- def lock(self, user=None, operation=None, *largs, **kwargs):
- result = []
- for record in self:
- lock = record.is_locked()
- if lock and lock.operation and lock.operation == operation:
- result.append({
- 'record': record,
- 'lock': lock,
- 'token': lock.token})
- elif lock and ((lock.operation and lock.operation != operation) or not lock.operation):
- raise AccessError(_("The record (%s[%s]) is locked, so it can't be locked again.") %
- (record._name, record.id))
- else:
- token = hashlib.sha1(os.urandom(128)).hexdigest()
- lock = self.env['muk_security.lock'].sudo().create({
- 'locked_by': user and user.name or "System",
- 'locked_by_ref': user and user.id or None,
- 'lock_ref': record._name + ',' + str(record.id),
- 'token': token,
- 'operation': operation})
- result.append({
- 'record': record,
- 'lock': lock,
- 'token': token})
- return result
-
- @api.multi
- def unlock(self, *largs, **kwargs):
- for record in self:
- locks = self.env['muk_security.lock'].sudo().search(
- [('lock_ref', '=', "%s,%s" % (record._name, str(record.id)))])
- locks.sudo().unlink()
- return True
-
- @api.model
- def unlock_operation(self, operation, *largs, **kwargs):
- locks = self.env['muk_security.lock'].sudo().search([('operation', '=', operation)])
- references = [
- list((k, list(map(lambda rec: rec.lock_ref_id, v))))
- for k, v in itertools.groupby(
- locks.sorted(key=lambda rec: rec.lock_ref_model),
- lambda rec: rec.lock_ref_model)]
- locks.sudo().unlink()
- return references
-
- @api.multi
- def is_locked(self, *largs, **kwargs):
- self.ensure_one()
- lock = self.env['muk_security.lock'].sudo().search(
- [('lock_ref', '=', self._name + ',' + str(self.id))], limit=1)
- if lock.id:
- return lock
- return False
-
- @api.multi
- def is_locked_by(self, *largs, **kwargs):
- self.ensure_one()
- lock = self.env['muk_security.lock'].sudo().search(
- [('lock_ref', '=', self._name + ',' + str(self.id))], limit=1)
- if lock.id:
- return lock.locked_by_ref
- return False
-
- @api.multi
- def _checking_lock_user(self, *largs, **kwargs):
- for record in self:
- lock = record.is_locked()
- if lock and lock.locked_by_ref and not lock.locked_by_ref.id != self.env.user.id:
- raise AccessError(_("The record (%s[%s]) is locked by a user, so it can't be changes or deleted.") %
- (self._name, self.id))
-
- @api.multi
- def _checking_lock(self, operation=None, *largs, **kwargs):
- self._checking_lock_user()
- for record in self:
- lock = record.is_locked()
- if lock and lock.operation and lock.operation != operation:
- print(operation, lock.operation)
- raise IOError
- raise AccessError(_("The record (%s[%s]) is locked, so it can't be changes or deleted.") %
- (self._name, self.id))
-
- @api.multi
- def user_lock(self, *largs, **kwargs):
- self.ensure_one()
- lock = self.is_locked()
- if lock:
- if lock.locked_by_ref:
- raise AccessError(_("The record is already locked by another user. (%s)") % lock.locked_by_ref.name)
- else:
- raise AccessError(_("The record is already locked."))
- return self.lock(user=self.env.user)
-
- @api.multi
- def user_unlock(self, *largs, **kwargs):
- self.ensure_one()
- lock = self.is_locked()
- if lock:
- if lock.locked_by_ref and lock.locked_by_ref.id == self.env.user.id:
- self.unlock()
- else:
- if lock.locked_by_ref:
- raise AccessError(_("The record is already locked by another user. (%s)") % lock.locked_by_ref.name)
- else:
- raise AccessError(_("The record is already locked."))
- return True
-
- #----------------------------------------------------------
- # Read, View
- #----------------------------------------------------------
-
- @api.multi
- def _compute_lock(self):
- for record in self:
- locked = record.is_locked()
- record.update({
- 'locked_state': bool(locked),
- 'locked': locked})
-
- @api.depends('locked')
- def _compute_editor(self):
- for record in self:
- record.editor = record.is_locked_by() == record.env.user
-
- #----------------------------------------------------------
- # Create, Update, Delete
- #----------------------------------------------------------
-
- @api.multi
- def write(self, vals):
- operation = self.generate_operation_key()
- vals = self._before_write_operation(vals, operation)
- process_operation = self.env.context['operation'] if 'operation' in self.env.context else operation
- result = super(BaseModelLocking, self.with_context(operation=process_operation)).write(vals)
- for record in self:
- record._after_write_record_operation(vals, operation)
- result = self._after_write_operation(result, vals, operation)
- return result
-
- @api.multi
- def _before_write_operation(self, vals, operation, *largs, **kwargs):
- if 'operation' in self.env.context:
- self._checking_lock(self.env.context['operation'])
- else:
- self._checking_lock(operation)
- return vals
-
- @api.multi
- def _after_write_record_operation(self, vals, operation, *largs, **kwargs):
- return vals
-
- @api.multi
- def _after_write_operation(self, result, vals, operation, *largs, **kwargs):
- return result
-
- @api.multi
- def unlink(self):
- operation = self.generate_operation_key()
- self._before_unlink_operation(operation)
- for record in self:
- record._before_unlink_record_operation(operation)
- process_operation = self.env.context['operation'] if 'operation' in self.env.context else operation
- result = super(BaseModelLocking, self.with_context(operation=process_operation)).unlink()
- self._after_unlink_operation(result, operation)
- return result
-
- @api.multi
- def _before_unlink_operation(self, operation, *largs, **kwargs):
- if 'operation' in self.env.context:
- self._checking_lock(self.env.context['operation'])
- else:
- self._checking_lock(operation)
-
- @api.multi
- def _before_unlink_record_operation(self, operation, *largs, **kwargs):
- pass
-
- @api.multi
- def _after_unlink_operation(self, result, operation, *largs, **kwargs):
- pass
\ No newline at end of file
+###################################################################################
+#
+# Copyright (C) 2017 MuK IT GmbH
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as
+# published by the Free Software Foundation, either version 3 of the
+# License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see .
+#
+###################################################################################
+
+import os
+import hashlib
+import logging
+import itertools
+
+from odoo import _
+from odoo import models, api, fields
+from odoo.exceptions import AccessError
+
+_logger = logging.getLogger(__name__)
+
+class BaseModelLocking(models.AbstractModel):
+
+ _name = 'muk_security.locking'
+ _description = 'MuK Locking Model'
+ _inherit = 'muk_utils.model'
+
+ #----------------------------------------------------------
+ # Database
+ #----------------------------------------------------------
+
+ locked = fields.Many2one(
+ comodel_name='muk_security.lock',
+ compute='_compute_lock',
+ string="Locked")
+
+ locked_state = fields.Boolean(
+ compute='_compute_lock',
+ string="Locked")
+
+ locked_by = fields.Many2one(
+ related='locked.locked_by_ref',
+ comodel_name='res.users',
+ string="Locked by")
+
+ editor = fields.Boolean(
+ compute='_compute_editor',
+ string="Editor")
+
+ #----------------------------------------------------------
+ # Functions
+ #----------------------------------------------------------
+
+ @api.model
+ def generate_operation_key(self):
+ return hashlib.sha1(os.urandom(128)).hexdigest()
+
+ #----------------------------------------------------------
+ # Locking
+ #----------------------------------------------------------
+
+ @api.multi
+ def lock(self, user=None, operation=None, *largs, **kwargs):
+ result = []
+ for record in self:
+ lock = record.is_locked()
+ if lock and lock.operation and lock.operation == operation:
+ result.append({
+ 'record': record,
+ 'lock': lock,
+ 'token': lock.token})
+ elif lock and ((lock.operation and lock.operation != operation) or not lock.operation):
+ raise AccessError(_("The record (%s[%s]) is locked, so it can't be locked again.") %
+ (record._name, record.id))
+ else:
+ token = hashlib.sha1(os.urandom(128)).hexdigest()
+ lock = self.env['muk_security.lock'].sudo().create({
+ 'locked_by': user and user.name or "System",
+ 'locked_by_ref': user and user.id or None,
+ 'lock_ref': record._name + ',' + str(record.id),
+ 'token': token,
+ 'operation': operation})
+ result.append({
+ 'record': record,
+ 'lock': lock,
+ 'token': token})
+ return result
+
+ @api.multi
+ def unlock(self, *largs, **kwargs):
+ for record in self:
+ locks = self.env['muk_security.lock'].sudo().search(
+ [('lock_ref', '=', "%s,%s" % (record._name, str(record.id)))])
+ locks.sudo().unlink()
+ return True
+
+ @api.model
+ def unlock_operation(self, operation, *largs, **kwargs):
+ locks = self.env['muk_security.lock'].sudo().search([('operation', '=', operation)])
+ references = [
+ list((k, list(map(lambda rec: rec.lock_ref_id, v))))
+ for k, v in itertools.groupby(
+ locks.sorted(key=lambda rec: rec.lock_ref_model),
+ lambda rec: rec.lock_ref_model)]
+ locks.sudo().unlink()
+ return references
+
+ @api.multi
+ def is_locked(self, *largs, **kwargs):
+ self.ensure_one()
+ lock = self.env['muk_security.lock'].sudo().search(
+ [('lock_ref', '=', self._name + ',' + str(self.id))], limit=1)
+ if lock.id:
+ return lock
+ return False
+
+ @api.multi
+ def is_locked_by(self, *largs, **kwargs):
+ self.ensure_one()
+ lock = self.env['muk_security.lock'].sudo().search(
+ [('lock_ref', '=', self._name + ',' + str(self.id))], limit=1)
+ if lock.id:
+ return lock.locked_by_ref
+ return False
+
+ @api.multi
+ def _checking_lock_user(self, *largs, **kwargs):
+ for record in self:
+ lock = record.is_locked()
+ if lock and lock.locked_by_ref and not lock.locked_by_ref.id != self.env.user.id:
+ raise AccessError(_("The record (%s[%s]) is locked by a user, so it can't be changes or deleted.") %
+ (self._name, self.id))
+
+ @api.multi
+ def _checking_lock(self, operation=None, *largs, **kwargs):
+ self._checking_lock_user()
+ for record in self:
+ lock = record.is_locked()
+ if lock and lock.operation and lock.operation != operation:
+ print(operation, lock.operation)
+ raise IOError
+ raise AccessError(_("The record (%s[%s]) is locked, so it can't be changes or deleted.") %
+ (self._name, self.id))
+
+ @api.multi
+ def user_lock(self, *largs, **kwargs):
+ self.ensure_one()
+ lock = self.is_locked()
+ if lock:
+ if lock.locked_by_ref:
+ raise AccessError(_("The record is already locked by another user. (%s)") % lock.locked_by_ref.name)
+ else:
+ raise AccessError(_("The record is already locked."))
+ return self.lock(user=self.env.user)
+
+ @api.multi
+ def user_unlock(self, *largs, **kwargs):
+ self.ensure_one()
+ lock = self.is_locked()
+ if lock:
+ if lock.locked_by_ref and lock.locked_by_ref.id == self.env.user.id:
+ self.unlock()
+ else:
+ if lock.locked_by_ref:
+ raise AccessError(_("The record is already locked by another user. (%s)") % lock.locked_by_ref.name)
+ else:
+ raise AccessError(_("The record is already locked."))
+ return True
+
+ #----------------------------------------------------------
+ # Read, View
+ #----------------------------------------------------------
+
+ @api.multi
+ def _compute_lock(self):
+ for record in self:
+ locked = record.is_locked()
+ record.update({
+ 'locked_state': bool(locked),
+ 'locked': locked})
+
+ @api.depends('locked')
+ def _compute_editor(self):
+ for record in self:
+ record.editor = record.is_locked_by() == record.env.user
+
+ #----------------------------------------------------------
+ # Create, Update, Delete
+ #----------------------------------------------------------
+
+ @api.multi
+ def write(self, vals):
+ operation = self.generate_operation_key()
+ vals = self._before_write_operation(vals, operation)
+ process_operation = self.env.context['operation'] if 'operation' in self.env.context else operation
+ result = super(BaseModelLocking, self.with_context(operation=process_operation)).write(vals)
+ for record in self:
+ record._after_write_record_operation(vals, operation)
+ result = self._after_write_operation(result, vals, operation)
+ return result
+
+ @api.multi
+ def _before_write_operation(self, vals, operation, *largs, **kwargs):
+ if 'operation' in self.env.context:
+ self._checking_lock(self.env.context['operation'])
+ else:
+ self._checking_lock(operation)
+ return vals
+
+ @api.multi
+ def _after_write_record_operation(self, vals, operation, *largs, **kwargs):
+ return vals
+
+ @api.multi
+ def _after_write_operation(self, result, vals, operation, *largs, **kwargs):
+ return result
+
+ @api.multi
+ def unlink(self):
+ operation = self.generate_operation_key()
+ self._before_unlink_operation(operation)
+ for record in self:
+ record._before_unlink_record_operation(operation)
+ process_operation = self.env.context['operation'] if 'operation' in self.env.context else operation
+ result = super(BaseModelLocking, self.with_context(operation=process_operation)).unlink()
+ self._after_unlink_operation(result, operation)
+ return result
+
+ @api.multi
+ def _before_unlink_operation(self, operation, *largs, **kwargs):
+ if 'operation' in self.env.context:
+ self._checking_lock(self.env.context['operation'])
+ else:
+ self._checking_lock(operation)
+
+ @api.multi
+ def _before_unlink_record_operation(self, operation, *largs, **kwargs):
+ pass
+
+ @api.multi
+ def _after_unlink_operation(self, result, operation, *largs, **kwargs):
+ pass
diff --git a/muk_security/models/res_groups.py b/muk_security/models/res_groups.py
index 64d0b1c..6198646 100644
--- a/muk_security/models/res_groups.py
+++ b/muk_security/models/res_groups.py
@@ -38,4 +38,4 @@ class AccessGroups(models.Model):
relation='muk_security_groups_groups_rel',
column1='rid',
column2='gid',
- string='Groups')
\ No newline at end of file
+ string='Groups')
diff --git a/muk_security/models/res_users.py b/muk_security/models/res_users.py
index 084d0dd..c551937 100644
--- a/muk_security/models/res_users.py
+++ b/muk_security/models/res_users.py
@@ -64,4 +64,4 @@ class AccessUser(models.Model):
return super(helper.NoSecurityUid, id).__int__()
return id
access_ids = [convert_security_uid(id) for id in ids]
- return super(AccessUser, cls)._browse(access_ids, *args, **kwargs)
\ No newline at end of file
+ return super(AccessUser, cls)._browse(access_ids, *args, **kwargs)
diff --git a/muk_security/tests/__init__.py b/muk_security/tests/__init__.py
index cb6eb81..86a6b32 100644
--- a/muk_security/tests/__init__.py
+++ b/muk_security/tests/__init__.py
@@ -19,4 +19,4 @@
#
###################################################################################
-from . import test_suspend_security
\ No newline at end of file
+from . import test_suspend_security
diff --git a/muk_security/tests/test_suspend_security.py b/muk_security/tests/test_suspend_security.py
index 7a40283..55dae73 100644
--- a/muk_security/tests/test_suspend_security.py
+++ b/muk_security/tests/test_suspend_security.py
@@ -1,50 +1,81 @@
-###################################################################################
-#
-# Copyright (C) 2017 MuK IT GmbH
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Affero General Public License as
-# published by the Free Software Foundation, either version 3 of the
-# License, or (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU Affero General Public License for more details.
-#
-# You should have received a copy of the GNU Affero General Public License
-# along with this program. If not, see .
-#
-###################################################################################
-
-import os
-import base64
-import logging
-
-from odoo import exceptions
-from odoo.tests import common
-
-_path = os.path.dirname(os.path.dirname(__file__))
-_logger = logging.getLogger(__name__)
-
-class SuspendSecurityTestCase(common.TransactionCase):
-
- at_install = False
- post_install = True
-
- def setUp(self):
- super(SuspendSecurityTestCase, self).setUp()
-
- def tearDown(self):
- super(SuspendSecurityTestCase, self).tearDown()
-
- def test_suspend_security(self):
- user_id = self.env.ref('base.user_demo').id
- with self.assertRaises(exceptions.AccessError):
- self.env.ref('base.user_root').sudo(user_id).name = 'test'
- self.env.ref('base.user_root').sudo(user_id).suspend_security().name = 'test'
- self.assertEqual(self.env.ref('base.user_root').name, 'test')
- self.assertEqual(self.env.ref('base.user_root').write_uid.id, user_id)
-
- def test_normalize(self):
- self.env['res.users'].browse(self.env['res.users'].suspend_security().env.uid)
\ No newline at end of file
+###################################################################################
+#
+# Copyright (C) 2017 MuK IT GmbH
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as
+# published by the Free Software Foundation, either version 3 of the
+# License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see .
+#
+###################################################################################
+import os
+import base64
+import logging
+from odoo import exceptions
+from odoo.tests import common
+_path = os.path.dirname(os.path.dirname(__file__))
+_logger = logging.getLogger(__name__)
+class SuspendSecurityTestCase(common.TransactionCase):
+ at_install = False
+ post_install = True
+ def setUp(self):
+ super(SuspendSecurityTestCase, self).setUp()
+ def tearDown(self):
+ super(SuspendSecurityTestCase, self).tearDown()
+ def test_suspend_security(self):
+ user_id = self.env.ref('base.user_demo').id
+ other_company = self.env['res.company'].create({
+ 'name': 'other company',
+ # without this, a partner is created and mail's constraint on
+ # notify_email kicks in
+ 'partner_id': self.env.ref('base.partner_demo').id,
+ })
+ with self.assertRaises(exceptions.AccessError):
+ self.env.ref('base.user_root').sudo(user_id).name = 'test'
+ with self.assertRaises(exceptions.AccessError):
+ other_company.sudo(user_id).name = 'test'
+ self.env.ref('base.user_root').sudo(user_id).suspend_security().name = 'test'
+ self.assertEqual(self.env.ref('base.user_root').name, 'test')
+ self.assertEqual(self.env.ref('base.user_root').write_uid.id, user_id)
+ # this tests ir.rule
+ other_company.sudo(user_id).suspend_security().write({'name': 'test'})
+ self.assertEqual(other_company.name, 'test')
+ self.assertEqual(other_company.write_uid.id, user_id)
+ # this tests if _normalize_args conversion works
+ self.env['res.users'].browse(
+ self.env['res.users'].suspend_security().env.uid)
+ # Test normal search on One2many
+ partner = self.env['res.partner'].search(
+ [('user_ids.id', '=', self.env.user.suspend_security().env.uid)],
+ limit=1)
+ self.assertEqual(partner, self.env.user.partner_id)
+ # Test search on One2many without specifing ID (standard Odoo)
+ partner = self.env['res.partner'].search(
+ [('user_ids', '=', self.env.uid)],
+ limit=1)
+ self.assertEqual(partner, self.env.user.partner_id)
+ # Test search on One2many without specifing ID (suspend_security)
+ partner = self.env['res.partner'].search(
+ [('user_ids', '=', self.env.user.suspend_security().env.uid)],
+ limit=1)
+ self.assertEqual(partner, self.env.user.partner_id)
+ # Test search on One2many without ID with IN (standard Odoo)
+ partner = self.env['res.partner'].search(
+ [('user_ids', 'in', self.env.uid)],
+ limit=1)
+ self.assertEqual(partner, self.env.user.partner_id)
+ # Test search on One2many without ID with IN (suspend_security)
+ partner = self.env['res.partner'].search(
+ [('user_ids', 'in', self.env.user.suspend_security().env.uid)],
+ limit=1)
+ self.assertEqual(partner, self.env.user.partner_id)
+ def test_normalize(self):
+ self.env['res.users'].browse(self.env['res.users'].suspend_security().env.uid)