diff --git a/base_exception/__manifest__.py b/base_exception/__manifest__.py index ad9c55106..cdb70ccfa 100644 --- a/base_exception/__manifest__.py +++ b/base_exception/__manifest__.py @@ -4,7 +4,7 @@ # License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl.html). { 'name': 'Exception Rule', - 'version': '12.0.1.0.0', + 'version': '12.0.2.0.0', 'category': 'Generic Modules', 'summary': """ This module provide an abstract model to manage customizable diff --git a/base_exception/models/base_exception.py b/base_exception/models/base_exception.py index 7f6ea10b1..114c3adc8 100644 --- a/base_exception/models/base_exception.py +++ b/base_exception/models/base_exception.py @@ -4,22 +4,10 @@ # License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl.html). import time -from functools import wraps from odoo import api, fields, models, _ from odoo.exceptions import UserError, ValidationError from odoo.tools.safe_eval import safe_eval - - -def implemented_by_base_exception(func): - """Call a prefixed function based on 'namespace'.""" - @wraps(func) - def wrapper(cls, *args, **kwargs): - fun_name = func.__name__ - fun = '_%s%s' % (cls.rule_group, fun_name) - if not hasattr(cls, fun): - fun = '_default%s' % (fun_name) - return getattr(cls, fun)(*args, **kwargs) - return wrapper +from odoo import osv class ExceptionRule(models.Model): @@ -33,13 +21,6 @@ class ExceptionRule(models.Model): string='Sequence', help="Gives the sequence order when applying the test", ) - rule_group = fields.Selection( - selection=[], - help="Rule group is used to group the rules that must validated " - "at same time for a target object. Ex: " - "validate sale.order.line rules with sale order rules.", - required=True, - ) model = fields.Selection(selection=[], string='Apply on', required=True) exception_type = fields.Selection( @@ -52,40 +33,22 @@ class ExceptionRule(models.Model): " are evaluated with several records") domain = fields.Char('Domain') - active = fields.Boolean('Active') - next_state = fields.Char( - 'Next state', - help="If we detect exception we set the state of object (ex purchase) " - "to the next_state (ex 'to approve'). If there are more than one " - "exception detected and all have a value for next_state, we use" - "the exception having the smallest sequence value", - ) + active = fields.Boolean('Active', default=True) code = fields.Text( 'Python Code', help="Python code executed to check if the exception apply or " "not. Use failed = True to block the exception", ) - @api.constrains('next_state') - def _check_next_state_value(self): - """ Ensure that the next_state value is in the state values of - destination model """ + @api.constrains('exception_type', 'domain', 'code') + def check_exception_type_consistency(self): for rule in self: - if rule.next_state: - select_vals = self.env[ - rule.model].fields_get()[ - 'state']['selection'] - select_vals_code = [s[0] for s in select_vals] - if rule.next_state not in select_vals_code: - raise ValidationError(_( - 'The value "%s" you choose for the "next state" ' - 'field state of "%s" is wrong.' - ' Value must be in this list %s' - ) % ( - rule.next_state, - rule.model, - select_vals - )) + if ((rule.exception_type == 'by_py_code' and not rule.code) or + (rule.exception_type == 'by_domain' and not rule.domain)): + raise ValidationError( + _("There is a problem of configuration, python code or " + "domain is missing to match the exception type.") + ) @api.multi def _get_domain(self): @@ -94,7 +57,118 @@ class ExceptionRule(models.Model): return safe_eval(self.domain) +class BaseExceptionMethod(models.AbstractModel): + _name = 'base.exception.method' + + @api.multi + def _reverse_field(self): + raise NotImplementedError() + + def _rule_domain(self): + """Filter exception.rules. + By default, only the rules with the correct model + will be used. + """ + return [('model', '=', self._name)] + + @api.multi + def detect_exceptions(self): + """List all exception_ids applied on self + Exception ids are also written on records + If self is empty, check exceptions on all records. + """ + rules = self.env['exception.rule'].sudo().search( + self._rule_domain()) + all_exception_ids = [] + for rule in rules: + records_with_exception = self._detect_exceptions(rule) + reverse_field = self._reverse_field() + if self: + commons = self and rule[reverse_field] + to_remove = commons - records_with_exception + to_add = records_with_exception - commons + to_remove_list = [(3, x.id, _) for x in to_remove] + to_add_list = [(4, x.id, _) for x in to_add] + rule.write({reverse_field: to_remove_list + to_add_list}) + else: + rule.write({ + reverse_field: [(6, 0, records_with_exception.ids)] + }) + if records_with_exception: + all_exception_ids.append(rule.id) + return all_exception_ids + + @api.model + def _exception_rule_eval_context(self, rec): + return { + 'time': time, + 'self': rec, + # object, obj: deprecated. + # should be removed in future migrations + 'object': rec, + 'obj': rec, + # copy context to prevent side-effects of eval + # should be deprecated too, accesible through self. + 'context': self.env.context.copy() + } + + @api.model + def _rule_eval(self, rule, rec): + expr = rule.code + space = self._exception_rule_eval_context(rec) + try: + safe_eval(expr, + space, + mode='exec', + nocopy=True) # nocopy allows to return 'result' + except Exception as e: + raise UserError( + _('Error when evaluating the exception.rule ' + 'rule:\n %s \n(%s)') % (rule.name, e)) + return space.get('failed', False) + + @api.multi + def _detect_exceptions(self, rule): + if rule.exception_type == 'by_py_code': + return self._detect_exceptions_by_py_code(rule) + elif rule.exception_type == 'by_domain': + return self._detect_exceptions_by_domain(rule) + + @api.multi + def _get_base_domain(self): + domain = [('ignore_exception', '=', False)] + if self: + domain = osv.expression.AND([domain, [('id', 'in', self.ids)]]) + return domain + + @api.multi + def _detect_exceptions_by_py_code(self, rule): + """ + Find exceptions found on self. + If self is empty, check on all records. + """ + domain = self._get_base_domain() + records = self.search(domain) + records_with_exception = self.env[self._name] + for record in records: + if self._rule_eval(rule, record): + records_with_exception |= record + return records_with_exception + + @api.multi + def _detect_exceptions_by_domain(self, rule): + """ + Find exceptions found on self. + If self is empty, check on all records. + """ + base_domain = self._get_base_domain() + rule_domain = rule._get_domain() + domain = osv.expression.AND([base_domain, rule_domain]) + return self.search(domain) + + class BaseException(models.AbstractModel): + _inherit = 'base.exception.method' _name = 'base.exception' _order = 'main_exception_id asc' _description = 'Exception' @@ -105,7 +179,6 @@ class BaseException(models.AbstractModel): string='Main Exception', store=True, ) - rule_group = fields.Selection([], readonly=True) exception_ids = fields.Many2many('exception.rule', string='Exceptions') ignore_exception = fields.Boolean('Ignore Exceptions', copy=False) @@ -149,214 +222,3 @@ class BaseException(models.AbstractModel): if exception_ids: exceptions = self.env['exception.rule'].browse(exception_ids) raise ValidationError('\n'.join(exceptions.mapped('name'))) - - @api.multi - def test_exceptions(self): - """ - Condition method for the workflow from draft to confirm - """ - if self.detect_exceptions(): - return False - return True - - @api.multi - def _reverse_field(self): - """Name of the many2many field from exception rule to self. - - In order to take advantage of domain optimisation, exception rule - model should have a many2many field to inherited object. - The opposit relation already exists in the name of exception_ids - - Example: - class ExceptionRule(models.Model): - _inherit = 'exception.rule' - - model = fields.Selection( - selection_add=[ - ('sale.order', 'Sale order'), - [...] - ]) - sale_ids = fields.Many2many( - 'sale.order', - string='Sales') - [...] - """ - exception_obj = self.env['exception.rule'] - reverse_fields = self.env['ir.model.fields'].search([ - ['model', '=', 'exception.rule'], - ['ttype', '=', 'many2many'], - ['relation', '=', self[0]._name], - ]) - # ir.model.fields may contain old variable name - # so we check if the field exists on exception rule - return ([ - field.name for field in reverse_fields - if hasattr(exception_obj, field.name) - ] or [None])[0] - - @api.multi - def _rule_domain(self): - """Filter exception.rules. - By default, only the rules with the correct rule group - will be used. - """ - return [('rule_group', 'in', self.mapped('rule_group'))] - - @api.multi - def detect_exceptions(self): - """List all exception_ids applied on self - Exception ids are also written on records - """ - if not self: - return [] - exception_obj = self.env['exception.rule'] - all_exceptions = exception_obj.sudo().search( - self._rule_domain()) - model_exceptions = all_exceptions.filtered( - lambda ex: ex.model == self._name) - sub_exceptions = all_exceptions.filtered( - lambda ex: ex.model != self._name) - - reverse_field = self._reverse_field() - if reverse_field: - optimize = True - else: - optimize = False - - exception_by_rec, exception_by_rule = self._detect_exceptions( - model_exceptions, sub_exceptions, optimize) - - all_exception_ids = [] - for obj, exception_ids in exception_by_rec.items(): - obj.exception_ids = [(6, 0, exception_ids)] - all_exception_ids += exception_ids - for rule, exception_ids in exception_by_rule.items(): - rule[reverse_field] = [(6, 0, exception_ids.ids)] - if exception_ids: - all_exception_ids += [rule.id] - return list(set(all_exception_ids)) - - @api.model - def _exception_rule_eval_context(self, obj_name, rec): - return { - 'time': time, - 'self': rec, - # obj_name, object, obj: deprecated. - # should be removed in future migrations - obj_name: rec, - 'object': rec, - 'obj': rec, - # copy context to prevent side-effects of eval - # should be deprecated too, accesible through self. - 'context': self.env.context.copy() - } - - @api.model - def _rule_eval(self, rule, obj_name, rec): - eval_ctx = self._exception_rule_eval_context(obj_name, rec) - try: - safe_eval(rule.code, eval_ctx, mode='exec', nocopy=True) - except Exception as e: - raise UserError(_( - 'Error when evaluating the exception.rule: ' - '%s\n(%s)') % (rule.name, e)) - return eval_ctx.get('failed', False) - - @api.multi - def _detect_exceptions( - self, model_exceptions, sub_exceptions, - optimize=False, - ): - """Find exceptions found on self. - - @returns - exception_by_rec: {record_id: exception_ids} - exception_by_rule: {rule_id: record_ids} - """ - exception_by_rec = {} - exception_by_rule = {} - exception_set = set() - python_rules = [] - dom_rules = [] - optim_rules = [] - - for rule in model_exceptions: - if rule.exception_type == 'by_py_code': - python_rules.append(rule) - elif rule.exception_type == 'by_domain' and rule.domain: - if optimize: - optim_rules.append(rule) - else: - dom_rules.append(rule) - - for rule in optim_rules: - domain = rule._get_domain() - domain.append(['ignore_exception', '=', False]) - domain.append(['id', 'in', self.ids]) - records_with_exception = self.search(domain) - exception_by_rule[rule] = records_with_exception - if records_with_exception: - exception_set.add(rule.id) - - if len(python_rules) or len(dom_rules) or sub_exceptions: - for rec in self: - for rule in python_rules: - if ( - not rec.ignore_exception and - self._rule_eval(rule, rec.rule_group, rec) - ): - exception_by_rec.setdefault(rec, []).append(rule.id) - exception_set.add(rule.id) - for rule in dom_rules: - # there is no reverse many2many, so this rule - # can't be optimized, see _reverse_field - domain = rule._get_domain() - domain.append(['ignore_exception', '=', False]) - domain.append(['id', '=', rec.id]) - if self.search_count(domain): - exception_by_rec.setdefault( - rec, []).append(rule.id) - exception_set.add(rule.id) - if sub_exceptions: - group_line = rec.rule_group + '_line' - for obj_line in rec._get_lines(): - for rule in sub_exceptions: - if rule.id in exception_set: - # we do not matter if the exception as - # already been - # found for an line of this object - # (ex sale order line if obj is sale order) - continue - if rule.exception_type == 'by_py_code': - if self._rule_eval( - rule, group_line, obj_line - ): - exception_by_rec.setdefault( - rec, []).append(rule.id) - elif ( - rule.exception_type == 'by_domain' and - rule.domain - ): - # sub_exception are currently not optimizable - domain = rule._get_domain() - domain.append(('id', '=', obj_line.id)) - if obj_line.search_count(domain): - exception_by_rec.setdefault( - rec, []).append(rule.id) - - # set object to next state - # find exception that raised error and has next_state - next_state_exception_ids = model_exceptions.filtered( - lambda r: r.id in exception_set and r.next_state) - - if next_state_exception_ids: - self.state = next_state_exception_ids[0].next_state - - return exception_by_rec, exception_by_rule - - @implemented_by_base_exception - def _get_lines(self): - pass - - def _default_get_lines(self): - return [] diff --git a/base_exception/tests/purchase_test.py b/base_exception/tests/purchase_test.py index 715b200bc..5c2c7ad11 100644 --- a/base_exception/tests/purchase_test.py +++ b/base_exception/tests/purchase_test.py @@ -8,10 +8,6 @@ class PurchaseTest(models.Model): _name = "base.exception.test.purchase" _description = "Base Ecxeption Test Model" - rule_group = fields.Selection( - selection_add=[('test_base', 'test')], - default='test_base', - ) name = fields.Char(required=True) user_id = fields.Many2one('res.users', string='Responsible') state = fields.Selection( @@ -57,9 +53,9 @@ class PurchaseTest(models.Model): def button_cancel(cls): cls.write({'state': 'cancel'}) - def test_base_get_lines(cls): - cls.ensure_one() - return cls.line_ids + @api.multi + def _reverse_field(self): + return 'test_purchase_ids' class LineTest(models.Model): diff --git a/base_exception/tests/test_base_exception.py b/base_exception/tests/test_base_exception.py index 90fe6a9bb..6ab115b4f 100644 --- a/base_exception/tests/test_base_exception.py +++ b/base_exception/tests/test_base_exception.py @@ -2,6 +2,8 @@ # License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl.html). from odoo.tests import common +from odoo.exceptions import ValidationError +from odoo import fields from .common import setup_test_model from .purchase_test import PurchaseTest, LineTest import logging @@ -20,11 +22,10 @@ class TestBaseException(common.SavepointCase): cls.base_exception = cls.env['base.exception'] cls.exception_rule = cls.env['exception.rule'] + if 'test_purchase_ids' not in cls.exception_rule._fields: + field = fields.Many2many('base.exception.test.purchase') + cls.exception_rule._add_field('test_purchase_ids', field) cls.exception_confirm = cls.env['exception.rule.confirm'] - - cls.exception_rule._fields['rule_group'].selection.append( - ('test_base', 'Test Base Exception')) - cls.exception_rule._fields['model'].selection.append( ('base.exception.test.purchase', 'Purchase Order')) @@ -34,25 +35,22 @@ class TestBaseException(common.SavepointCase): cls.exceptionnozip = cls.env['exception.rule'].create({ 'name': "No ZIP code on destination", 'sequence': 10, - 'rule_group': "test_base", 'model': "base.exception.test.purchase", - 'code': "if not test_base.partner_id.zip: failed=True", + 'code': "if not obj.partner_id.zip: failed=True", }) cls.exceptionno_minorder = cls.env['exception.rule'].create({ 'name': "Min order except", 'sequence': 10, - 'rule_group': "test_base", 'model': "base.exception.test.purchase", - 'code': "if test_base.amount_total <= 200.0: failed=True", + 'code': "if obj.amount_total <= 200.0: failed=True", }) cls.exceptionno_lineqty = cls.env['exception.rule'].create({ 'name': "Qty > 0", 'sequence': 10, - 'rule_group': "test_base", 'model': "base.exception.test.purchase.line", - 'code': "if test_base_line.qty <= 0: failed=True" + 'code': "if obj.qty <= 0: failed=True" }) def test_purchase_order_exception(self): @@ -67,19 +65,10 @@ class TestBaseException(common.SavepointCase): 'qty': 1.5, })], }) - - potest1.button_confirm() - # Set ignore_exception flag (Done after ignore is selected at wizard) + # Block because of exception during validation + with self.assertRaises(ValidationError): + potest1.button_confirm() + # Test ignore exeception make possible for the po to validate potest1.ignore_exception = True potest1.button_confirm() self.assertTrue(potest1.state == 'purchase') - # Simulation the opening of the wizard exception_confirm and - # set ignore_exception to True - except_confirm = self.exception_confirm.with_context( - { - 'active_id': potest1.id, - 'active_ids': [potest1.id], - 'active_model': potest1._name - }).new({'ignore': True}) - except_confirm.action_confirm() - self.assertTrue(potest1.ignore_exception) diff --git a/base_exception/views/base_exception_view.xml b/base_exception/views/base_exception_view.xml index 3c6a7b0db..fde377c1f 100644 --- a/base_exception/views/base_exception_view.xml +++ b/base_exception/views/base_exception_view.xml @@ -37,15 +37,13 @@ - - - + - +