|
|
# -*- coding: utf-8 -*- ############################################################################## # # Copyright (c) 2010-2011 Camptocamp SA (http://www.camptocamp.com) # All Right Reserved # # Author : Nicolas Bessi (Camptocamp), Thanks to Laurent Lauden for his code adaptation # Active directory Donor: M. Benadiba (Informatique Assistances.fr) # Contribution : Joel Grand-Guillaume # # WARNING: This program as such is intended to be used by professional # programmers who take the whole responsability of assessing all potential # consequences resulting from its eventual inadequacies and bugs # End users who are looking for a ready-to-use solution with commercial # garantees and support are strongly adviced to contract a Free Software # Service Company # # This program is Free Software; you can redistribute it and/or # modify it under the terms of the GNU General Public License # as published by the Free Software Foundation; either version 2 # of the License, or (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. # ##############################################################################
# TODO Find why company parameter are cached
import re import unicodedata import netsvc try: import ldap import ldap.modlist except ImportError: print('python ldap not installed please install it in order to use this module')
from openerp.osv import orm, fields from tools.translate import _ logger = netsvc.Logger()
class LdapConnMApper(object): """LdapConnMApper: push specific fields from the Terp Partner_contacts to the
LDAP schema inetOrgPerson. Ldap bind options are stored in company. """
def __init__(self, cursor, uid, osv_obj, context=None): """Initialize connexion to ldap by using parameter set in the current user company""" logger.notifyChannel("MY TOPIC", netsvc.LOG_DEBUG, _('Initalize LDAP CONN')) self.USER_DN = '' self.CONTACT_DN = '' self.LDAP_SERVER = '' self.PASS = '' self.OU = '' self.connexion = '' self.ACTIVDIR = False
# Reading ldap pref user = osv_obj.pool.get('res.users').browse(cursor, uid, uid, context=context) company = osv_obj.pool.get('res.company').browse(cursor, uid, user.company_id.id, context=context) self.USER_DN = company.base_dn self.CONTACT_DN = company.contact_dn self.LDAP_SERVER = company.ldap_server self.PASS = company.passwd self.PORT = company.ldap_port self.OU = company.ounit self.ACTIVDIR = company.is_activedir
mand = (self.USER_DN, self.CONTACT_DN, self.LDAP_SERVER, self.PASS, self.OU) if company.ldap_active: for param in mand: if not param: raise orm.except_orm(_('Warning !'), _('An LDAP parameter is missing for company %s') % (company.name,))
def get_connexion(self): """create a new ldap connexion""" logger.notifyChannel("LDAP Address", netsvc.LOG_DEBUG, _('connecting to server ldap %s') % (self.LDAP_SERVER,)) if self.PORT: self.connexion = ldap.open(self.LDAP_SERVER, self.PORT) else: self.connexion = ldap.open(self.LDAP_SERVER) self.connexion.simple_bind_s(self.USER_DN, self.PASS) return self.connexion
class LDAPAddress(orm.Model): """Override the CRUD of the objet in order to dynamically bind to ldap""" _inherit = 'res.partner.address' ldapMapper = None
def init(self, cr): logger = netsvc.Logger() try: logger.notifyChannel(_('LDAP address init'), netsvc.LOG_INFO, _('try to ALTER TABLE res_partner_address RENAME ' 'column name to lastname ;')) cr.execute('ALTER TABLE res_partner_address RENAME column name to lastname ;')
except Exception: cr.rollback() logger.notifyChannel(_('LDAP address init'), netsvc.LOG_INFO, _('Warning ! impossible to rename column name' ' into lastname, this is probabely aleready' ' done or does not exist'))
def _compute_name(self, firstname, lastname): firstname = firstname or u'' lastname = lastname or u'' firstname = (u' ' + firstname).rstrip() return u"%s%s" % (lastname, firstname)
def _name_get_fnc(self, cursor, uid, ids, name, arg, context=None): """Get the name (lastname + firstname), otherwise ''""" if not ids: return [] reads = self.read(cursor, uid, ids, ['lastname', 'firstname']) res = [] for record in reads: # We want to have " firstname" or "" name = self._compute_name(record['firstname'], record['lastname']) res.append((record['id'], name)) return dict(res)
# TODO get the new version of name search not vulnerable to sql injections # def name_search(self, cursor, user, name, args=None, operator='ilike', context=None, limit=100): # if not context: context = {} # prep_name = '.*%s.*' %(name) # cursor.execute(("select id from res_partner_address where" # " (to_ascii(convert( lastname, 'UTF8', 'LATIN1'),'LATIN-1') ~* '%s'" # " or to_ascii(convert( firstname, 'UTF8', 'LATIN1'),'LATIN-1') ~* '%s')" # " limit %s") % (prep_name, prep_name, limit)) # res = cursor.fetchall() # if res: # res = [x[0] for x in res] # else: # res = [] # # search in partner name to know if we are searching partner... # partner_obj=self.pool.get('res.partner') # part_len = len(res)-limit # if part_len > 0: # partner_res = partner_obj.search(cursor, user, [('name', 'ilike', name)], # limit=part_len, context=context) # for p in partner_res: # addresses = partner_obj.browse(cursor, user, p).address # # Take each contact and add it to # for add in addresses: # res.append(add.id) # return self.name_get(cursor, user, res, context)
_columns = { 'firstname': fields.char('First name', size=256), 'lastname': fields.char('Last name', size=256), 'name': fields.function(_name_get_fnc, method=True, type="char", size=512, store=True, string='Contact Name', help='Name generated from the first name and last name', nodrop=True), 'private_phone': fields.char('Private phone', size=128), }
def create(self, cursor, uid, vals, context=None): self.getconn(cursor, uid, {}) ids = None self.validate_entries(vals, cursor, uid, ids) tmp_id = super(LDAPAddress, self).create(cursor, uid, vals, context) if self.ldaplinkactive(cursor, uid, context): self.saveLdapContact(tmp_id, vals, cursor, uid, context) return tmp_id
def write(self, cursor, uid, ids, vals, context=None): context = context or {} self.getconn(cursor, uid, {}) if not isinstance(ids, list): ids = [ids] if ids: self.validate_entries(vals, cursor, uid, ids) if context.get('init_mode'): success = True else: success = super(LDAPAddress, self).write(cursor, uid, ids, vals, context) if self.ldaplinkactive(cursor, uid, context): for address_id in ids: self.updateLdapContact(address_id, vals, cursor, uid, context) return success
def unlink(self, cursor, uid, ids, context=None): if context is None: context = {} if ids: self.getconn(cursor, uid, {}) if not isinstance(ids, list): ids = [ids] if self.ldaplinkactive(cursor, uid, context): for id in ids: self.removeLdapContact(id, cursor, uid) return super(LDAPAddress, self).unlink(cursor, uid, ids)
def validate_entries(self, vals, cursor, uid, ids): """Validate data of an address based on the inetOrgPerson schema""" for val in vals: try: if isinstance(vals[val], basestring): vals[val] = unicode(vals[val].decode('utf8')) except UnicodeError: logger.notifyChannel('LDAP encode', netsvc.LOG_DEBUG, 'cannot unicode ' + vals[val]) pass
if ids is not None: if isinstance(ids, (int, long)): ids = [ids] if len(ids) == 1: self.addNeededFields(ids[0], vals, cursor, uid) email = vals.get('email', False) phone = vals.get('phone', False) fax = vals.get('fax', False) mobile = vals.get('mobile', False) private_phone = vals.get('private_phone', False) if email: if re.match("^.+\\@(\\[?)[a-zA-Z0-9\\-\\.]+\\.([a-zA-Z]{2,3}|[0-9]{1,3})(\\]?)$", email) is None: raise orm.except_orm(_('Warning !'), _('Please enter a valid e-mail')) phones = (('phone', phone), ('fax', fax), ('mobile', mobile), ('private_phone', private_phone)) for phone_tuple in phones: phone_number = phone_tuple[1] if phone_number: if not phone_number.startswith('+'): raise orm.except_orm(_('Warning !'), _('Please enter a valid phone number in %s' ' international format (i.e. leading +)') % phone_tuple[0])
def getVals(self, att_name, key, vals, dico, uid, ids, cursor, context=None): """map to values to dict""" if context is None: context = {} # We explicitely test False value if vals.get(key, False) is not False: dico[att_name] = vals[key] else: if context.get('init_mode'): return False tmp = self.read(cursor, uid, ids, [key], context={}) if tmp.get(key, False): dico[att_name] = tmp[key]
def _un_unicodize_buf(self, in_buf): if isinstance(in_buf, unicode): try: return in_buf.encode() except Exception: return unicodedata.normalize("NFKD", in_buf).encode('ascii', 'ignore') return in_buf
def unUnicodize(self, indict): """remove unicode data of modlist as unicode is not supported
by python-ldap librairy till version 2.7"""
for key in indict: if not isinstance(indict[key], list): indict[key] = self._un_unicodize_buf(indict[key]) else: nonutfArray = [] for val in indict[key]: nonutfArray.append(self._un_unicodize_buf(val)) indict[key] = nonutfArray
def addNeededFields(self, id, vals, cursor, uid): previousvalue = self.browse(cursor, uid, [id])[0] if not vals.get('partner_id'): vals['partner_id'] = previousvalue.partner_id.id values_to_check = ('email', 'phone', 'fax', 'mobile', 'firstname', 'lastname', 'private_phone', 'street', 'street2') for val in values_to_check: if not vals.get(val): vals[val] = previousvalue[val]
def mappLdapObject(self, id, vals, cursor, uid, context): """Mapp ResPArtner adress to moddlist""" self.addNeededFields(id, vals, cursor, uid) conn = self.getconn(cursor, uid, {}) partner_obj = self.pool.get('res.partner') part_name = partner_obj.browse(cursor, uid, vals['partner_id']).name vals['partner'] = part_name name = self._compute_name(vals.get('firstname'), vals.get('lastname')) if name: cn = name else: cn = part_name if not vals.get('lastname'): vals['lastname'] = part_name contact_obj = {'objectclass': ['inetOrgPerson'], 'uid': ['terp_'+str(id)], 'ou': [conn.OU], 'cn': [cn], 'sn': [vals['lastname']]} if not vals.get('street'): vals['street'] = u'' if not vals.get('street2'): vals['street2'] = u'' street_key = 'street' if self.getconn(cursor, uid, {}).ACTIVDIR: # ENTERING THE M$ Realm and it is weird # We manage the address street_key = 'streetAddress' contact_obj[street_key] = vals['street'] + "\r\n" + vals['street2'] # we modifiy the class contact_obj['objectclass'] = ['top', 'person', 'organizationalPerson', 'inetOrgPerson', 'user'] # we handle the country if vals.get('country_id'): country = self.browse(cursor, uid, id).country_id if country: vals['country_id'] = country.name vals['c'] = country.code else: vals['country_id'] = False vals['c'] = False if vals.get('country_id', False): self.getVals('co', 'country_id', vals, contact_obj, uid, id, cursor, context) self.getVals('c', 'c', vals, contact_obj, uid, id, cursor, context) # we compute the display name vals['display'] = '%s %s' % (vals['partner'], contact_obj['cn'][0]) # we get the title if self.browse(cursor, uid, id).function: contact_obj['description'] = self.browse(cursor, uid, id).function.name # we replace carriage return if vals.get('comment', False): vals['comment'] = vals['comment'].replace("\n", "\r\n") # Active directory specific fields self.getVals('company', 'partner', vals, contact_obj, uid, id, cursor, context) self.getVals('info', 'comment', vals, contact_obj, uid, id, cursor, context) self.getVals('displayName', 'partner', vals, contact_obj, uid, id, cursor, context) # Web site management if self.browse(cursor, uid, id).partner_id.website: vals['website'] = self.browse(cursor, uid, id).partner_id.website self.getVals('wWWHomePage', 'website', vals, contact_obj, uid, id, cursor, context) del(vals['website']) self.getVals('title', 'title', vals, contact_obj, uid, id, cursor, context) else: contact_obj[street_key] = vals['street'] + u"\n" + vals['street2'] self.getVals('o', 'partner', vals, contact_obj, uid, id, cursor, context)
# Common attributes self.getVals('givenName', 'firstname', vals, contact_obj, uid, id, cursor, context) self.getVals('mail', 'email', vals, contact_obj, uid, id, cursor, context) self.getVals('telephoneNumber', 'phone', vals, contact_obj, uid, id, cursor, context) self.getVals('l', 'city', vals, contact_obj, uid, id, cursor, context) self.getVals('facsimileTelephoneNumber', 'fax', vals, contact_obj, uid, id, cursor, context) self.getVals('mobile', 'mobile', vals, contact_obj, uid, id, cursor, context) self.getVals('homePhone', 'private_phone', vals, contact_obj, uid, id, cursor, context) self.getVals('postalCode', 'zip', vals, contact_obj, uid, id, cursor, context) self.unUnicodize(contact_obj) return contact_obj
def saveLdapContact(self, id, vals, cursor, uid, context=None): """save openerp adress to ldap""" contact_obj = self.mappLdapObject(id, vals, cursor, uid, context) conn = self.connectToLdap(cursor, uid, context=context) try: if self.getconn(cursor, uid, context).ACTIVDIR: conn.connexion.add_s("CN=%s,OU=%s,%s" % (contact_obj['cn'][0], conn.OU, conn.CONTACT_DN), ldap.modlist.addModlist(contact_obj)) else: conn.connexion.add_s("uid=terp_%s,OU=%s,%s" % (str(id), conn.OU, conn.CONTACT_DN), ldap.modlist.addModlist(contact_obj)) except Exception: raise conn.connexion.unbind_s()
def updateLdapContact(self, id, vals, cursor, uid, context): """update an existing contact with the data of OpenERP""" conn = self.connectToLdap(cursor, uid, context={}) try: old_contatc_obj = self.getLdapContact(conn, id) except ldap.NO_SUCH_OBJECT: self.saveLdapContact(id, vals, cursor, uid, context) return contact_obj = self.mappLdapObject(id, vals, cursor, uid, context) if conn.ACTIVDIR: modlist = [] for key, val in contact_obj.items(): if key in ('cn', 'uid', 'objectclass'): continue if isinstance(val, list): val = val[0] modlist.append((ldap.MOD_REPLACE, key, val)) else: modlist = ldap.modlist.modifyModlist(old_contatc_obj[1], contact_obj) try: conn.connexion.modify_s(old_contatc_obj[0], modlist) conn.connexion.unbind_s() except Exception: raise
def removeLdapContact(self, id, cursor, uid): """Remove a contact from ldap""" conn = self.connectToLdap(cursor, uid, context={}) to_delete = None try: to_delete = self.getLdapContact(conn, id) except ldap.NO_SUCH_OBJECT: logger.notifyChannel("Warning", netsvc.LOG_INFO, _("'no object to delete in ldap' %s") % (id)) except Exception: raise try: if to_delete: conn.connexion.delete_s(to_delete[0]) conn.connexion.unbind_s() except Exception: raise
def getLdapContact(self, conn, id): result = conn.connexion.search_ext_s("ou=%s,%s" % (conn.OU, conn.CONTACT_DN), ldap.SCOPE_SUBTREE, "(&(objectclass=*)(uid=terp_"+str(id)+"))") if not result: raise ldap.NO_SUCH_OBJECT return result[0]
def ldaplinkactive(self, cursor, uid, context=None): """Check if ldap is activated for this company""" user = self.pool.get('res.users').browse(cursor, uid, uid, context=context) company = self.pool.get('res.company').browse(cursor, uid, user.company_id.id, context=context) return company.ldap_active
def getconn(self, cursor, uid, context=None): """LdapConnMApper""" if not self.ldapMapper: self.ldapMapper = LdapConnMApper(cursor, uid, self) return self.ldapMapper
def connectToLdap(self, cursor, uid, context=None): """Reinitialize ldap connection""" # getting ldap pref if not self.ldapMapper: self.getconn(cursor, uid, context) self.ldapMapper.get_connexion() return self.ldapMapper
LDAPAddress()
|