|
|
@ -19,9 +19,9 @@ |
|
|
|
|
|
|
|
import logging |
|
|
|
|
|
|
|
from odoo import _, SUPERUSER_ID |
|
|
|
from odoo import models, api, fields |
|
|
|
from odoo import _, models, api, fields, SUPERUSER_ID |
|
|
|
from odoo.exceptions import AccessError |
|
|
|
from odoo.osv import expression |
|
|
|
|
|
|
|
from odoo.addons.muk_security.tools.security import NoSecurityUid |
|
|
|
|
|
|
@ -31,22 +31,19 @@ class AccessGroupsModel(models.AbstractModel): |
|
|
|
|
|
|
|
_name = 'muk_security.mixins.access_groups' |
|
|
|
_description = "Group Access Mixin" |
|
|
|
_inherit = 'muk_security.mixins.access' |
|
|
|
|
|
|
|
# Set it to True to enforced security even if no group has been set |
|
|
|
_strict_security = False |
|
|
|
_inherit = 'muk_security.mixins.access_rights' |
|
|
|
|
|
|
|
# If set the group fields are restricted by the access group |
|
|
|
_field_groups = None |
|
|
|
_access_groups_fields = None |
|
|
|
|
|
|
|
# If set to True the model is extended by fields to suspend the checks |
|
|
|
_active_suspend = False |
|
|
|
|
|
|
|
# If set the suspend fields are restricted by the access group |
|
|
|
_suspend_groups = None |
|
|
|
# Set it to True to enforced security even if no group has been set |
|
|
|
_access_groups_strict = False |
|
|
|
|
|
|
|
# Set it to True to let the non strict mode check for existing groups per mode |
|
|
|
_access_groups_mode = False |
|
|
|
|
|
|
|
#---------------------------------------------------------- |
|
|
|
# Datebase |
|
|
|
# Datebase |
|
|
|
#---------------------------------------------------------- |
|
|
|
|
|
|
|
@api.model |
|
|
@ -55,31 +52,6 @@ class AccessGroupsModel(models.AbstractModel): |
|
|
|
def add(name, field): |
|
|
|
if name not in self._fields: |
|
|
|
self._add_field(name, field) |
|
|
|
if self._active_suspend: |
|
|
|
add('suspend_security_read', fields.Boolean( |
|
|
|
_module=self._module, |
|
|
|
string="Suspend Security for Read", |
|
|
|
automatic=True, |
|
|
|
default=False, |
|
|
|
groups=self._suspend_groups)) |
|
|
|
add('suspend_security_create', fields.Boolean( |
|
|
|
_module=self._module, |
|
|
|
string="Suspend Security for Create", |
|
|
|
automatic=True, |
|
|
|
default=False, |
|
|
|
groups=self._suspend_groups)) |
|
|
|
add('suspend_security_write', fields.Boolean( |
|
|
|
_module=self._module, |
|
|
|
string="Suspend Security for Write", |
|
|
|
automatic=True, |
|
|
|
default=False, |
|
|
|
groups=self._suspend_groups)) |
|
|
|
add('suspend_security_unlink', fields.Boolean( |
|
|
|
_module=self._module, |
|
|
|
string="Suspend Security for Unlink", |
|
|
|
automatic=True, |
|
|
|
default=False, |
|
|
|
groups=self._suspend_groups)) |
|
|
|
add('groups', fields.Many2many( |
|
|
|
_module=self._module, |
|
|
|
comodel_name='muk_security.access_groups', |
|
|
@ -88,7 +60,7 @@ class AccessGroupsModel(models.AbstractModel): |
|
|
|
column2='gid', |
|
|
|
string="Groups", |
|
|
|
automatic=True, |
|
|
|
groups=self._field_groups)) |
|
|
|
groups=self._access_groups_fields)) |
|
|
|
add('complete_groups', fields.Many2many( |
|
|
|
_module=self._module, |
|
|
|
comodel_name='muk_security.access_groups', |
|
|
@ -99,177 +71,160 @@ class AccessGroupsModel(models.AbstractModel): |
|
|
|
compute='_compute_groups', |
|
|
|
store=True, |
|
|
|
automatic=True, |
|
|
|
groups=self._field_groups)) |
|
|
|
groups=self._access_groups_fields)) |
|
|
|
|
|
|
|
#---------------------------------------------------------- |
|
|
|
# Function |
|
|
|
# Helper |
|
|
|
#---------------------------------------------------------- |
|
|
|
|
|
|
|
@api.model |
|
|
|
def _get_no_access_ids(self): |
|
|
|
if not self._strict_security: |
|
|
|
sql = ''' |
|
|
|
SELECT id |
|
|
|
FROM %s a |
|
|
|
WHERE NOT EXISTS ( |
|
|
|
SELECT * |
|
|
|
FROM %s_complete_groups_rel r |
|
|
|
WHERE r.aid = a.id |
|
|
|
); |
|
|
|
''' % (self._table, self._table) |
|
|
|
self.env.cr.execute(sql) |
|
|
|
fetch = self.env.cr.fetchall() |
|
|
|
return len(fetch) > 0 and list(map(lambda x: x[0], fetch)) or [] |
|
|
|
else: |
|
|
|
return [] |
|
|
|
|
|
|
|
@api.model |
|
|
|
def _get_suspended_access_ids(self, operation): |
|
|
|
if self._active_suspend: |
|
|
|
sql = ''' |
|
|
|
SELECT id |
|
|
|
FROM %s a |
|
|
|
WHERE a.suspend_security_%s = true |
|
|
|
''' % (self._table, operation) |
|
|
|
self.env.cr.execute(sql) |
|
|
|
fetch = self.env.cr.fetchall() |
|
|
|
return len(fetch) > 0 and list(map(lambda x: x[0], fetch)) or [] |
|
|
|
else: |
|
|
|
return [] |
|
|
|
|
|
|
|
@api.model |
|
|
|
def _get_access_ids(self): |
|
|
|
model = self._name.split(".")[-1] |
|
|
|
sql = ''' |
|
|
|
SELECT r.aid |
|
|
|
FROM %s_complete_groups_rel r |
|
|
|
JOIN muk_security_access_groups g ON r.gid = g.id |
|
|
|
JOIN muk_security_access_groups_users_rel u ON r.gid = u.gid |
|
|
|
WHERE u.uid = %s AND g.perm_read = true |
|
|
|
''' % (self._table, self.env.user.id) |
|
|
|
self.env.cr.execute(sql) |
|
|
|
fetch = self.env.cr.fetchall() |
|
|
|
access_ids = len(fetch) > 0 and list(map(lambda x: x[0], fetch)) or [] |
|
|
|
return access_ids |
|
|
|
|
|
|
|
@api.model |
|
|
|
def _get_ids_without_security(self, operation): |
|
|
|
no_access_ids = self._get_no_access_ids() |
|
|
|
suspended_access_ids = self._get_suspended_access_ids(operation) |
|
|
|
return list(set(no_access_ids).union(suspended_access_ids)) |
|
|
|
|
|
|
|
@api.model |
|
|
|
def _get_complete_access_ids(self, operation): |
|
|
|
access_ids = self._get_access_ids() |
|
|
|
no_access_ids = self._get_no_access_ids() |
|
|
|
suspended_access_ids = self._get_suspended_access_ids(operation) |
|
|
|
return list(set(access_ids).union(no_access_ids, suspended_access_ids)) |
|
|
|
|
|
|
|
|
|
|
|
@api.multi |
|
|
|
def _eval_access_skip(self, operation): |
|
|
|
if isinstance(self.env.uid, NoSecurityUid): |
|
|
|
return True |
|
|
|
return False |
|
|
|
def _filter_access(self, operation): |
|
|
|
records = super(AccessGroupsModel, self)._filter_access(operation) |
|
|
|
return records.filter_access_groups(operation) |
|
|
|
|
|
|
|
@api.multi |
|
|
|
def check_access_groups(self, operation): |
|
|
|
if self.env.user.id == SUPERUSER_ID or self._eval_access_skip(operation): |
|
|
|
@api.model |
|
|
|
def _apply_access_groups(self, query, mode='read'): |
|
|
|
if self.env.user.id == SUPERUSER_ID or isinstance(self.env.uid, NoSecurityUid): |
|
|
|
return None |
|
|
|
filter_ids = self._get_ids_without_security(operation) |
|
|
|
for record in self.filtered(lambda rec: rec.id not in filter_ids): |
|
|
|
sql = ''' |
|
|
|
SELECT perm_%s |
|
|
|
FROM %s_complete_groups_rel r |
|
|
|
where_clause = ''' |
|
|
|
"{table}"."id" IN ( |
|
|
|
SELECT r.aid |
|
|
|
JOIN {table}_complete_groups_rel r |
|
|
|
JOIN muk_security_access_groups g ON r.gid = g.id |
|
|
|
JOIN muk_security_access_groups_users_rel u ON r.gid = u.gid |
|
|
|
WHERE r.aid = %s AND u.uid = %s |
|
|
|
''' % (operation, self._table, record.id, self.env.user.id) |
|
|
|
self.env.cr.execute(sql) |
|
|
|
fetch = self.env.cr.fetchall() |
|
|
|
if not any(list(map(lambda x: x[0], fetch))): |
|
|
|
raise AccessError(_("This operation is forbidden!")) |
|
|
|
WHERE (u.uid = %s AND g.perm_{mode} = true) |
|
|
|
) |
|
|
|
'''.format(table=self._table, mode=mode) |
|
|
|
if not self._access_groups_strict: |
|
|
|
or_clause = ''' |
|
|
|
OR NOT EXISTS ( |
|
|
|
SELECT 1 |
|
|
|
FROM {table}_complete_groups_rel sr |
|
|
|
JOIN muk_security_access_groups sg ON sr.gid = sg.id |
|
|
|
WHERE sr.aid = "{table}"."id" |
|
|
|
) |
|
|
|
'''.format(table=self._table) |
|
|
|
if self._access_groups_mode: |
|
|
|
or_clause += 'AND sg.perm_{mode} = true'.format(mode=mode) |
|
|
|
where_clause += or_clause |
|
|
|
query.where_clause += [where_clause] |
|
|
|
query.where_clause_params += [self.env.user.id] |
|
|
|
|
|
|
|
@api.model |
|
|
|
def _apply_ir_rules(self, query, mode='read'): |
|
|
|
super(AccessGroupsModel, self)._apply_ir_rules(query, mode=mode) |
|
|
|
self._apply_access_groups(query, mode=mode) |
|
|
|
|
|
|
|
#---------------------------------------------------------- |
|
|
|
# Function |
|
|
|
#---------------------------------------------------------- |
|
|
|
|
|
|
|
@api.multi |
|
|
|
def check_access(self, operation, raise_exception=False): |
|
|
|
res = super(BaseModelAccessGroups, self).check_access(operation, raise_exception) |
|
|
|
res = super(AccessGroupsModel, self).check_access(operation, raise_exception) |
|
|
|
try: |
|
|
|
access_groups = self.check_access_groups(operation) == None |
|
|
|
return res and access_groups |
|
|
|
return res and self.check_access_groups(operation) == None |
|
|
|
except AccessError: |
|
|
|
if raise_exception: |
|
|
|
raise |
|
|
|
return False |
|
|
|
|
|
|
|
#---------------------------------------------------------- |
|
|
|
# Read |
|
|
|
#---------------------------------------------------------- |
|
|
|
|
|
|
|
@api.multi |
|
|
|
def _after_read(self, result, *largs, **kwargs): |
|
|
|
result = super(BaseModelAccessGroups, self)._after_read(result) |
|
|
|
if self.env.user.id == SUPERUSER_ID or self._eval_access_skip("read"): |
|
|
|
return result |
|
|
|
access_ids = self._get_complete_access_ids("read") |
|
|
|
result = [result] if not isinstance(result, list) else result |
|
|
|
if len(access_ids) > 0: |
|
|
|
access_result = [] |
|
|
|
for record in result: |
|
|
|
if record['id'] in access_ids: |
|
|
|
access_result.append(record) |
|
|
|
return access_result |
|
|
|
return [] |
|
|
|
|
|
|
|
@api.model |
|
|
|
def _after_search(self, result, *largs, **kwargs): |
|
|
|
result = super(BaseModelAccessGroups, self)._after_search(result) |
|
|
|
if self.env.user.id == SUPERUSER_ID or self._eval_access_skip("read"): |
|
|
|
return result |
|
|
|
access_ids = self._get_complete_access_ids("read") |
|
|
|
if len(access_ids) > 0: |
|
|
|
access_result = self.env[self._name] |
|
|
|
if isinstance(result, int): |
|
|
|
if result in access_ids: |
|
|
|
return result |
|
|
|
else: |
|
|
|
for record in result: |
|
|
|
if record.id in access_ids: |
|
|
|
access_result += record |
|
|
|
return access_result |
|
|
|
return self.env[self._name] |
|
|
|
|
|
|
|
@api.model |
|
|
|
def _after_name_search(self, result, *largs, **kwargs): |
|
|
|
result = super(BaseModelAccessGroups, self)._after_name_search(result) |
|
|
|
if self.env.user.id == SUPERUSER_ID or self._eval_access_skip("read"): |
|
|
|
return result |
|
|
|
access_ids = self._get_complete_access_ids("read") |
|
|
|
if len(access_ids) > 0: |
|
|
|
access_result = [] |
|
|
|
for tuple in result: |
|
|
|
if tuple[0] in access_ids: |
|
|
|
access_result.append(tuple) |
|
|
|
return access_result |
|
|
|
return [] |
|
|
|
|
|
|
|
#---------------------------------------------------------- |
|
|
|
# Read, View |
|
|
|
# Security |
|
|
|
#---------------------------------------------------------- |
|
|
|
|
|
|
|
@api.depends('groups') |
|
|
|
def _compute_groups(self): |
|
|
|
for record in self: |
|
|
|
record.complete_groups = record.groups |
|
|
|
@api.multi |
|
|
|
def check_access_groups(self, operation): |
|
|
|
if self.env.user.id == SUPERUSER_ID or isinstance(self.env.uid, NoSecurityUid): |
|
|
|
return None |
|
|
|
sql_query = ''' |
|
|
|
SELECT perm_{operation} |
|
|
|
FROM {table}_complete_groups_rel r |
|
|
|
JOIN muk_security_access_groups g ON r.gid = g.id |
|
|
|
JOIN muk_security_access_groups_users_rel u ON r.gid = u.gid |
|
|
|
WHERE (r.aid = ANY (VALUES {ids}) AND u.uid = %s) |
|
|
|
'''.format( |
|
|
|
operation=operation, |
|
|
|
table=self._table, |
|
|
|
ids=', '.join(map(lambda id: '(%s)' % id, self.ids)), |
|
|
|
) |
|
|
|
if not self._access_groups_strict: |
|
|
|
or_clause = ''' |
|
|
|
OR NOT EXISTS ( |
|
|
|
SELECT 1 |
|
|
|
FROM {table}_complete_groups_rel sr |
|
|
|
JOIN muk_security_access_groups sg ON sr.gid = sg.id |
|
|
|
WHERE sr.aid = ANY (VALUES {ids}) |
|
|
|
) |
|
|
|
'''.format( |
|
|
|
table=self._table, |
|
|
|
ids=', '.join(map(lambda id: '(%s)' % id, self.ids)) |
|
|
|
) |
|
|
|
if self._access_groups_mode: |
|
|
|
or_clause += 'AND sg.perm_{operation} = true'.format(operation=operation) |
|
|
|
sql_query += or_clause |
|
|
|
self.env.cr.execute(sql_query, [self.env.user.id]) |
|
|
|
result = self.env.cr.fetchall() |
|
|
|
if len(result) < self.ids or any(list(map(lambda val: val[0], result))): |
|
|
|
raise AccessError(_( |
|
|
|
'The requested operation cannot be completed due to security restrictions. ' |
|
|
|
'Please contact your system administrator.\n\n(Document type: %s, Operation: %s)' |
|
|
|
) % (self._description, operation)) |
|
|
|
|
|
|
|
|
|
|
|
@api.multi |
|
|
|
def filter_access_groups(self, operation): |
|
|
|
if self.env.user.id == SUPERUSER_ID or isinstance(self.env.uid, NoSecurityUid): |
|
|
|
return self |
|
|
|
sql_query = ''' |
|
|
|
SELECT r.aid |
|
|
|
FROM {table}_complete_groups_rel r |
|
|
|
JOIN muk_security_access_groups g ON r.gid = g.id |
|
|
|
JOIN muk_security_access_groups_users_rel u ON r.gid = u.gid |
|
|
|
WHERE (r.aid = ANY (VALUES {ids}) AND u.uid = %s AND g.perm_{operation} = true) |
|
|
|
'''.format( |
|
|
|
table=self._table, |
|
|
|
ids=', '.join(map(lambda id: '(%s)' % id, self.ids)), |
|
|
|
operation=operation, |
|
|
|
) |
|
|
|
if not self._access_groups_strict: |
|
|
|
or_clause = ''' |
|
|
|
OR NOT EXISTS ( |
|
|
|
SELECT 1 |
|
|
|
FROM {table}_complete_groups_rel sr |
|
|
|
JOIN muk_security_access_groups sg ON sr.gid = sg.id |
|
|
|
WHERE sr.aid = ANY (VALUES {ids}) |
|
|
|
) |
|
|
|
'''.format( |
|
|
|
table=self._table, |
|
|
|
ids=', '.join(map(lambda id: '(%s)' % id, self.ids)) |
|
|
|
) |
|
|
|
if self._access_groups_mode: |
|
|
|
or_clause += 'AND sg.perm_{operation} = true'.format(operation=operation) |
|
|
|
sql_query += or_clause |
|
|
|
self.env.cr.execute(sql_query, [self.env.user.id]) |
|
|
|
result = self.env.cr.fetchall() |
|
|
|
return self.browse(list(map(lambda val: val[0], result))) |
|
|
|
|
|
|
|
#---------------------------------------------------------- |
|
|
|
# Create, Update, Delete |
|
|
|
#---------------------------------------------------------- |
|
|
|
|
|
|
|
@api.multi |
|
|
|
def _before_write(self, vals, *largs, **kwargs): |
|
|
|
def _write(self, vals): |
|
|
|
self.check_access_groups('write') |
|
|
|
return super(BaseModelAccessGroups, self)._before_write(vals, *largs, **kwargs) |
|
|
|
return super(AccessGroupsModel, self)._write(vals) |
|
|
|
|
|
|
|
@api.multi |
|
|
|
def _before_unlink(self, *largs, **kwargs): |
|
|
|
def unlink(self): |
|
|
|
self.check_access_groups('unlink') |
|
|
|
return super(BaseModelAccessGroups, self)._before_unlink(*largs, **kwargs) |
|
|
|
return super(AccessGroupsModel, self).unlink() |
|
|
|
|
|
|
|
#---------------------------------------------------------- |
|
|
|
# Groups |
|
|
|
#---------------------------------------------------------- |
|
|
|
|
|
|
|
@api.depends('groups') |
|
|
|
def _compute_groups(self): |
|
|
|
for record in self: |
|
|
|
record.complete_groups = record.groups |