You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
486 lines
18 KiB
486 lines
18 KiB
# -*- coding: utf-8 -*-
|
|
# Copyright 2014-2017 Therp BV <http://therp.nl>
|
|
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html).
|
|
# pylint: disable=method-required-super
|
|
"""Abstract model to show each relation from two sides."""
|
|
import collections
|
|
import logging
|
|
|
|
from psycopg2.extensions import AsIs
|
|
|
|
from openerp import _, api, fields, models
|
|
from openerp.exceptions import ValidationError
|
|
from openerp.tools import drop_view_if_exists
|
|
|
|
|
|
# Module level logger, named after this module.
_logger = logging.getLogger(__name__)

# Key offset handed out to the most recently registered select
# specification; -1 means that nothing has been registered yet.
_last_key_offset = -1
# Registered select specifications, keyed by '<base_name>' or
# '<base_name>_inverse', kept in order of registration.
_specification_register = collections.OrderedDict()
|
|
|
|
|
|
def register_select_specification(base_name, is_inverse, select_sql):
    """Register a SELECT clause for rows to be included in the view.

    The clause is stored in the module level register under ``base_name``,
    suffixed with ``_inverse`` when ``is_inverse`` is truthy, together with
    the next free key offset. Registering the same key twice is refused.

    Each SELECT clause must contain a first column in the form:
    '<base_keyfield> * %%(padding)s + %(key_offset)s,'
    The %%(padding)s survives registration as %(padding)s and is meant to
    be filled in later as a parameter of the cursor.execute function, the
    %(key_offset)s and %(is_inverse)s placeholders are replaced right here
    by dictionary replacement.

    The columns for each SELECT clause must be ordered as follows:
        id: specified as defined above,
        res_model: model._name of the underlying table,
        res_id: base key in the underlying table,
        this_partner_id: must refer to a partner,
        other_partner_id: must refer to a related partner,
        type_id: refers to res.partner.relation.type,
        date_start: start date of relation if relevant or NULL,
        date_end: end date of relation if relevant or NULL,
        %(is_inverse)s as is_inverse: used to determine type_selection_id.

    :param base_name: name shared by a select and its inverse counterpart.
    :param is_inverse: whether the select shows relations right to left.
    :param select_sql: SELECT clause containing the three placeholders.
    :raises AssertionError: if the key is already registered or one of the
        placeholders is missing from select_sql.
    """
    global _last_key_offset
    suffix = '_inverse' if is_inverse else ''
    key_name = base_name + suffix
    assert key_name not in _specification_register
    for placeholder in (
            '%%(padding)s', '%(key_offset)s', '%(is_inverse)s'):
        assert placeholder in select_sql
    # Only hand out a new offset once all checks have passed.
    _last_key_offset += 1
    replacements = {
        'key_offset': _last_key_offset,
        'is_inverse': is_inverse,
    }
    _specification_register[key_name] = {
        'base_name': base_name,
        'is_inverse': is_inverse,
        'key_offset': _last_key_offset,
        'select_sql': select_sql % replacements,
    }
|
|
|
|
|
|
def get_select_specification(base_name, is_inverse):
    """Return the registered specification for base_name and direction.

    The lookup key is built in the same way as on registration: base_name,
    suffixed with '_inverse' when is_inverse is truthy.

    :raises KeyError: if no specification was registered under that key.
    """
    suffix = '_inverse' if is_inverse else ''
    return _specification_register[base_name + suffix]
|
|
|
|
|
|
# Register relations, seen from the left partner to the right partner.
# The inverse select registered for the same base name lists its columns
# in the same order.
register_select_specification(
    base_name='relation',
    is_inverse=False,
    select_sql="""\
SELECT
(rel.id * %%(padding)s) + %(key_offset)s AS id,
'res.partner.relation' AS res_model,
rel.id AS res_id,
rel.left_partner_id AS this_partner_id,
rel.right_partner_id AS other_partner_id,
rel.type_id,
rel.date_start,
rel.date_end,
%(is_inverse)s as is_inverse
FROM res_partner_relation rel
""")
|
|
|
|
# Register inverse relations: the same rows, but seen from the right
# partner to the left partner. Columns are listed in the same order as in
# the non-inverse select.
register_select_specification(
    base_name='relation',
    is_inverse=True,
    select_sql="""\
SELECT
(rel.id * %%(padding)s) + %(key_offset)s AS id,
'res.partner.relation',
rel.id,
rel.right_partner_id,
rel.left_partner_id,
rel.type_id,
rel.date_start,
rel.date_end,
%(is_inverse)s as is_inverse
FROM res_partner_relation rel
""")
|
|
|
|
|
|
class ResPartnerRelationAll(models.AbstractModel):
    """Abstract model to show each relation from two sides.

    The model has no table of its own (``_auto = False``): ``_auto_init``
    (re)creates a database view from the registered select specifications.
    With the two specifications registered in this module, every
    res.partner.relation row shows up twice: once seen from the left and
    once seen from the right partner. Create, write and unlink are
    delegated to the underlying model.
    """
    _auto = False
    _log_access = False
    _name = 'res.partner.relation.all'
    _description = 'All (non-inverse + inverse) relations between partners'
    _order = \
        'this_partner_id, type_selection_id, date_end desc, date_start desc'

    res_model = fields.Char(
        string='Resource Model',
        readonly=True,
        required=True,
        help="The database object this relation is based on.")
    res_id = fields.Integer(
        string='Resource ID',
        readonly=True,
        required=True,
        help="The id of the object in the model this relation is based on.")
    this_partner_id = fields.Many2one(
        comodel_name='res.partner',
        string='One Partner',
        required=True)
    other_partner_id = fields.Many2one(
        comodel_name='res.partner',
        string='Other Partner',
        required=True)
    type_id = fields.Many2one(
        comodel_name='res.partner.relation.type',
        string='Underlying Relation Type',
        readonly=True,
        required=True)
    date_start = fields.Date('Starting date')
    date_end = fields.Date('Ending date')
    is_inverse = fields.Boolean(
        string="Is reverse type?",
        readonly=True,
        help="Inverse relations are from right to left partner.")
    type_selection_id = fields.Many2one(
        comodel_name='res.partner.relation.type.selection',
        string='Relation Type',
        required=True)
    active = fields.Boolean(
        string='Active',
        readonly=True,
        help="Records with date_end in the past are inactive")
    # Field without a value of its own (the compute does nothing): it only
    # exists to be searched on, see _search_any_partner_id.
    any_partner_id = fields.Many2many(
        comodel_name='res.partner',
        string='Partner',
        compute=lambda self: None,
        search='_search_any_partner_id')

    def _get_active_selects(self):
        """Return selects actually to be used.

        Selects are registered from all modules PRESENT. But should only be
        used to build view if module actually INSTALLED.

        :return: list of keys into the select specification register.
        """
        return ['relation', 'relation_inverse']

    def _get_statement(self):
        """Return the statement that creates the view.

        Allow other modules to add to statement.

        The union of the active selects is filled in here. The placeholders
        %(table)s, %(padding)s, %(additional_view_fields)s and
        %(additional_tables)s are left in the returned string, to be passed
        as parameters when the statement is executed.
        """
        active_selects = self._get_active_selects()
        union_select = ' UNION '.join(
            _specification_register[key]['select_sql']
            for key in active_selects)
        return """\
CREATE OR REPLACE VIEW %%(table)s AS
WITH base_selection AS (%(union_select)s)
SELECT
bas.*,
CASE
WHEN NOT bas.is_inverse OR typ.is_symmetric
THEN bas.type_id * 2
ELSE (bas.type_id * 2) + 1
END as type_selection_id,
(bas.date_end IS NULL OR bas.date_end >= current_date) AS active
%%(additional_view_fields)s
FROM base_selection bas
JOIN res_partner_relation_type typ ON (bas.type_id = typ.id)
%%(additional_tables)s
""" % {'union_select': union_select}

    def _get_padding(self):
        """Utility function to define padding in one place.

        Ids in the view are computed as base id * padding + key offset, so
        key offsets have to stay below the padding for ids to be unique.
        """
        return 100

    def _get_additional_view_fields(self):
        """Allow inherit models to add fields to view.

        If fields are added, the resulting string must have each field
        prepended by a comma, like so:
            return ', typ.allow_self, typ.left_partner_category'
        """
        return ''

    def _get_additional_tables(self):
        """Allow inherit models to add tables (JOIN's) to view.

        Example:
            return 'JOIN type_extention ext ON (bas.type_id = ext.id)'
        """
        return ''

    @api.model_cr_context
    def _auto_init(self):
        """Drop the view if it exists and create it again."""
        cr = self._cr
        drop_view_if_exists(cr, self._table)
        # Table name and additional SQL fragments are wrapped in AsIs, so
        # they are inserted into the statement literally. Padding is passed
        # as a normal query parameter.
        parameters = {
            'table': AsIs(self._table),
            'padding': self._get_padding(),
            'additional_view_fields':
                AsIs(self._get_additional_view_fields()),
            'additional_tables':
                AsIs(self._get_additional_tables()),
        }
        cr.execute(self._get_statement(), parameters)
        return super(ResPartnerRelationAll, self)._auto_init()

    @api.model
    def _search_any_partner_id(self, operator, value):
        """Search relation with partner, no matter on which side."""
        # pylint: disable=no-self-use
        return [
            '|',
            ('this_partner_id', operator, value),
            ('other_partner_id', operator, value)]

    @api.multi
    def name_get(self):
        """Return '<this partner> <relation type> <other partner>' names.

        :return: dictionary mapping each record id to its name.
        """
        # NOTE(review): the Odoo documentation describes the result of
        # name_get as a list of (id, name) pairs, this method returns a
        # dictionary. Kept as is, because callers may index the result by
        # id - confirm before changing.
        return {
            this.id: '%s %s %s' % (
                this.this_partner_id.name,
                this.type_selection_id.display_name,
                this.other_partner_id.name,
            ) for this in self}

    @api.onchange('type_selection_id')
    def onchange_type_selection_id(self):
        """Add domain on partners according to category and contact_type.

        :return: dictionary with the domains for this_partner_id and
            other_partner_id, and a warning if a domain leaves no partner
            to choose from, or excludes a partner already selected.
        """

        def check_partner_domain(partner, partner_domain, side):
            """Check whether partner_domain results in empty selection
            for partner, or wrong selection of partner already selected.

            :return: warning dictionary, empty if nothing is wrong.
            """
            warning = {}
            if partner:
                test_domain = [('id', '=', partner.id)] + partner_domain
            else:
                test_domain = partner_domain
            partner_model = self.env['res.partner']
            partners_found = partner_model.search(test_domain, limit=1)
            if not partners_found:
                warning['title'] = _('Error!')
                if partner:
                    warning['message'] = (
                        _('%s partner incompatible with relation type.') %
                        side.title())
                else:
                    warning['message'] = (
                        _('No %s partner available for relation type.') %
                        side)
            return warning

        type_selection = self.type_selection_id
        this_partner_domain = []
        other_partner_domain = []
        if type_selection.contact_type_this:
            this_partner_domain.append((
                'is_company', '=',
                type_selection.contact_type_this == 'c'))
        if type_selection.partner_category_this:
            this_partner_domain.append((
                'category_id', 'in',
                type_selection.partner_category_this.ids))
        if type_selection.contact_type_other:
            other_partner_domain.append((
                'is_company', '=',
                type_selection.contact_type_other == 'c'))
        if type_selection.partner_category_other:
            other_partner_domain.append((
                'category_id', 'in',
                type_selection.partner_category_other.ids))
        result = {'domain': {
            'this_partner_id': this_partner_domain,
            'other_partner_id': other_partner_domain}}
        # Check whether domain results in no choice or wrong choice of
        # partners:
        warning = {}
        if this_partner_domain:
            this_partner = False
            if bool(self.this_partner_id.id):
                this_partner = self.this_partner_id
            else:
                # No partner on the record (yet): fall back on the context.
                context = self.env.context
                this_partner_id = (
                    context.get('default_this_partner_id') or
                    context.get('active_id') or
                    False)
                if this_partner_id:
                    this_partner = \
                        self.env['res.partner'].browse(this_partner_id)
            warning = check_partner_domain(
                this_partner, this_partner_domain, _('this'))
        if not warning and other_partner_domain:
            warning = check_partner_domain(
                self.other_partner_id, other_partner_domain, _('other'))
        if warning:
            result['warning'] = warning
        return result

    @api.onchange(
        'this_partner_id',
        'other_partner_id')
    def onchange_partner_id(self):
        """Set domain on type_selection_id based on partner(s) selected.

        :return: dictionary with the domain for type_selection_id, and a
            warning if the type already selected does not satisfy it.
        """

        def check_type_selection_domain(type_selection_domain):
            """If type_selection_id already selected, check whether it
            is compatible with the computed type_selection_domain. An empty
            selection can practically only occur in a practically empty
            database, and will not lead to problems. Therefore not tested.

            :return: warning dictionary, empty if nothing is wrong.
            """
            warning = {}
            if not (type_selection_domain and self.type_selection_id):
                return warning
            test_domain = (
                [('id', '=', self.type_selection_id.id)] +
                type_selection_domain)
            type_model = self.env['res.partner.relation.type.selection']
            types_found = type_model.search(test_domain, limit=1)
            if not types_found:
                warning['title'] = _('Error!')
                warning['message'] = _(
                    'Relation type incompatible with selected partner(s).')
            return warning

        type_selection_domain = []
        if self.this_partner_id:
            type_selection_domain += [
                '|',
                ('contact_type_this', '=', False),
                ('contact_type_this', '=',
                 self.this_partner_id.get_partner_type()),
                '|',
                ('partner_category_this', '=', False),
                ('partner_category_this', 'in',
                 self.this_partner_id.category_id.ids)]
        if self.other_partner_id:
            type_selection_domain += [
                '|',
                ('contact_type_other', '=', False),
                ('contact_type_other', '=',
                 self.other_partner_id.get_partner_type()),
                '|',
                ('partner_category_other', '=', False),
                ('partner_category_other', 'in',
                 self.other_partner_id.category_id.ids)]
        result = {'domain': {
            'type_selection_id': type_selection_domain}}
        # Check whether domain results in no choice or wrong choice for
        # type_selection_id:
        warning = check_type_selection_domain(type_selection_domain)
        if warning:
            result['warning'] = warning
        return result

    @api.model
    def _correct_vals(self, vals, type_selection):
        """Fill left and right partner from this and other partner.

        :param vals: values in terms of the fields of this model.
        :param type_selection: relation type selection that determines
            whether 'this' partner is the left or the right partner.
        :return: new dictionary in terms of the underlying table; the vals
            passed in are not modified.
        """
        vals = vals.copy()
        if 'type_selection_id' in vals:
            vals['type_id'] = type_selection.type_id.id
        if type_selection.is_inverse:
            this_key, other_key = 'right_partner_id', 'left_partner_id'
        else:
            this_key, other_key = 'left_partner_id', 'right_partner_id'
        if 'this_partner_id' in vals:
            vals[this_key] = vals['this_partner_id']
        if 'other_partner_id' in vals:
            vals[other_key] = vals['other_partner_id']
        # Delete values not in underlying table:
        for key in (
                'this_partner_id',
                'type_selection_id',
                'other_partner_id',
                'is_inverse'):
            vals.pop(key, None)
        return vals

    @api.multi
    def get_base_resource(self):
        """Get base resource from res_model and res_id."""
        self.ensure_one()
        base_model = self.env[self.res_model]
        return base_model.browse([self.res_id])

    @api.multi
    def write_resource(self, base_resource, vals):
        """write handled by base resource."""
        self.ensure_one()
        # write for models other then res.partner.relation SHOULD
        # be handled in inherited models:
        relation_model = self.env['res.partner.relation']
        assert self.res_model == relation_model._name
        base_resource.write(vals)

    @api.model
    def _get_type_selection_from_vals(self, vals):
        """Get type_selection_id straight from vals or compute from type_id.

        :return: type selection record, or False if vals contain neither a
            type_selection_id nor a type_id.
        """
        type_selection_id = vals.get('type_selection_id', False)
        if not type_selection_id:
            type_id = vals.get('type_id', False)
            if type_id:
                # Same numbering as in the view: type_id * 2, plus one for
                # the inverse.
                is_inverse = vals.get('is_inverse')
                type_selection_id = type_id * 2 + (1 if is_inverse else 0)
        if not type_selection_id:
            return False
        return self.type_selection_id.browse(type_selection_id) or False

    @api.multi
    def write(self, vals):
        """For model 'res.partner.relation' call write on underlying model.

        :param vals: values in terms of the fields of this model.
        :return: True
        """
        new_type_selection = self._get_type_selection_from_vals(vals)
        for rec in self:
            type_selection = new_type_selection or rec.type_selection_id
            # Translate the vals passed by the caller afresh for each
            # record. Reusing the translated vals of a previous record would
            # lose this_partner_id / other_partner_id, and with them the
            # possibility to put the partners on the correct side for a
            # record with another direction.
            rec_vals = rec._correct_vals(vals, type_selection)
            base_resource = rec.get_base_resource()
            rec.write_resource(base_resource, rec_vals)
        # Invalidate cache to make res.partner.relation.all reflect changes
        # in underlying res.partner.relation:
        self.env.invalidate_all()
        return True

    @api.model
    def _compute_base_name(self, type_selection):
        """This will be overridden for each inherit model."""
        return 'relation'

    @api.model
    def _compute_id(self, base_resource, type_selection):
        """Compute id. Allow for enhancements in inherit model.

        The id is computed in the same way as in the registered selects:
        id of the base resource * padding + key offset of the select.
        """
        base_name = self._compute_base_name(type_selection)
        specification = get_select_specification(
            base_name, type_selection.is_inverse)
        key_offset = specification['key_offset']
        return base_resource.id * self._get_padding() + key_offset

    @api.model
    def create_resource(self, vals, type_selection):
        """Create the underlying res.partner.relation from vals."""
        relation_model = self.env['res.partner.relation']
        return relation_model.create(vals)

    @api.model
    def create(self, vals):
        """Divert non-problematic creates to underlying table.

        Create a res.partner.relation but return the converted id.

        :raises ValidationError: if no relation type can be determined
            from vals.
        """
        type_selection = self._get_type_selection_from_vals(vals)
        if not type_selection:  # Should not happen
            raise ValidationError(
                _('No relation type specified in vals: %s.') % vals)
        vals = self._correct_vals(vals, type_selection)
        base_resource = self.create_resource(vals, type_selection)
        res_id = self._compute_id(base_resource, type_selection)
        return self.browse(res_id)

    @api.multi
    def unlink_resource(self, base_resource):
        """Delegate unlink to underlying model."""
        self.ensure_one()
        # unlink for models other then res.partner.relation SHOULD
        # be handled in inherited models:
        relation_model = self.env['res.partner.relation']
        assert self.res_model == relation_model._name
        base_resource.unlink()

    @api.multi
    def unlink(self):
        """For model 'res.partner.relation' call unlink on underlying model.

        :return: True
        """
        for rec in self:
            base_resource = rec.get_base_resource()
            rec.unlink_resource(base_resource)
        return True
|