# Copyright 2020 Creu Blanca # License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl). import ast import json import re from datetime import date, datetime, time from dateutil import relativedelta from odoo import _, api, fields, models from odoo.exceptions import ValidationError from odoo.tools.float_utils import float_compare from odoo.tools.safe_eval import safe_eval from odoo.addons.base.models.ir_cron import _intervalTypes class KpiKpi(models.Model): _name = "kpi.kpi" _description = "Kpi Kpi" name = fields.Char(required=True) active = fields.Boolean(default=True) cron_id = fields.Many2one("ir.cron", readonly=True, copy=False) computation_method = fields.Selection( [("function", "Function"), ("code", "Code")], required=True, default="code", ) value = fields.Serialized() dashboard_item_ids = fields.One2many("kpi.dashboard.item", inverse_name="kpi_id") model_id = fields.Many2one( "ir.model", ) function = fields.Char() args = fields.Char() kwargs = fields.Char() widget = fields.Selection( [ ("integer", "Integer"), ("number", "Number"), ("meter", "Meter"), ("counter", "Counter"), ("graph", "Graph"), ], required=True, default="number", ) value_last_update = fields.Datetime(readonly=True) prefix = fields.Char() suffix = fields.Char() action_ids = fields.One2many( "kpi.kpi.action", inverse_name="kpi_id", help="Actions that can be opened from the KPI", ) code = fields.Text("Code") store_history = fields.Boolean() store_history_interval = fields.Selection( selection=lambda self: self.env["ir.cron"]._fields["interval_type"].selection, ) store_history_interval_number = fields.Integer() compute_on_fly = fields.Boolean() history_ids = fields.One2many("kpi.kpi.history", inverse_name="kpi_id") computed_value = fields.Serialized(compute="_compute_computed_value") computed_date = fields.Datetime(compute="_compute_computed_value") @api.depends("value", "value_last_update", "compute_on_fly") def _compute_computed_value(self): for record in self: if record.compute_on_fly: record.computed_value = record._compute_value() record.computed_date = fields.Datetime.now() else: record.computed_value = record.value record.computed_date = record.value_last_update def _cron_vals(self): return { "name": self.name, "model_id": self.env.ref("kpi_dashboard.model_kpi_kpi").id, "interval_number": 1, "interval_type": "hours", "state": "code", "code": "model.browse(%s).compute()" % self.id, "active": True, } def compute(self): for record in self: record._compute() return True def _generate_history_vals(self, value): return { "kpi_id": self.id, "value": value, "widget": self.widget, } def _compute_value(self): return getattr(self, "_compute_value_%s" % self.computation_method)() def _compute(self): value = self._compute_value() self.write({"value": value}) if self.store_history: last = self.env["kpi.kpi.history"].search( [("kpi_id", "=", self.id)], limit=1 ) if ( not last or not self.store_history_interval or last.create_date + _intervalTypes[self.store_history_interval]( self.store_history_interval_number ) < fields.Datetime.now() ): self.env["kpi.kpi.history"].create(self._generate_history_vals(value)) notifications = [] for dashboard_item in self.dashboard_item_ids: channel = "kpi_dashboard_%s" % dashboard_item.dashboard_id.id notifications.append([channel, dashboard_item._read_dashboard()]) if notifications: self.env["bus.bus"].sendmany(notifications) def _compute_value_function(self): obj = self if self.model_id: obj = self.env[self.model_id.model] args = ast.literal_eval(self.args or "[]") kwargs = ast.literal_eval(self.kwargs or "{}") return getattr(obj, self.function)(*args, **kwargs) def generate_cron(self): self.ensure_one() self.cron_id = self.env["ir.cron"].create(self._cron_vals()) def write(self, vals): if "value" in vals: vals["value_last_update"] = fields.Datetime.now() return super().write(vals) def _get_code_input_dict(self): return { "self": self, "model": self.browse(), "datetime": datetime, "date": date, "time": time, "float_compare": float_compare, "relativedelta": relativedelta.relativedelta, } def _forbidden_code(self): return ["commit", "rollback", "getattr", "execute"] def _compute_value_code(self): forbidden = self._forbidden_code() search_terms = "(" + ("|".join(forbidden)) + ")" if re.search(search_terms, (self.code or "").lower()): message = ", ".join(forbidden[:-1]) or "" if len(message) > 0: message += _(" or ") message += forbidden[-1] raise ValidationError( _("The code cannot contain the following terms: %s.") % message ) results = self._get_code_input_dict() savepoint = "kpi_formula_%s" % self.id # pylint: disable=E8103 self.env.cr.execute("savepoint %s" % savepoint) safe_eval(self.code or "", results, mode="exec", nocopy=True) # pylint: disable=E8103 self.env.cr.execute("rollback to %s" % savepoint) return results.get("result", {}) def show_value(self): self.ensure_one() action = self.env.ref("kpi_dashboard.kpi_kpi_act_window") result = action.read()[0] result.update( { "res_id": self.id, "target": "new", "view_mode": "form", "views": [ (self.env.ref("kpi_dashboard.kpi_kpi_widget_form_view").id, "form") ], } ) return result class KpiKpiAction(models.Model): _name = "kpi.kpi.action" _description = "KPI action" kpi_id = fields.Many2one("kpi.kpi", required=True, ondelete="cascade") action = fields.Reference( selection=[ ("ir.actions.report", "ir.actions.report"), ("ir.actions.act_window", "ir.actions.act_window"), ("ir.actions.act_url", "ir.actions.act_url"), ("ir.actions.server", "ir.actions.server"), ("ir.actions.client", "ir.actions.client"), ], required=True, ) context = fields.Char() def read_dashboard(self): result = {} for r in self: result[r.id] = { "id": r.action.id, "type": r.action._name, "name": r.action.name, "context": safe_eval(r.context or "{}"), } return result class KpiKpiHistory(models.Model): _name = "kpi.kpi.history" _description = "KPI history" _order = "create_date DESC" kpi_id = fields.Many2one( "kpi.kpi", required=True, ondelete="cascade", readonly=True ) value = fields.Serialized(readonly=True) raw_value = fields.Char(compute="_compute_raw_value") name = fields.Char(related="kpi_id.name") widget = fields.Selection( selection=lambda self: self.env["kpi.kpi"]._fields["widget"].selection, required=True, default="number", ) @api.depends("value") def _compute_raw_value(self): for record in self: record.raw_value = json.dumps(record.value) def show_form(self): self.ensure_one() action = self.env.ref("kpi_dashboard.kpi_kpi_history_act_window") result = action.read()[0] result.update( { "res_id": self.id, "target": "new", "view_mode": "form", "views": [(self.env.context.get("form_id"), "form")], } ) return result