You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
180 lines
6.8 KiB
180 lines
6.8 KiB
# -*- coding: utf-8 -*-
|
|
##############################################################################
|
|
#
|
|
# This module copyright (C) 2015 Therp BV (<http://therp.nl>).
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU Affero General Public License as
|
|
# published by the Free Software Foundation, either version 3 of the
|
|
# License, or (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
#
|
|
##############################################################################
|
|
from openerp import _, models, fields, api, exceptions
|
|
import logging
|
|
|
|
_logger = logging.getLogger(__name__)
|
|
|
|
try:
|
|
import ldap
|
|
import ldap.modlist
|
|
except ImportError:
|
|
_logger.debug('Can not `from ldap.filter import filter_format`.')
|
|
|
|
|
|
class ResUsers(models.Model):
|
|
_inherit = 'res.users'
|
|
|
|
ldap_entry_dn = fields.Char('LDAP DN', readonly=True)
|
|
is_ldap_user = fields.Boolean(
|
|
'LDAP user', compute='_compute_is_ldap_user', default=True)
|
|
|
|
@api.model
|
|
@api.returns('self', lambda record: record.id)
|
|
def create(self, values):
|
|
result = super(ResUsers, self).create(values)
|
|
result.push_to_ldap(values)
|
|
return result
|
|
|
|
@api.multi
|
|
def write(self, values):
|
|
result = super(ResUsers, self).write(values)
|
|
self.push_to_ldap(values)
|
|
return result
|
|
|
|
@api.multi
|
|
def _push_to_ldap_possible(self, values):
|
|
return bool(self._get_ldap_configuration())
|
|
|
|
@api.multi
|
|
def _get_ldap_configuration(self):
|
|
self.ensure_one()
|
|
return self.sudo().company_id.ldaps.filtered('create_ldap_entry')[:1]
|
|
|
|
@api.multi
|
|
def _get_ldap_values(self, values):
|
|
self.ensure_one()
|
|
conf = self._get_ldap_configuration()
|
|
result = {}
|
|
for mapping in conf.create_ldap_entry_field_mappings:
|
|
field_name = mapping.field_id.name
|
|
if field_name not in values or not values[field_name]:
|
|
continue
|
|
result[mapping.attribute] = [str(values[field_name])]
|
|
if result:
|
|
result['objectClass'] = conf.create_ldap_entry_objectclass\
|
|
.encode('utf-8').split(',')
|
|
return result
|
|
|
|
@api.multi
|
|
def _get_ldap_dn(self, values):
|
|
self.ensure_one()
|
|
conf = self._get_ldap_configuration()
|
|
dn = conf.create_ldap_entry_field_mappings.filtered('use_for_dn')
|
|
assert dn, 'No DN attribute mapping given!'
|
|
assert self[dn.field_id.name], 'DN attribute empty!'
|
|
return '%s=%s,%s' % (
|
|
dn.attribute,
|
|
ldap.dn.escape_dn_chars(self[dn.field_id.name].encode('utf-8')),
|
|
conf.create_ldap_entry_base or conf.ldap_base)
|
|
|
|
@api.multi
|
|
def push_to_ldap(self, values):
|
|
for this in self:
|
|
if not values.get('is_ldap_user') and not this.is_ldap_user:
|
|
continue
|
|
if not this._push_to_ldap_possible(values):
|
|
continue
|
|
ldap_values = this._get_ldap_values(values)
|
|
if not ldap_values:
|
|
continue
|
|
ldap_configuration = this._get_ldap_configuration()
|
|
ldap_connection = ldap_configuration.connect(
|
|
ldap_configuration.read()[0])
|
|
ldap_connection.simple_bind_s(
|
|
(ldap_configuration.ldap_binddn or '').encode('utf-8'),
|
|
(ldap_configuration.ldap_password or '').encode('utf-8'))
|
|
|
|
try:
|
|
if not this.ldap_entry_dn:
|
|
this._push_to_ldap_create(
|
|
ldap_connection, ldap_configuration, values,
|
|
ldap_values)
|
|
if this.ldap_entry_dn:
|
|
this._push_to_ldap_write(
|
|
ldap_connection, ldap_configuration, values,
|
|
ldap_values)
|
|
except ldap.LDAPError as e:
|
|
_logger.exception(e)
|
|
raise exceptions.Warning(_('Error'), e.message)
|
|
finally:
|
|
ldap_connection.unbind_s()
|
|
|
|
@api.multi
|
|
def _push_to_ldap_create(self, ldap_connection, ldap_configuration, values,
|
|
ldap_values):
|
|
self.ensure_one()
|
|
dn = self._get_ldap_dn(values)
|
|
ldap_connection.add_s(
|
|
dn,
|
|
ldap.modlist.addModlist(ldap_values))
|
|
self.write({'ldap_entry_dn': dn})
|
|
|
|
@api.multi
|
|
def _push_to_ldap_write(self, ldap_connection, ldap_configuration, values,
|
|
ldap_values):
|
|
self.ensure_one()
|
|
dn = self.ldap_entry_dn.encode('utf-8')
|
|
dn_mapping = ldap_configuration.create_ldap_entry_field_mappings\
|
|
.filtered('use_for_dn')
|
|
if dn_mapping.attribute in ldap_values:
|
|
ldap_values.pop(dn_mapping.attribute)
|
|
ldap_entry = ldap_connection.search_s(
|
|
dn, ldap.SCOPE_BASE, '(objectClass=*)',
|
|
map(lambda x: x.encode('utf-8'), ldap_values.keys()))
|
|
assert ldap_entry, '%s not found!' % self.ldap_entry_dn
|
|
ldap_entry = ldap_entry[0][1]
|
|
ldap_connection.modify_s(
|
|
dn,
|
|
ldap.modlist.modifyModlist(ldap_entry, ldap_values))
|
|
|
|
@api.one
|
|
@api.depends('ldap_entry_dn')
|
|
def _compute_is_ldap_user(self):
|
|
self.is_ldap_user = bool(self.ldap_entry_dn)
|
|
|
|
@api.one
|
|
def _change_ldap_password(self, new_passwd, auth_dn=None,
|
|
auth_passwd=None):
|
|
ldap_configuration = self.env.user.sudo()._get_ldap_configuration()
|
|
ldap_connection = ldap_configuration.connect(
|
|
ldap_configuration.read()[0])
|
|
dn = auth_dn or ldap_configuration.ldap_binddn
|
|
old_passwd = auth_passwd or ldap_configuration.ldap_password
|
|
ldap_connection.simple_bind_s(
|
|
dn.encode('utf-8'), old_passwd.encode('utf-8'))
|
|
self.env['ir.model.access'].check('res.users', 'write')
|
|
self.env.user.check_access_rule('write')
|
|
try:
|
|
ldap_connection.passwd_s(
|
|
self.ldap_entry_dn, None, new_passwd.encode('utf-8'))
|
|
except ldap.LDAPError, e:
|
|
raise exceptions.Warning(_('Error'), e.message)
|
|
finally:
|
|
ldap_connection.unbind_s()
|
|
return True
|
|
|
|
@api.model
|
|
def change_password(self, old_passwd, new_passwd):
|
|
if self.env.user.is_ldap_user:
|
|
return self.env.user._change_ldap_password(
|
|
new_passwd, auth_dn=self.env.user.ldap_entry_dn,
|
|
auth_passwd=old_passwd)
|
|
return super(ResUsers, self).change_password(old_passwd, new_passwd)
|