# -*- coding: utf-8 -*-
"""Extend res.partner model"""
##############################################################################
#
#    OpenERP, Open Source Management Solution
#    This module copyright (C) 2013 Therp BV ().
#
#    This program is free software: you can redistribute it and/or modify
#    it under the terms of the GNU Affero General Public License as
#    published by the Free Software Foundation, either version 3 of the
#    License, or (at your option) any later version.
#
#    This program is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    GNU Affero General Public License for more details.
#
#    You should have received a copy of the GNU Affero General Public License
#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
##############################################################################
import time

from openerp import osv, models, fields, exceptions, api
from openerp.osv.expression import is_leaf, AND, OR, FALSE_LEAF
from openerp.tools import DEFAULT_SERVER_DATE_FORMAT
from openerp.tools.translate import _


class ResPartner(models.Model):
    """Add relations between partners, and search fields on those relations,
    to res.partner."""
    _inherit = 'res.partner'

    relation_count = fields.Integer(
        'Relation Count',
        compute="_count_relations"
    )

    @api.one
    @api.depends("relation_ids")
    def _count_relations(self):
        """Count the number of relations this partner has for Smart Button

        Don't count inactive relations.
        """
        self.relation_count = len([r for r in self.relation_ids if r.active])

    def _get_relation_ids_select(self, cr, uid, ids, field_name, arg,
                                 context=None):
        """Return the partners' relations as rows
        (id, left_partner_id, right_partner_id).

        A relation is returned if one of the partners in ``ids`` is on
        either side of it. Rows are ordered by the ``_order`` of
        res.partner.relation. Without ``ids``, the result is an empty list.
        """
        if not ids:
            # an empty tuple would be rendered as "in ()", which is a
            # syntax error in SQL
            return []
        partner_ids = tuple(ids)
        cr.execute(
            '''select id, left_partner_id, right_partner_id
            from res_partner_relation
            where (left_partner_id in %s or right_partner_id in %s)''' +
            ' order by ' + self.pool['res.partner.relation']._order,
            (partner_ids, partner_ids)
        )
        return cr.fetchall()

    def _get_relation_ids(
            self, cr, uid, ids, field_name, arg, context=None):
        """Getter for relation_ids

        :returns: dict mapping every id in ``ids`` to the list of ids of
            the relations this partner is the left or right partner of
        """
        if context is None:
            context = {}
        result = dict((i, []) for i in ids)
        # TODO: do a permission test on returned ids
        for row in self._get_relation_ids_select(
                cr, uid, ids, field_name, arg, context=context):
            # row is (relation id, left partner id, right partner id); the
            # relation belongs to both sides if both were asked for
            if row[1] in result:
                result[row[1]].append(row[0])
            if row[2] in result:
                result[row[2]].append(row[0])
        return result

    def _set_relation_ids(
            self, cr, uid, ids, dummy_name, field_value, dummy_arg,
            context=None):
        """Setter for relation_ids

        ``field_value`` is a list of one2many commands. Only create (0),
        update (1) and delete (2) are applied to res.partner.relation, any
        other command is ignored.
        """
        if context is None:
            context = {}
        relation_obj = self.pool.get('res.partner.relation')
        # changes are done with the context that
        # with_partner_relations_context() derives for these partners
        context2 = self.with_partner_relations_context(
            cr, uid, ids, context=context).env.context
        for value in field_value:
            command = value[0]
            if command == 0:
                relation_obj.create(cr, uid, value[2], context=context2)
            elif command == 1:
                # if we write partner_id_display, we also need to pass
                # type_selection_id in order to have this write end up on
                # the correct field
                if 'partner_id_display' in value[2] and \
                        'type_selection_id' not in value[2]:
                    relation_data = relation_obj.read(
                        cr, uid, [value[1]], ['type_selection_id'],
                        context=context)[0]
                    value[2]['type_selection_id'] = \
                        relation_data['type_selection_id']
                relation_obj.write(
                    cr, uid, value[1], value[2], context=context2)
            elif command == 2:
                relation_obj.unlink(cr, uid, value[1], context=context2)

    def _search_relation_id(
            self, cr, uid, dummy_obj, name, args, context=None):
        """Search function for search_relation_id

        Translate leaves on ``name`` into a domain on relation_all_ids.
        An integer operand is taken as the id of a
        res.partner.relation.type.selection record, any other operand is
        matched against the name (or inverse name) of the relation types.

        :raises exceptions.ValidationError: for an unsupported operator
        """
        supported_operators = (
            '=', '!=', 'like', 'not like', 'ilike', 'not ilike', 'in',
            'not in',
        )
        result = []
        for arg in args:
            if not (isinstance(arg, tuple) and arg[0] == name):
                continue
            operator, operand = arg[1], arg[2]
            if operator not in supported_operators:
                raise exceptions.ValidationError(
                    _('Unsupported search operand "%s"') % operator)
            relation_type_selection = self\
                .pool['res.partner.relation.type.selection']
            if operator == '=' and isinstance(operand, (long, int)):
                relation_type_selection_ids = [operand]
            elif operator == '!=' and isinstance(operand, (long, int)):
                # only the type is excluded here, the direction of the
                # selection record is not taken into account
                type_id, dummy_is_inverse = (
                    relation_type_selection.browse(
                        cr, uid, operand, context=context)
                    .get_type_from_selection_id()
                )
                result = OR([
                    result,
                    [
                        ('relation_all_ids.type_id', '!=', type_id),
                    ]
                ])
                continue
            else:
                # match on the name for selection records with record
                # type 'a', on the inverse name for record type 'b'
                relation_type_selection_ids = relation_type_selection\
                    .search(
                        cr, uid,
                        [
                            ('type_id.name', operator, operand),
                            ('record_type', '=', 'a'),
                        ],
                        context=context)
                relation_type_selection_ids.extend(
                    relation_type_selection.search(
                        cr, uid,
                        [
                            ('type_id.name_inverse', operator, operand),
                            ('record_type', '=', 'b'),
                        ],
                        context=context))

            if not relation_type_selection_ids:
                # no type matches, so no partner may match either
                result = AND([result, [FALSE_LEAF]])

            for relation_type_selection_id in relation_type_selection_ids:
                type_id, is_inverse = (
                    relation_type_selection.browse(
                        cr, uid, relation_type_selection_id,
                        context=context
                    ).get_type_from_selection_id()
                )
                result = OR([
                    result,
                    [
                        '&',
                        ('relation_all_ids.type_id', '=', type_id),
                        ('relation_all_ids.record_type', '=',
                         'b' if is_inverse else 'a')
                    ],
                ])
        return result

    def _search_relation_date(self, cr, uid, obj, name, args, context=None):
        """Search function for search_relation_date

        For every '=' leaf on ``name``, require a relation that has no
        start date or started on or before the given date, and has no end
        date or ends on or after it. Leaves with another operator are
        skipped.
        """
        result = []
        for arg in args:
            if not (isinstance(arg, tuple) and arg[0] == name):
                continue
            # TODO: handle {<,>}{,=}
            if arg[1] != '=':
                continue
            result.extend([
                '&',
                '|',
                ('relation_all_ids.date_start', '=', False),
                ('relation_all_ids.date_start', '<=', arg[2]),
                '|',
                ('relation_all_ids.date_end', '=', False),
                ('relation_all_ids.date_end', '>=', arg[2]),
            ])
        return result

    def _search_related_partner_id(
            self, cr, uid, dummy_obj, name, args, context=None):
        """Search function for search_relation_partner_id

        Apply operator and operand of every leaf on ``name`` to the other
        partner of the relations.
        """
        return [
            ('relation_all_ids.other_partner_id', arg[1], arg[2])
            for arg in args
            if isinstance(arg, tuple) and arg[0] == name
        ]

    def _search_related_partner_category_id(
            self, cr, uid, dummy_obj, name, args, context=None):
        """Search function for search_relation_partner_category_id

        Apply operator and operand of every leaf on ``name`` to the
        categories of the other partner of the relations.
        """
        return [
            ('relation_all_ids.other_partner_id.category_id', arg[1], arg[2])
            for arg in args
            if isinstance(arg, tuple) and arg[0] == name
        ]

    _columns = {
        'relation_ids': osv.fields.function(
            # look up the getter on self when the field is read, instead of
            # binding the function object as defined in this class body
            lambda self, *args, **kwargs: self._get_relation_ids(
                *args, **kwargs),
            fnct_inv=_set_relation_ids,
            type='one2many', obj='res.partner.relation',
            string='Relations',
            selectable=False,
        ),
        'relation_all_ids': osv.fields.one2many(
            'res.partner.relation.all', 'this_partner_id',
            string='All relations with current partner',
            auto_join=True,
            selectable=False,
        ),
        # the search_relation_* fields always read as False, they only
        # exist to be searched on
        'search_relation_id': osv.fields.function(
            lambda self, cr, uid, ids, *args: dict([
                (i, False) for i in ids]),
            fnct_search=_search_relation_id,
            string='Has relation of type',
            type='many2one', obj='res.partner.relation.type.selection'
        ),
        'search_relation_partner_id': osv.fields.function(
            lambda self, cr, uid, ids, *args: dict([
                (i, False) for i in ids]),
            fnct_search=_search_related_partner_id,
            string='Has relation with',
            type='many2one', obj='res.partner'
        ),
        'search_relation_date': osv.fields.function(
            lambda self, cr, uid, ids, *args: dict([
                (i, False) for i in ids]),
            fnct_search=_search_relation_date,
            string='Relation valid',
            type='date'
        ),
        'search_relation_partner_category_id': osv.fields.function(
            lambda self, cr, uid, ids, *args: dict([
                (i, False) for i in ids]),
            fnct_search=_search_related_partner_category_id,
            string='Has relation with a partner in category',
            type='many2one', obj='res.partner.category'
        ),
    }

    def copy_data(self, cr, uid, id, default=None, context=None):
        """Don't copy relations along with the partner, unless the caller
        passes values for them in ``default``."""
        if default is None:
            default = {}
        default.setdefault('relation_ids', [])
        default.setdefault('relation_all_ids', [])
        return super(ResPartner, self).copy_data(
            cr, uid, id, default=default, context=context)

    def search(self, cr, uid, args, offset=0, limit=None, order=None,
               context=None, count=False):
        """Extend the domain when searching on one of the search_relation*
        fields.

        Unless the domain contains a leaf on search_relation_date, only
        relations valid today are taken into account. Unless active_test is
        disabled in the context, only active relations are taken into
        account.
        """
        if context is None:
            context = {}
        # is_leaf() also accepts TRUE_LEAF and FALSE_LEAF, whose left operand
        # is an integer, so make sure we have a field name before calling
        # string methods on it
        relation_fields = [
            arg[0] for arg in args
            if is_leaf(arg) and isinstance(arg[0], basestring) and
            arg[0].startswith('search_relation')
        ]
        # inject searching for current relation date if we search for relation
        # properties and no explicit date was given
        date_args = []
        if relation_fields and 'search_relation_date' not in relation_fields:
            date_args = [
                ('search_relation_date', '=', time.strftime(
                    DEFAULT_SERVER_DATE_FORMAT))]
        # because of auto_join, we have to do the active test by hand
        active_args = []
        if relation_fields and context.get('active_test', True):
            active_args = [('relation_all_ids.active', '=', True)]
        return super(ResPartner, self).search(
            cr, uid, args + date_args + active_args, offset=offset,
            limit=limit, order=order, context=context, count=count)

    @api.multi
    def read(self, fields=None, load='_classic_read'):
        """Read with the context from with_partner_relations_context()"""
        return super(ResPartner, self.with_partner_relations_context())\
            .read(fields=fields, load=load)

    @api.multi
    def write(self, vals):
        """Write with the context from with_partner_relations_context()"""
        return super(ResPartner, self.with_partner_relations_context())\
            .write(vals)

    @api.multi
    def with_partner_relations_context(self):
        """Return this recordset with active_id, active_ids and active_model
        filled in from the recordset where the context lacks them.

        Keys already present in the context are never overwritten, and
        nothing is added if the context names another active_model.
        """
        context = dict(self.env.context)
        if context.get('active_model', self._name) == self._name:
            # only point to records that still exist
            existing = self.exists()
            context.setdefault(
                'active_id', existing.ids[0] if existing.ids else None)
            context.setdefault('active_ids', existing.ids)
            context.setdefault('active_model', self._name)
        return self.with_context(context)