diff --git a/base_mixin_restrict_field_access/README.rst b/base_mixin_restrict_field_access/README.rst new file mode 100644 index 000000000..03f02ac8c --- /dev/null +++ b/base_mixin_restrict_field_access/README.rst @@ -0,0 +1,123 @@ +.. image:: https://img.shields.io/badge/licence-AGPL--3-blue.svg + :alt: License: AGPL-3 + +===================== +Restrict field access +===================== + +This module was written to help developers restricting access to fields in a +secure and flexible manner on record level. + +If you're not a developer, this module is not for you as you need to write code +in order to actually use it. + +Usage +===== + +To use this module, you need to inherit this mixin for the model whose fields +you want to restrict, and implement at least the following methods to do +something useful: + +.. code:: python + + class ResPartner(models.Model): + # inherit from the mixin + _inherit = ['restrict.field.access.mixin', 'res.partner'] + _name = 'res.partner' + + @api.multi + def _restrict_field_access_get_field_whitelist(self, action='read'): + # return a whitelist (or a blacklist) of fields, depending on the + # action passed + whitelist = [ + 'name', 'parent_id', 'is_company', 'firstname', 'lastname', + 'infix', 'initials', + ] + super(ResPartner, self)\ + ._restrict_field_access_get_field_whitelist(action=action) + if action == 'read': + whitelist.extend(['section_id', 'user_id']) + return whitelist + + @api.multi + def _restrict_field_access_is_field_accessible(self, field_name, + action='read'): + # in case the whitelist is not enough, you can also decide for + # specific records if an action can be carried out on it or not + result = super(ResPartner, self)\ + ._restrict_field_access_is_field_accessible( + field_name, action=action) + if result or not self: + return result + return all(this.section_id in self.env.user.section_ids or + this.user_id == self.env.user + for this in self) + + @api.multi + @api.onchange('section_id', 'user_id') + @api.depends('section_id', 'user_id') + def _compute_restrict_field_access(self): + # if your decision depends on other fields, you probably need to + # override this function in order to attach the correct onchange/ + # depends decorators + return super(ResPartner, self)._compute_restrict_field_access() + + @api.model + def _restrict_field_access_inject_restrict_field_access_domain( + self, domain): + # you also might want to decide with a domain expression which + # records are visible in the first place + domain[:] = expression.AND([ + domain, + [ + '|', + ('section_id', 'in', self.env.user.section_ids.ids), + ('user_id', '=', self.env.user.id), + ], + ]) + +The example code here will allow only reading a few fields for partners of +which the current user is neither the sales person nor in this partner's sales +team. + +Read the comments of the mixin, that's part of the documentation. Also have a +look at the tests, that's another example on how to use this code. + +For further information, please visit: + +* https://www.odoo.com/forum/help-1 + +Known issues / Roadmap +====================== + +* the code contains some TODOs which should be done + +Bug Tracker +=========== + +Bugs are tracked on `GitHub Issues `_. +In case of trouble, please check there if your issue has already been reported. +If you spotted it first, help us smashing it by providing a detailed and welcomed feedback +`here `_. + +Credits +======= + +Contributors +------------ + +* Holger Brunn + +Maintainer +---------- + +.. image:: https://odoo-community.org/logo.png + :alt: Odoo Community Association + :target: https://odoo-community.org + +This module is maintained by the OCA. + +OCA, or the Odoo Community Association, is a nonprofit organization whose +mission is to support the collaborative development of Odoo features and +promote its widespread use. + +To contribute to this module, please visit https://odoo-community.org. diff --git a/base_mixin_restrict_field_access/__init__.py b/base_mixin_restrict_field_access/__init__.py new file mode 100644 index 000000000..0188fced6 --- /dev/null +++ b/base_mixin_restrict_field_access/__init__.py @@ -0,0 +1,5 @@ +# -*- coding: utf-8 -*- +# © 2016 Therp BV +# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html). +from . import models +from . import controllers diff --git a/base_mixin_restrict_field_access/__openerp__.py b/base_mixin_restrict_field_access/__openerp__.py new file mode 100644 index 000000000..ce5859e0c --- /dev/null +++ b/base_mixin_restrict_field_access/__openerp__.py @@ -0,0 +1,16 @@ +# -*- coding: utf-8 -*- +# © 2016 Therp BV +# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html). +{ + "name": "Restrict field access", + "version": "8.0.1.0.0", + "author": "Therp BV,Odoo Community Association (OCA)", + "license": "AGPL-3", + "category": "Hidden/Dependency", + "summary": "Make it simple to restrict read and/or write access to " + "certain fields base on some condition", + "depends": [ + 'web', + 'base_suspend_security', + ], +} diff --git a/base_mixin_restrict_field_access/controllers/__init__.py b/base_mixin_restrict_field_access/controllers/__init__.py new file mode 100644 index 000000000..1b21ddd33 --- /dev/null +++ b/base_mixin_restrict_field_access/controllers/__init__.py @@ -0,0 +1,4 @@ +# -*- coding: utf-8 -*- +# © 2017 Therp BV +# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html). +from . import main diff --git a/base_mixin_restrict_field_access/controllers/main.py b/base_mixin_restrict_field_access/controllers/main.py new file mode 100644 index 000000000..f2632e1bf --- /dev/null +++ b/base_mixin_restrict_field_access/controllers/main.py @@ -0,0 +1,21 @@ +# -*- coding: utf-8 -*- +# © 2017 Therp BV +# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html). +from openerp.http import request +from openerp.addons.web.controllers.main import Export +from ..models.restrict_field_access_mixin import RestrictFieldAccessMixin + + +class RestrictedExport(Export): + """Don't (even offer to) export inaccessible fields""" + def fields_get(self, model): + fields = super(RestrictedExport, self).fields_get(model) + model = request.env[model] + if isinstance(model, RestrictFieldAccessMixin): + sanitised_fields = { + k: fields[k] for k in fields + if model._restrict_field_access_is_field_accessible(k) + } + return sanitised_fields + else: + return fields diff --git a/base_mixin_restrict_field_access/models/__init__.py b/base_mixin_restrict_field_access/models/__init__.py new file mode 100644 index 000000000..a2f49deff --- /dev/null +++ b/base_mixin_restrict_field_access/models/__init__.py @@ -0,0 +1,4 @@ +# -*- coding: utf-8 -*- +# © 2016 Therp BV +# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html). +from . import restrict_field_access_mixin diff --git a/base_mixin_restrict_field_access/models/restrict_field_access_mixin.py b/base_mixin_restrict_field_access/models/restrict_field_access_mixin.py new file mode 100644 index 000000000..49eb08355 --- /dev/null +++ b/base_mixin_restrict_field_access/models/restrict_field_access_mixin.py @@ -0,0 +1,312 @@ +# -*- coding: utf-8 -*- +# © 2016 Therp BV +# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html). +import json +from lxml import etree +from openerp import _, api, fields, models, SUPERUSER_ID +from openerp.osv import expression # pylint: disable=W0402 +from openerp.addons.base_suspend_security.base_suspend_security import\ + BaseSuspendSecurityUid + + +class RestrictFieldAccessMixin(models.AbstractModel): + """Mixin to restrict access to fields on record level""" + _name = 'restrict.field.access.mixin' + + @api.multi + def _compute_restrict_field_access(self): + """determine if restricted field access is active on records. + If you override _restrict_field_access_is_field_accessible to make + fields accessible depending on some other field values, override this + to in order to append an @api.depends that reflects this""" + result = {} + for this in self: + this['restrict_field_access'] = any( + not this._restrict_field_access_is_field_accessible( + field, 'write') + for field in self._fields) + if this['restrict_field_access']: + result['warning'] = { + 'title': _('Warning'), + 'message': _( + 'You will lose access to fields if you save now!'), + } + return result + + # use this field on your forms to be able to hide gui elements + restrict_field_access = fields.Boolean( + 'Field access restricted', compute='_compute_restrict_field_access') + + @api.model + @api.returns('self', lambda x: x.id) + def create(self, vals): + restricted_vals = self._restrict_field_access_filter_vals( + vals, action='create') + return self.browse( + super(RestrictFieldAccessMixin, + # TODO: this allows users to slip in nonallowed + # fields with x2many operations, so we need to reset + # this somewhere, probably just at the beginning of create + self._restrict_field_access_suspend()) + .create(restricted_vals).ids + ) + + @api.multi + def copy(self, default=None): + restricted_default = self._restrict_field_access_filter_vals( + default or {}, action='create') + return self.browse( + super(RestrictFieldAccessMixin, + self._restrict_field_access_suspend()) + .copy(default=restricted_default).ids + ) + + @api.multi + def read(self, fields=None, load='_classic_read'): + result = super(RestrictFieldAccessMixin, self).read( + fields=fields, load=load) + for record in result: + this = self.browse(record['id']) + for field in record: + if not this._restrict_field_access_is_field_accessible(field): + record[field] = self._fields[field].convert_to_read( + self._fields[field].null(self.env)) + if self._fields[field] in self.env.cache: + self.env.cache[self._fields[field]].pop( + record['id'], False) + return result + + @api.model + def read_group(self, domain, fields, groupby, offset=0, limit=None, + orderby=False, lazy=True): + """Restrict reading if we read an inaccessible field""" + has_inaccessible_field = False + has_inaccessible_field |= any( + not self._restrict_field_access_is_field_accessible(f) + for f in fields or self._fields.keys() + ) + has_inaccessible_field |= any( + expression.is_leaf(term) and + not self._restrict_field_access_is_field_accessible( + term[0].split('.')[0] + ) + for term in domain + ) + if groupby: + if isinstance(groupby, basestring): + groupby = [groupby] + has_inaccessible_field |= any( + not self._restrict_field_access_is_field_accessible( + f.split(':')[0] + ) + for f in groupby + ) + if orderby: + has_inaccessible_field |= any( + not self._restrict_field_access_is_field_accessible(f.split()) + for f in orderby.split(',') + ) + # just like with search, we restrict read_group to the accessible + # records, because we'd either leak data otherwise or have very wrong + # results + if has_inaccessible_field: + self._restrict_field_access_inject_restrict_field_access_domain( + domain + ) + + return super(RestrictFieldAccessMixin, self).read_group( + domain, fields, groupby, offset=offset, limit=limit, + orderby=orderby, lazy=lazy + ) + + @api.multi + def _BaseModel__export_rows(self, fields): + """Null inaccessible fields""" + result = [] + for this in self: + rows = super(RestrictFieldAccessMixin, this)\ + ._BaseModel__export_rows(fields) + for row in rows: + for i, path in enumerate(fields): + # we only need to take care of our own fields, super calls + # __export_rows again for x2x exports + if not path or len(path) > 1: + continue + if not this._restrict_field_access_is_field_accessible( + path[0], + ) and row[i]: + field = self._fields[path[0]] + row[i] = field.convert_to_export( + field.convert_to_cache( + field.null(self.env), this, validate=False, + ), + self.env + ) + result.extend(rows) + return result + + @api.multi + def write(self, vals): + for this in self: + # this way, we get the minimal values we can write on all records + vals = this._restrict_field_access_filter_vals( + vals, action='write') + return super(RestrictFieldAccessMixin, self).write(vals) + + @api.model + def _search(self, args, offset=0, limit=None, order=None, count=False, + access_rights_uid=None): + if not args: + return super(RestrictFieldAccessMixin, self)._search( + args, offset=offset, limit=limit, order=order, count=count, + access_rights_uid=access_rights_uid) + args = expression.normalize_domain(args) + has_inaccessible_field = False + for term in args: + if not expression.is_leaf(term): + continue + if not self._restrict_field_access_is_field_accessible( + term[0], 'read'): + has_inaccessible_field = True + break + if has_inaccessible_field: + check_self = self if not access_rights_uid else self.sudo( + access_rights_uid) + check_self\ + ._restrict_field_access_inject_restrict_field_access_domain( + args) + return super(RestrictFieldAccessMixin, self)._search( + args, offset=offset, limit=limit, order=order, count=count, + access_rights_uid=access_rights_uid) + + @api.model + def _restrict_field_access_inject_restrict_field_access_domain( + self, domain): + """inject a proposition to restrict search results to only the ones + where the user may access all fields in the search domain. If you + you override _restrict_field_access_is_field_accessible to make + fields accessible depending on some other field values, override this + in order not to leak information""" + pass + + @api.cr_uid_context + def fields_view_get(self, cr, uid, view_id=None, view_type='form', + context=None, toolbar=False, submenu=False): + # pylint: disable=R8110 + # This needs to be oldstyle because res.partner in base passes context + # as positional argument + result = super(RestrictFieldAccessMixin, self).fields_view_get( + cr, uid, view_id=view_id, view_type=view_type, context=context, + toolbar=toolbar, submenu=submenu) + + # TODO: for editable trees, we'll have to inject this into the + # form the editable list view creates on the fly + if view_type != 'form': + return result + + # inject modifiers to make forbidden fields readonly + arch = etree.fromstring(result['arch']) + for field in arch.xpath('//field'): + field.attrib['modifiers'] = json.dumps( + self._restrict_field_access_adjust_field_modifiers( + cr, uid, + field, + json.loads(field.attrib.get('modifiers', '{}')), + context=context)) + + self._restrict_field_access_inject_restrict_field_access_arch( + cr, uid, arch, result['fields'], context=context) + + result['arch'] = etree.tostring(arch, encoding="utf-8") + return result + + @api.model + def _restrict_field_access_inject_restrict_field_access_arch( + self, arch, fields): + """inject the field restrict_field_access into arch if not there""" + if 'restrict_field_access' not in fields: + etree.SubElement(arch, 'field', { + 'name': 'restrict_field_access', + 'modifiers': json.dumps({ + ('tree_' if arch.tag == 'tree' else '') + 'invisible': True + }), + }) + fields['restrict_field_access'] =\ + self._fields['restrict_field_access'].get_description(self.env) + + @api.model + def _restrict_field_access_adjust_field_modifiers(self, field_node, + modifiers): + """inject a readonly modifier to make non-writable fields in a form + readonly""" + # TODO: this can be fooled by embedded views + if not self._restrict_field_access_is_field_accessible( + field_node.attrib['name'], action='write'): + for modifier, value in [('readonly', True), ('required', False)]: + domain = modifiers.get(modifier, []) + if isinstance(domain, list) and domain: + domain = expression.normalize_domain(domain) + elif bool(domain) == value: + # readonly/nonrequired anyways + return modifiers + else: + domain = [] + restrict_domain = [('restrict_field_access', '=', value)] + if domain: + restrict_domain = expression.OR([ + restrict_domain, + domain + ]) + modifiers[modifier] = restrict_domain + return modifiers + + @api.multi + def _restrict_field_access_get_field_whitelist(self, action='read'): + """return whitelisted fields. Those are readable and writable for + everyone, for the rest, it depends on your implementation of + _restrict_field_access_is_field_accessible""" + return models.MAGIC_COLUMNS + [ + self._rec_name, 'display_name', 'restrict_field_access', + ] + + @api.model + def _restrict_field_access_suspend(self): + """set a marker that we don't want to restrict field access""" + return self.suspend_security() + + @api.model + def _restrict_field_access_get_is_suspended(self): + """return True if we shouldn't check for field access restrictions""" + return isinstance(self.env.uid, BaseSuspendSecurityUid) + + @api.multi + def _restrict_field_access_filter_vals(self, vals, action='read'): + """remove inaccessible fields from vals""" + assert len(self) <= 1, 'This function needs an empty recordset or '\ + 'exactly one record' + this = self.new(dict((self.copy_data()[0] if self else {}), **vals)) + return dict( + filter( + lambda itemtuple: + this._restrict_field_access_is_field_accessible( + itemtuple[0], action=action), + vals.iteritems())) + + @api.multi + def _restrict_field_access_is_field_accessible(self, field_name, + action='read'): + """return True if the current user can perform specified action on + all records in self. Override for your own logic. + This function is also called with an empty recordset to get a list + of fields which are accessible unconditionally. + Note that this function is called *very* often. Even small things + like saying self.env.user.id instead of self.env.uid will give you a + massive performance penalty""" + if self._restrict_field_access_get_is_suspended() or\ + self.env.uid == SUPERUSER_ID or\ + not self and action == 'read' and\ + self._fields[field_name].required: + return True + whitelist = self._restrict_field_access_get_field_whitelist( + action=action) + return field_name in whitelist diff --git a/base_mixin_restrict_field_access/static/description/icon.png b/base_mixin_restrict_field_access/static/description/icon.png new file mode 100644 index 000000000..3a0328b51 Binary files /dev/null and b/base_mixin_restrict_field_access/static/description/icon.png differ diff --git a/base_mixin_restrict_field_access/tests/__init__.py b/base_mixin_restrict_field_access/tests/__init__.py new file mode 100644 index 000000000..f8cbe58b7 --- /dev/null +++ b/base_mixin_restrict_field_access/tests/__init__.py @@ -0,0 +1,4 @@ +# -*- coding: utf-8 -*- +# © 2016 Therp BV +# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html). +from . import test_base_mixin_restrict_field_access diff --git a/base_mixin_restrict_field_access/tests/test_base_mixin_restrict_field_access.py b/base_mixin_restrict_field_access/tests/test_base_mixin_restrict_field_access.py new file mode 100644 index 000000000..abef5b343 --- /dev/null +++ b/base_mixin_restrict_field_access/tests/test_base_mixin_restrict_field_access.py @@ -0,0 +1,106 @@ +# -*- coding: utf-8 -*- +# © 2016 Therp BV +# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html). +from openerp import api, models +from openerp.osv import expression +from openerp.tests.common import TransactionCase + + +class TestBaseMixinRestrictFieldAccess(TransactionCase): + def test_base_mixin_restrict_field_access(self): + # inherit from our mixin. Here we want to restrict access to + # all fields when the partner has a credit limit of less than 42 + # and the current user is not an admin + class ResPartner(models.Model): + _inherit = ['restrict.field.access.mixin', 'res.partner'] + _name = 'res.partner' + + # implement a record specific whitelist: credit limit is only + # visible for normal users if it's below 42 + @api.multi + def _restrict_field_access_is_field_accessible( + self, field_name, action='read' + ): + result = super(ResPartner, self)\ + ._restrict_field_access_is_field_accessible( + field_name, action=action + ) + if not self._restrict_field_access_get_is_suspended() and\ + not self.env.user.has_group('base.group_system') and\ + field_name not in models.MAGIC_COLUMNS and self: + result = all( + this.sudo().credit_limit < 42 for this in self + ) + return result + + # and as the documentation says, we need to add a domain to enforce + # this + def _restrict_field_access_inject_restrict_field_access_domain( + self, domain + ): + domain[:] = expression.AND([ + expression.normalize_domain(domain), + [('credit_limit', '<', 42)] + ]) + # call base-suspend_security's register hook + self.env['ir.rule']._register_hook() + + # setup the model + res_partner = ResPartner._build_model(self.registry, self.cr).browse( + self.cr, self.uid, [], context={}) + res_partner._prepare_setup() + res_partner._setup_base(False) + res_partner._setup_fields() + res_partner._setup_complete() + + # run tests as nonprivileged user + partner_model = res_partner.sudo(self.env.ref('base.user_demo').id) + partner = partner_model.create({ + 'name': 'testpartner', + }) + self.assertFalse(partner.restrict_field_access) + partner.sudo().write({'credit_limit': 42}) + partner.invalidate_cache() + self.assertTrue(partner.restrict_field_access) + self.assertFalse(partner.credit_limit) + self.assertTrue(partner.sudo().credit_limit) + # not searching for some restricted field should yield the partner + self.assertIn(partner, partner_model.search([])) + # but searching for it should not + self.assertNotIn( + partner, + partner_model.search([ + ('credit_limit', '=', 42) + ]) + ) + # when we copy stuff, restricted fields should be copied, but still + # be inaccessible + new_partner = partner.copy() + self.assertFalse(new_partner.credit_limit) + self.assertTrue(new_partner.sudo().credit_limit) + # check that our field injection works + fields_view_get = partner.fields_view_get() + self.assertIn('restrict_field_access', fields_view_get['arch']) + # check that the export does null offending values + export = partner._BaseModel__export_rows([['id'], ['credit_limit']]) + self.assertEqual(export[0][1], '0.0') + # but that it does export the value when it's fine + partner.sudo().write({'credit_limit': 41}) + partner.invalidate_cache() + export = partner._BaseModel__export_rows([['id'], ['credit_limit']]) + self.assertEqual(export[0][1], '41.0') + # read_group should behave like search: restrict to records with our + # field accessible if a restricted field is requested, unrestricted + # otherwise + data = partner_model.read_group( + [], [], ['user_id'] + ) + self.assertEqual(data[0]['credit_limit'], 41) + # but users with permissions should see the sum for all records + data = partner_model.sudo().read_group( + [], [], ['user_id'] + ) + self.assertEqual( + data[0]['credit_limit'], + sum(partner_model.sudo().search([]).mapped('credit_limit')) + )