Browse Source

publish muk_security - 11.0

pull/19/head
MuK IT GmbH 6 years ago
parent
commit
63fd11e118
  1. 2
      muk_security/__init__.py
  2. 4
      muk_security/__manifest__.py
  3. 272
      muk_security/models/access.py
  4. 78
      muk_security/models/ir_rule.py
  5. 502
      muk_security/models/locking.py
  6. 2
      muk_security/models/res_groups.py
  7. 2
      muk_security/models/res_users.py
  8. 2
      muk_security/tests/__init__.py
  9. 131
      muk_security/tests/test_suspend_security.py

2
muk_security/__init__.py

@ -20,4 +20,4 @@
from . import models from . import models
def _patch_system(): def _patch_system():
from . import base
from . import base

4
muk_security/__manifest__.py

@ -20,7 +20,7 @@
{ {
"name": "MuK Security", "name": "MuK Security",
"summary": """Security Features""", "summary": """Security Features""",
"version": "11.0.1.1.7",
"version": "11.0.1.1.8",
"category": "Extra Tools", "category": "Extra Tools",
"license": "AGPL-3", "license": "AGPL-3",
"website": "http://www.mukit.at", "website": "http://www.mukit.at",
@ -54,4 +54,4 @@
"application": False, "application": False,
"installable": True, "installable": True,
"post_load": "_patch_system", "post_load": "_patch_system",
}
}

272
muk_security/models/access.py

@ -1,136 +1,136 @@
###################################################################################
#
# Copyright (C) 2017 MuK IT GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
###################################################################################
import logging
from odoo import _
from odoo import models, api, fields
from odoo.exceptions import AccessError
_logger = logging.getLogger(__name__)
class BaseModelAccess(models.AbstractModel):
_name = 'muk_security.access'
_description = "MuK Access Model"
_inherit = 'muk_utils.model'
#----------------------------------------------------------
# Database
#----------------------------------------------------------
permission_read = fields.Boolean(
compute='_compute_permissions',
search='_search_permission_read',
string="Read Access")
permission_create = fields.Boolean(
compute='_compute_permissions',
search='_search_permission_create',
string="Create Access")
permission_write = fields.Boolean(
compute='_compute_permissions',
search='_search_permission_write',
string="Write Access")
permission_unlink = fields.Boolean(
compute='_compute_permissions',
search='_search_permission_unlink',
string="Delete Access")
#----------------------------------------------------------
# Function
#----------------------------------------------------------
@api.model
def check_access_rights(self, operation, raise_exception=True):
return super(BaseModelAccess, self).check_access_rights(operation, raise_exception)
@api.multi
def check_access_rule(self, operation):
return super(BaseModelAccess, self).check_access_rule(operation)
@api.model
def _apply_ir_rules(self, query, mode='read'):
return super(BaseModelAccess, self)._apply_ir_rules(query, mode)
@api.model
def check_field_access_rights(self, operation, fields):
return super(BaseModelAccess, self).check_field_access_rights(operation, fields)
@api.multi
def check_access(self, operation, raise_exception=False):
try:
access_right = self.check_access_rights(operation, raise_exception)
access_rule = self.check_access_rule(operation) == None
access = access_right and access_rule
if not access and raise_exception:
raise AccessError(_("This operation is forbidden!"))
return access
except AccessError:
if raise_exception:
raise AccessError(_("This operation is forbidden!"))
return False
#----------------------------------------------------------
# Search
#----------------------------------------------------------
@api.model
def _search_permission_read(self, operator, operand):
records = self.search([]).filtered(lambda r: r.check_access('read') == True)
if operator == '=' and operand:
return [('id', 'in', records.mapped('id'))]
return [('id', 'not in', records.mapped('id'))]
@api.model
def _search_permission_create(self, operator, operand):
records = self.search([]).filtered(lambda r: r.check_access('create') == True)
if operator == '=' and operand:
return [('id', 'in', records.mapped('id'))]
return [('id', 'not in', records.mapped('id'))]
@api.model
def _search_permission_write(self, operator, operand):
records = self.search([]).filtered(lambda r: r.check_access('write') == True)
if operator == '=' and operand:
return [('id', 'in', records.mapped('id'))]
return [('id', 'not in', records.mapped('id'))]
@api.model
def _search_permission_unlink(self, operator, operand):
records = self.search([]).filtered(lambda r: r.check_access('unlink') == True)
if operator == '=' and operand:
return [('id', 'in', records.mapped('id'))]
return [('id', 'not in', records.mapped('id'))]
#----------------------------------------------------------
# Read, View
#----------------------------------------------------------
@api.multi
def _compute_permissions(self):
for record in self:
record.update({
'permission_read': record.check_access('read'),
'permission_create': record.check_access('create'),
'permission_write': record.check_access('write'),
'permission_unlink': record.check_access('unlink'),
})
###################################################################################
#
# Copyright (C) 2017 MuK IT GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
###################################################################################
import logging
from odoo import _
from odoo import models, api, fields
from odoo.exceptions import AccessError
_logger = logging.getLogger(__name__)
class BaseModelAccess(models.AbstractModel):
_name = 'muk_security.access'
_description = "MuK Access Model"
_inherit = 'muk_utils.model'
#----------------------------------------------------------
# Database
#----------------------------------------------------------
permission_read = fields.Boolean(
compute='_compute_permissions',
search='_search_permission_read',
string="Read Access")
permission_create = fields.Boolean(
compute='_compute_permissions',
search='_search_permission_create',
string="Create Access")
permission_write = fields.Boolean(
compute='_compute_permissions',
search='_search_permission_write',
string="Write Access")
permission_unlink = fields.Boolean(
compute='_compute_permissions',
search='_search_permission_unlink',
string="Delete Access")
#----------------------------------------------------------
# Function
#----------------------------------------------------------
@api.model
def check_access_rights(self, operation, raise_exception=True):
return super(BaseModelAccess, self).check_access_rights(operation, raise_exception)
@api.multi
def check_access_rule(self, operation):
return super(BaseModelAccess, self).check_access_rule(operation)
@api.model
def _apply_ir_rules(self, query, mode='read'):
return super(BaseModelAccess, self)._apply_ir_rules(query, mode)
@api.model
def check_field_access_rights(self, operation, fields):
return super(BaseModelAccess, self).check_field_access_rights(operation, fields)
@api.multi
def check_access(self, operation, raise_exception=False):
try:
access_right = self.check_access_rights(operation, raise_exception)
access_rule = self.check_access_rule(operation) == None
access = access_right and access_rule
if not access and raise_exception:
raise AccessError(_("This operation is forbidden!"))
return access
except AccessError:
if raise_exception:
raise AccessError(_("This operation is forbidden!"))
return False
#----------------------------------------------------------
# Search
#----------------------------------------------------------
@api.model
def _search_permission_read(self, operator, operand):
records = self.search([]).filtered(lambda r: r.check_access('read') == True)
if operator == '=' and operand:
return [('id', 'in', records.mapped('id'))]
return [('id', 'not in', records.mapped('id'))]
@api.model
def _search_permission_create(self, operator, operand):
records = self.search([]).filtered(lambda r: r.check_access('create') == True)
if operator == '=' and operand:
return [('id', 'in', records.mapped('id'))]
return [('id', 'not in', records.mapped('id'))]
@api.model
def _search_permission_write(self, operator, operand):
records = self.search([]).filtered(lambda r: r.check_access('write') == True)
if operator == '=' and operand:
return [('id', 'in', records.mapped('id'))]
return [('id', 'not in', records.mapped('id'))]
@api.model
def _search_permission_unlink(self, operator, operand):
records = self.search([]).filtered(lambda r: r.check_access('unlink') == True)
if operator == '=' and operand:
return [('id', 'in', records.mapped('id'))]
return [('id', 'not in', records.mapped('id'))]
#----------------------------------------------------------
# Read, View
#----------------------------------------------------------
@api.multi
def _compute_permissions(self):
for record in self:
record.update({
'permission_read': record.check_access('read'),
'permission_create': record.check_access('create'),
'permission_write': record.check_access('write'),
'permission_unlink': record.check_access('unlink'),
})

78
muk_security/models/ir_rule.py

@ -1,39 +1,39 @@
###################################################################################
#
# Copyright (C) 2017 MuK IT GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
###################################################################################
import logging
from odoo import api, fields, models
from odoo import tools, _
from odoo.exceptions import ValidationError
from odoo.addons.muk_security.tools import helper
_logger = logging.getLogger(__name__)
class ExtendedIrRule(models.Model):
_inherit = 'ir.rule'
@api.model
@tools.ormcache('self._uid', 'model_name', 'mode')
def _compute_domain(self, model_name, mode="read"):
if isinstance(self.env.uid, helper.NoSecurityUid):
return None
return super(ExtendedIrRule, self)._compute_domain(model_name, mode=mode)
###################################################################################
#
# Copyright (C) 2017 MuK IT GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
###################################################################################
import logging
from odoo import api, fields, models
from odoo import tools, _
from odoo.exceptions import ValidationError
from odoo.addons.muk_security.tools import helper
_logger = logging.getLogger(__name__)
class ExtendedIrRule(models.Model):
_inherit = 'ir.rule'
@api.model
@tools.ormcache('self._uid', 'model_name', 'mode')
def _compute_domain(self, model_name, mode="read"):
if isinstance(self.env.uid, helper.NoSecurityUid):
return None
return super(ExtendedIrRule, self)._compute_domain(model_name, mode=mode)

502
muk_security/models/locking.py

@ -1,251 +1,251 @@
###################################################################################
#
# Copyright (C) 2017 MuK IT GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
###################################################################################
import os
import hashlib
import logging
import itertools
from odoo import _
from odoo import models, api, fields
from odoo.exceptions import AccessError
_logger = logging.getLogger(__name__)
class BaseModelLocking(models.AbstractModel):
_name = 'muk_security.locking'
_description = 'MuK Locking Model'
_inherit = 'muk_utils.model'
#----------------------------------------------------------
# Database
#----------------------------------------------------------
locked = fields.Many2one(
comodel_name='muk_security.lock',
compute='_compute_lock',
string="Locked")
locked_state = fields.Boolean(
compute='_compute_lock',
string="Locked")
locked_by = fields.Many2one(
related='locked.locked_by_ref',
comodel_name='res.users',
string="Locked by")
editor = fields.Boolean(
compute='_compute_editor',
string="Editor")
#----------------------------------------------------------
# Functions
#----------------------------------------------------------
@api.model
def generate_operation_key(self):
return hashlib.sha1(os.urandom(128)).hexdigest()
#----------------------------------------------------------
# Locking
#----------------------------------------------------------
@api.multi
def lock(self, user=None, operation=None, *largs, **kwargs):
result = []
for record in self:
lock = record.is_locked()
if lock and lock.operation and lock.operation == operation:
result.append({
'record': record,
'lock': lock,
'token': lock.token})
elif lock and ((lock.operation and lock.operation != operation) or not lock.operation):
raise AccessError(_("The record (%s[%s]) is locked, so it can't be locked again.") %
(record._name, record.id))
else:
token = hashlib.sha1(os.urandom(128)).hexdigest()
lock = self.env['muk_security.lock'].sudo().create({
'locked_by': user and user.name or "System",
'locked_by_ref': user and user.id or None,
'lock_ref': record._name + ',' + str(record.id),
'token': token,
'operation': operation})
result.append({
'record': record,
'lock': lock,
'token': token})
return result
@api.multi
def unlock(self, *largs, **kwargs):
for record in self:
locks = self.env['muk_security.lock'].sudo().search(
[('lock_ref', '=', "%s,%s" % (record._name, str(record.id)))])
locks.sudo().unlink()
return True
@api.model
def unlock_operation(self, operation, *largs, **kwargs):
locks = self.env['muk_security.lock'].sudo().search([('operation', '=', operation)])
references = [
list((k, list(map(lambda rec: rec.lock_ref_id, v))))
for k, v in itertools.groupby(
locks.sorted(key=lambda rec: rec.lock_ref_model),
lambda rec: rec.lock_ref_model)]
locks.sudo().unlink()
return references
@api.multi
def is_locked(self, *largs, **kwargs):
self.ensure_one()
lock = self.env['muk_security.lock'].sudo().search(
[('lock_ref', '=', self._name + ',' + str(self.id))], limit=1)
if lock.id:
return lock
return False
@api.multi
def is_locked_by(self, *largs, **kwargs):
self.ensure_one()
lock = self.env['muk_security.lock'].sudo().search(
[('lock_ref', '=', self._name + ',' + str(self.id))], limit=1)
if lock.id:
return lock.locked_by_ref
return False
@api.multi
def _checking_lock_user(self, *largs, **kwargs):
for record in self:
lock = record.is_locked()
if lock and lock.locked_by_ref and not lock.locked_by_ref.id != self.env.user.id:
raise AccessError(_("The record (%s[%s]) is locked by a user, so it can't be changes or deleted.") %
(self._name, self.id))
@api.multi
def _checking_lock(self, operation=None, *largs, **kwargs):
self._checking_lock_user()
for record in self:
lock = record.is_locked()
if lock and lock.operation and lock.operation != operation:
print(operation, lock.operation)
raise IOError
raise AccessError(_("The record (%s[%s]) is locked, so it can't be changes or deleted.") %
(self._name, self.id))
@api.multi
def user_lock(self, *largs, **kwargs):
self.ensure_one()
lock = self.is_locked()
if lock:
if lock.locked_by_ref:
raise AccessError(_("The record is already locked by another user. (%s)") % lock.locked_by_ref.name)
else:
raise AccessError(_("The record is already locked."))
return self.lock(user=self.env.user)
@api.multi
def user_unlock(self, *largs, **kwargs):
self.ensure_one()
lock = self.is_locked()
if lock:
if lock.locked_by_ref and lock.locked_by_ref.id == self.env.user.id:
self.unlock()
else:
if lock.locked_by_ref:
raise AccessError(_("The record is already locked by another user. (%s)") % lock.locked_by_ref.name)
else:
raise AccessError(_("The record is already locked."))
return True
#----------------------------------------------------------
# Read, View
#----------------------------------------------------------
@api.multi
def _compute_lock(self):
for record in self:
locked = record.is_locked()
record.update({
'locked_state': bool(locked),
'locked': locked})
@api.depends('locked')
def _compute_editor(self):
for record in self:
record.editor = record.is_locked_by() == record.env.user
#----------------------------------------------------------
# Create, Update, Delete
#----------------------------------------------------------
@api.multi
def write(self, vals):
operation = self.generate_operation_key()
vals = self._before_write_operation(vals, operation)
process_operation = self.env.context['operation'] if 'operation' in self.env.context else operation
result = super(BaseModelLocking, self.with_context(operation=process_operation)).write(vals)
for record in self:
record._after_write_record_operation(vals, operation)
result = self._after_write_operation(result, vals, operation)
return result
@api.multi
def _before_write_operation(self, vals, operation, *largs, **kwargs):
if 'operation' in self.env.context:
self._checking_lock(self.env.context['operation'])
else:
self._checking_lock(operation)
return vals
@api.multi
def _after_write_record_operation(self, vals, operation, *largs, **kwargs):
return vals
@api.multi
def _after_write_operation(self, result, vals, operation, *largs, **kwargs):
return result
@api.multi
def unlink(self):
operation = self.generate_operation_key()
self._before_unlink_operation(operation)
for record in self:
record._before_unlink_record_operation(operation)
process_operation = self.env.context['operation'] if 'operation' in self.env.context else operation
result = super(BaseModelLocking, self.with_context(operation=process_operation)).unlink()
self._after_unlink_operation(result, operation)
return result
@api.multi
def _before_unlink_operation(self, operation, *largs, **kwargs):
if 'operation' in self.env.context:
self._checking_lock(self.env.context['operation'])
else:
self._checking_lock(operation)
@api.multi
def _before_unlink_record_operation(self, operation, *largs, **kwargs):
pass
@api.multi
def _after_unlink_operation(self, result, operation, *largs, **kwargs):
pass
###################################################################################
#
# Copyright (C) 2017 MuK IT GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
###################################################################################
import os
import hashlib
import logging
import itertools
from odoo import _
from odoo import models, api, fields
from odoo.exceptions import AccessError
_logger = logging.getLogger(__name__)
class BaseModelLocking(models.AbstractModel):
_name = 'muk_security.locking'
_description = 'MuK Locking Model'
_inherit = 'muk_utils.model'
#----------------------------------------------------------
# Database
#----------------------------------------------------------
locked = fields.Many2one(
comodel_name='muk_security.lock',
compute='_compute_lock',
string="Locked")
locked_state = fields.Boolean(
compute='_compute_lock',
string="Locked")
locked_by = fields.Many2one(
related='locked.locked_by_ref',
comodel_name='res.users',
string="Locked by")
editor = fields.Boolean(
compute='_compute_editor',
string="Editor")
#----------------------------------------------------------
# Functions
#----------------------------------------------------------
@api.model
def generate_operation_key(self):
return hashlib.sha1(os.urandom(128)).hexdigest()
#----------------------------------------------------------
# Locking
#----------------------------------------------------------
@api.multi
def lock(self, user=None, operation=None, *largs, **kwargs):
result = []
for record in self:
lock = record.is_locked()
if lock and lock.operation and lock.operation == operation:
result.append({
'record': record,
'lock': lock,
'token': lock.token})
elif lock and ((lock.operation and lock.operation != operation) or not lock.operation):
raise AccessError(_("The record (%s[%s]) is locked, so it can't be locked again.") %
(record._name, record.id))
else:
token = hashlib.sha1(os.urandom(128)).hexdigest()
lock = self.env['muk_security.lock'].sudo().create({
'locked_by': user and user.name or "System",
'locked_by_ref': user and user.id or None,
'lock_ref': record._name + ',' + str(record.id),
'token': token,
'operation': operation})
result.append({
'record': record,
'lock': lock,
'token': token})
return result
@api.multi
def unlock(self, *largs, **kwargs):
for record in self:
locks = self.env['muk_security.lock'].sudo().search(
[('lock_ref', '=', "%s,%s" % (record._name, str(record.id)))])
locks.sudo().unlink()
return True
@api.model
def unlock_operation(self, operation, *largs, **kwargs):
locks = self.env['muk_security.lock'].sudo().search([('operation', '=', operation)])
references = [
list((k, list(map(lambda rec: rec.lock_ref_id, v))))
for k, v in itertools.groupby(
locks.sorted(key=lambda rec: rec.lock_ref_model),
lambda rec: rec.lock_ref_model)]
locks.sudo().unlink()
return references
@api.multi
def is_locked(self, *largs, **kwargs):
self.ensure_one()
lock = self.env['muk_security.lock'].sudo().search(
[('lock_ref', '=', self._name + ',' + str(self.id))], limit=1)
if lock.id:
return lock
return False
@api.multi
def is_locked_by(self, *largs, **kwargs):
self.ensure_one()
lock = self.env['muk_security.lock'].sudo().search(
[('lock_ref', '=', self._name + ',' + str(self.id))], limit=1)
if lock.id:
return lock.locked_by_ref
return False
@api.multi
def _checking_lock_user(self, *largs, **kwargs):
for record in self:
lock = record.is_locked()
if lock and lock.locked_by_ref and not lock.locked_by_ref.id != self.env.user.id:
raise AccessError(_("The record (%s[%s]) is locked by a user, so it can't be changes or deleted.") %
(self._name, self.id))
@api.multi
def _checking_lock(self, operation=None, *largs, **kwargs):
self._checking_lock_user()
for record in self:
lock = record.is_locked()
if lock and lock.operation and lock.operation != operation:
print(operation, lock.operation)
raise IOError
raise AccessError(_("The record (%s[%s]) is locked, so it can't be changes or deleted.") %
(self._name, self.id))
@api.multi
def user_lock(self, *largs, **kwargs):
self.ensure_one()
lock = self.is_locked()
if lock:
if lock.locked_by_ref:
raise AccessError(_("The record is already locked by another user. (%s)") % lock.locked_by_ref.name)
else:
raise AccessError(_("The record is already locked."))
return self.lock(user=self.env.user)
@api.multi
def user_unlock(self, *largs, **kwargs):
self.ensure_one()
lock = self.is_locked()
if lock:
if lock.locked_by_ref and lock.locked_by_ref.id == self.env.user.id:
self.unlock()
else:
if lock.locked_by_ref:
raise AccessError(_("The record is already locked by another user. (%s)") % lock.locked_by_ref.name)
else:
raise AccessError(_("The record is already locked."))
return True
#----------------------------------------------------------
# Read, View
#----------------------------------------------------------
@api.multi
def _compute_lock(self):
for record in self:
locked = record.is_locked()
record.update({
'locked_state': bool(locked),
'locked': locked})
@api.depends('locked')
def _compute_editor(self):
for record in self:
record.editor = record.is_locked_by() == record.env.user
#----------------------------------------------------------
# Create, Update, Delete
#----------------------------------------------------------
@api.multi
def write(self, vals):
operation = self.generate_operation_key()
vals = self._before_write_operation(vals, operation)
process_operation = self.env.context['operation'] if 'operation' in self.env.context else operation
result = super(BaseModelLocking, self.with_context(operation=process_operation)).write(vals)
for record in self:
record._after_write_record_operation(vals, operation)
result = self._after_write_operation(result, vals, operation)
return result
@api.multi
def _before_write_operation(self, vals, operation, *largs, **kwargs):
if 'operation' in self.env.context:
self._checking_lock(self.env.context['operation'])
else:
self._checking_lock(operation)
return vals
@api.multi
def _after_write_record_operation(self, vals, operation, *largs, **kwargs):
return vals
@api.multi
def _after_write_operation(self, result, vals, operation, *largs, **kwargs):
return result
@api.multi
def unlink(self):
operation = self.generate_operation_key()
self._before_unlink_operation(operation)
for record in self:
record._before_unlink_record_operation(operation)
process_operation = self.env.context['operation'] if 'operation' in self.env.context else operation
result = super(BaseModelLocking, self.with_context(operation=process_operation)).unlink()
self._after_unlink_operation(result, operation)
return result
@api.multi
def _before_unlink_operation(self, operation, *largs, **kwargs):
if 'operation' in self.env.context:
self._checking_lock(self.env.context['operation'])
else:
self._checking_lock(operation)
@api.multi
def _before_unlink_record_operation(self, operation, *largs, **kwargs):
pass
@api.multi
def _after_unlink_operation(self, result, operation, *largs, **kwargs):
pass

2
muk_security/models/res_groups.py

@ -38,4 +38,4 @@ class AccessGroups(models.Model):
relation='muk_security_groups_groups_rel', relation='muk_security_groups_groups_rel',
column1='rid', column1='rid',
column2='gid', column2='gid',
string='Groups')
string='Groups')

2
muk_security/models/res_users.py

@ -64,4 +64,4 @@ class AccessUser(models.Model):
return super(helper.NoSecurityUid, id).__int__() return super(helper.NoSecurityUid, id).__int__()
return id return id
access_ids = [convert_security_uid(id) for id in ids] access_ids = [convert_security_uid(id) for id in ids]
return super(AccessUser, cls)._browse(access_ids, *args, **kwargs)
return super(AccessUser, cls)._browse(access_ids, *args, **kwargs)

2
muk_security/tests/__init__.py

@ -19,4 +19,4 @@
# #
################################################################################### ###################################################################################
from . import test_suspend_security
from . import test_suspend_security

131
muk_security/tests/test_suspend_security.py

@ -1,50 +1,81 @@
###################################################################################
#
# Copyright (C) 2017 MuK IT GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
###################################################################################
import os
import base64
import logging
from odoo import exceptions
from odoo.tests import common
_path = os.path.dirname(os.path.dirname(__file__))
_logger = logging.getLogger(__name__)
class SuspendSecurityTestCase(common.TransactionCase):
at_install = False
post_install = True
def setUp(self):
super(SuspendSecurityTestCase, self).setUp()
def tearDown(self):
super(SuspendSecurityTestCase, self).tearDown()
def test_suspend_security(self):
user_id = self.env.ref('base.user_demo').id
with self.assertRaises(exceptions.AccessError):
self.env.ref('base.user_root').sudo(user_id).name = 'test'
self.env.ref('base.user_root').sudo(user_id).suspend_security().name = 'test'
self.assertEqual(self.env.ref('base.user_root').name, 'test')
self.assertEqual(self.env.ref('base.user_root').write_uid.id, user_id)
def test_normalize(self):
self.env['res.users'].browse(self.env['res.users'].suspend_security().env.uid)
###################################################################################
#
# Copyright (C) 2017 MuK IT GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
###################################################################################
import os
import base64
import logging
from odoo import exceptions
from odoo.tests import common
_path = os.path.dirname(os.path.dirname(__file__))
_logger = logging.getLogger(__name__)
class SuspendSecurityTestCase(common.TransactionCase):
at_install = False
post_install = True
def setUp(self):
super(SuspendSecurityTestCase, self).setUp()
def tearDown(self):
super(SuspendSecurityTestCase, self).tearDown()
def test_suspend_security(self):
user_id = self.env.ref('base.user_demo').id
other_company = self.env['res.company'].create({
'name': 'other company',
# without this, a partner is created and mail's constraint on
# notify_email kicks in
'partner_id': self.env.ref('base.partner_demo').id,
})
with self.assertRaises(exceptions.AccessError):
self.env.ref('base.user_root').sudo(user_id).name = 'test'
with self.assertRaises(exceptions.AccessError):
other_company.sudo(user_id).name = 'test'
self.env.ref('base.user_root').sudo(user_id).suspend_security().name = 'test'
self.assertEqual(self.env.ref('base.user_root').name, 'test')
self.assertEqual(self.env.ref('base.user_root').write_uid.id, user_id)
# this tests ir.rule
other_company.sudo(user_id).suspend_security().write({'name': 'test'})
self.assertEqual(other_company.name, 'test')
self.assertEqual(other_company.write_uid.id, user_id)
# this tests if _normalize_args conversion works
self.env['res.users'].browse(
self.env['res.users'].suspend_security().env.uid)
# Test normal search on One2many
partner = self.env['res.partner'].search(
[('user_ids.id', '=', self.env.user.suspend_security().env.uid)],
limit=1)
self.assertEqual(partner, self.env.user.partner_id)
# Test search on One2many without specifing ID (standard Odoo)
partner = self.env['res.partner'].search(
[('user_ids', '=', self.env.uid)],
limit=1)
self.assertEqual(partner, self.env.user.partner_id)
# Test search on One2many without specifing ID (suspend_security)
partner = self.env['res.partner'].search(
[('user_ids', '=', self.env.user.suspend_security().env.uid)],
limit=1)
self.assertEqual(partner, self.env.user.partner_id)
# Test search on One2many without ID with IN (standard Odoo)
partner = self.env['res.partner'].search(
[('user_ids', 'in', self.env.uid)],
limit=1)
self.assertEqual(partner, self.env.user.partner_id)
# Test search on One2many without ID with IN (suspend_security)
partner = self.env['res.partner'].search(
[('user_ids', 'in', self.env.user.suspend_security().env.uid)],
limit=1)
self.assertEqual(partner, self.env.user.partner_id)
def test_normalize(self):
self.env['res.users'].browse(self.env['res.users'].suspend_security().env.uid)
Loading…
Cancel
Save