# Copyright 2015-2019 Onestein () # License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl). from collections import defaultdict from odoo import api, models, registry NO_BI_MODELS = [ 'fetchmail.server' ] NO_BI_TTYPES = [ 'many2many', 'one2many', 'html', 'binary', 'reference' ] def dict_for_field(field): return { 'id': field.id, 'name': field.name, 'description': field.field_description, 'type': field.ttype, 'relation': field.relation, 'custom': False, 'model_id': field.model_id.id, 'model': field.model_id.model, 'model_name': field.model_id.name } def dict_for_model(model): return { 'id': model.id, 'name': model.name, 'model': model.model } class IrModel(models.Model): _inherit = 'ir.model' @api.model def _filter_bi_models(self, model): def _check_name(model_model): if model_model in NO_BI_MODELS: return 1 return 0 def _check_startswith(model_model): if model_model.startswith('workflow') or \ model_model.startswith('ir.') or \ model_model.startswith('base_'): return 1 return 0 def _check_contains(model_model): if 'mail' in model_model or \ 'report' in model_model or \ 'edi.' in model_model: return 1 return 0 def _check_unknown(model_name): if model_name == 'Unknown' or '.' in model_name: return 1 return 0 model_model = model['model'] model_name = model['name'] count_check = 0 count_check += _check_name(model_model) count_check += _check_startswith(model_model) count_check += _check_contains(model_model) count_check += _check_unknown(model_name) if not count_check: return self.env['ir.model.access'].check( model['model'], 'read', False) return False def get_model_list(self, model_table_map): if not model_table_map: return [] domain = [('model_id', 'in', list(model_table_map.keys())), ('store', '=', True), ('ttype', '=', 'many2one')] fields = self.env['ir.model.fields'].sudo().search(domain) model_list = [] for field in fields: for table_alias in model_table_map[field.model_id.id]: model_list.append(dict( dict_for_field(field), table_alias=table_alias, join_node=-1, )) return model_list def get_relation_list(self, model_table_map): if not model_table_map: return [] model_names = {} for model in self.sudo().browse(model_table_map.keys()): model_names.update({model.model: model.id}) domain = [('relation', 'in', list(model_names.keys())), ('store', '=', True), ('ttype', '=', 'many2one')] fields = self.env['ir.model.fields'].sudo().search(domain) relation_list = [] for field in fields: model_id = model_names[field.relation] for join_node in model_table_map[model_id]: relation_list.append(dict( dict_for_field(field), join_node=join_node, table_alias=-1 )) return relation_list @api.model def _get_related_models_domain(self, model_table_map): domain = [('transient', '=', False)] if model_table_map: model_list = self.get_model_list(model_table_map) relation_list = self.get_relation_list(model_table_map) model_ids = [f['model_id'] for f in relation_list + model_list] model_ids += list(model_table_map.keys()) relations = [f['relation'] for f in model_list] domain += [ '|', ('id', 'in', model_ids), ('model', 'in', relations)] return domain @api.model def get_related_models(self, model_table_map): """ Return list of model dicts for all models that can be joined with the already selected models. """ domain = self._get_related_models_domain(model_table_map) return self.sudo().search(domain, order='name asc') @api.model def get_models(self, table_model_map=None): """ Return list of model dicts for all available models. """ self = self.with_context(lang=self.env.user.lang) model_table_map = defaultdict(list) for k, v in (table_model_map or {}).items(): model_table_map[v].append(k) models = self.get_related_models(model_table_map) # filter out abstract models (they do not have DB tables) non_abstract_models = self.env.registry.models.keys() models = models.filtered(lambda m: m.model in non_abstract_models) return list(map(dict_for_model, models)) @api.model def get_join_nodes(self, field_data, new_field): """ Return list of field dicts of join nodes Return all possible join nodes to add new_field to the query containing model_ids. """ def remove_duplicate_nodes(join_nodes): seen = set() nodes_list = [] for node in join_nodes: node_tuple = tuple(node.items()) if node_tuple not in seen: seen.add(node_tuple) nodes_list.append(node) return nodes_list self = self.with_context(lang=self.env.user.lang) keys = [] model_table_map = defaultdict(list) for field in field_data: model_table_map[field['model_id']].append(field['table_alias']) if field.get('join_node', -1) != -1: keys.append((field['table_alias'], field['id'])) # nodes in current model existing_aliases = model_table_map[new_field['model_id']] join_nodes = [{'table_alias': alias} for alias in existing_aliases] # nodes in past selected models for field in self.get_model_list(model_table_map): if new_field['model'] == field['relation']: if (field['table_alias'], field['id']) not in keys: join_nodes.append(field) # nodes in new model for field in self.get_relation_list(model_table_map): if new_field['model_id'] == field['model_id']: if (field['table_alias'], field['id']) not in keys: join_nodes.append(field) return remove_duplicate_nodes(join_nodes) @api.model def get_fields(self, model_id): self = self.with_context(lang=self.env.user.lang) fields = self.env['ir.model.fields'].sudo().search([ ('model_id', '=', model_id), ('store', '=', True), ('name', 'not in', models.MAGIC_COLUMNS), ('ttype', 'not in', NO_BI_TTYPES) ], order='field_description desc') fields_dict = list(map(dict_for_field, fields)) return fields_dict @api.model def create(self, vals): if self.env.context and self.env.context.get('bve'): vals['state'] = 'base' res = super().create(vals) # this sql update is necessary since a write method here would # be not working (an orm constraint is restricting the modification # of the state field while updating ir.model) q = "UPDATE ir_model SET state = 'manual' WHERE id = %s" self.env.cr.execute(q, (res.id, )) # # update registry if self.env.context.get('bve'): # setup models; this reloads custom models in registry self.pool.setup_models(self._cr) # signal that registry has changed registry(self.env.cr.dbname).signal_changes() return res