From 668bb6413451fab05534e1174eb9ace03cc3e8c5 Mon Sep 17 00:00:00 2001 From: Holger Brunn Date: Mon, 31 Jul 2017 15:28:08 +0200 Subject: [PATCH] [ADD] users_ldap_populate: migrate functionality added to 6.1 after 7.0 port (#408) * [ADD] possibility to deactivate users not found in ldap while populating * [IMP] search in ldap for every possibly unknown user to be really sure it actually is not present there * [FIX] refactoring mistake * [IMP] don't use self.query() to be sure to be stopped if any error occurs * [IMP] remove superfluous check as exceptions are not supressed any more * [FIX] typo in variable name [FIX] handle unicode characters in search filter [FIX] search for user's login, not her name * [FIX] don't pass user_name as assertion_value * [FIX] don't deactivate users if we got a non-existent ldap configuration * [FIX] flake8 * [FIX] more flake8 * [FIX] make form usable * [FIX] name clash between function and field * [ADD] test --- users_ldap_populate/model/populate_wizard.py | 37 +++-- users_ldap_populate/model/users_ldap.py | 144 +++++++++++++++--- users_ldap_populate/tests/__init__.py | 4 + .../tests/test_users_ldap_populate.py | 70 +++++++++ users_ldap_populate/view/populate_wizard.xml | 2 +- users_ldap_populate/view/users_ldap.xml | 7 +- 6 files changed, 217 insertions(+), 47 deletions(-) create mode 100644 users_ldap_populate/tests/__init__.py create mode 100644 users_ldap_populate/tests/test_users_ldap_populate.py diff --git a/users_ldap_populate/model/populate_wizard.py b/users_ldap_populate/model/populate_wizard.py index 513516902..b82476bf5 100644 --- a/users_ldap_populate/model/populate_wizard.py +++ b/users_ldap_populate/model/populate_wizard.py @@ -18,28 +18,27 @@ # along with this program. If not, see . # ############################################################################## +from openerp.osv import orm, fields # pylint: disable=W0402 -from openerp import models, fields, api - -class CompanyLDAPPopulateWizard(models.TransientModel): +class CompanyLDAPPopulateWizard(orm.TransientModel): _name = 'res.company.ldap.populate_wizard' _description = 'Populate users from LDAP' + _columns = { + 'name': fields.char('Name', size=16), + 'ldap_id': fields.many2one( + 'res.company.ldap', 'LDAP Configuration'), + 'users_created': fields.integer( + 'Number of users created', readonly=True), + 'users_deactivated': fields.integer( + 'Number of users deactivated', readonly=True), + } - name = fields.Char('Name', size=16) - ldap_id = fields.Many2one( - 'res.company.ldap', - 'LDAP Configuration' - ) - users_created = fields.Integer( - 'Number of users created', - readonly=True - ) - - @api.model - @api.returns('self', lambda value: value.id) - def create(self, vals): + def create(self, cr, uid, vals, context=None): + ldap_pool = self.pool.get('res.company.ldap') if 'ldap_id' in vals: - ldap = self.env['res.company.ldap'].browse(vals['ldap_id']) - vals['users_created'] = ldap.action_populate() - return super(CompanyLDAPPopulateWizard, self).create(vals) + vals['users_created'], vals['users_deactivated'] =\ + ldap_pool.action_populate( + cr, uid, vals['ldap_id'], context=context) + return super(CompanyLDAPPopulateWizard, self).create( + cr, uid, vals, context=None) diff --git a/users_ldap_populate/model/users_ldap.py b/users_ldap_populate/model/users_ldap.py index 065afbeec..c20ae0503 100644 --- a/users_ldap_populate/model/users_ldap.py +++ b/users_ldap_populate/model/users_ldap.py @@ -20,9 +20,9 @@ ############################################################################## import re - -from openerp import models, api, _ -from openerp.exceptions import UserError +from openerp.osv import orm, fields # pylint: disable=W0402 +import ldap +from openerp import SUPERUSER_ID import logging _logger = logging.getLogger(__name__) @@ -33,11 +33,26 @@ except ImportError: _logger.debug('Can not `from ldap.filter import filter_format`.') -class CompanyLDAP(models.Model): +class CompanyLDAP(orm.Model): _inherit = 'res.company.ldap' - @api.multi - def action_populate(self): + _columns = { + 'no_deactivate_user_ids': fields.many2many( + 'res.users', 'res_company_ldap_no_deactivate_user_rel', + 'ldap_id', 'user_id', + 'Users never to deactivate', + help='List users who never should be deactivated by' + ' the deactivation wizard'), + 'deactivate_unknown_users': fields.boolean( + 'Deactivate unknown users'), + } + + _defaults = { + 'no_deactivate_user_ids': [(6, 0, [SUPERUSER_ID])], + 'deactivate_unknown_users': False, + } + + def action_populate(self, cr, uid, ids, context=None): """ Prepopulate the user table from one or more LDAP resources. @@ -46,13 +61,32 @@ class CompanyLDAP(models.Model): Return the number of users created (as far as we can tell). """ - users_pool = self.env['res.users'] - users_no_before = users_pool.search_count([]) + if isinstance(ids, (int, float)): + ids = [ids] + + users_pool = self.pool.get('res.users') + users_no_before = users_pool.search( + cr, uid, [], context=context, count=True) logger = logging.getLogger('orm.ldap') - logger.debug("action_populate called on res.company.ldap ids %s", - self.ids) + logger.debug("action_populate called on res.company.ldap ids %s", ids) + + deactivate_unknown = None + known_user_ids = [uid] + for this in self.read(cr, uid, ids, + [ + 'no_deactivate_user_ids', + 'deactivate_unknown_users', + ], + context=context, load='_classic_write'): + if deactivate_unknown is None: + deactivate_unknown = True + known_user_ids.extend(this['no_deactivate_user_ids']) + deactivate_unknown &= this['deactivate_unknown_users'] - for conf in self.get_ldap_dicts(): + if deactivate_unknown: + logger.debug("will deactivate unknown users") + + for conf in self.get_ldap_dicts(cr, ids): if not conf['create_user']: continue attribute_match = re.search( @@ -60,29 +94,89 @@ class CompanyLDAP(models.Model): if attribute_match: login_attr = attribute_match.group(1) else: - raise UserError( - _("No login attribute found: " - "Could not extract login attribute from filter %s") % + raise orm.except_orm( + "No login attribute found", + "Could not extract login attribute from filter %s" % conf['ldap_filter']) - ldap_filter = filter_format(conf['ldap_filter'] % '*', ()) - for result in self.query(conf, ldap_filter.encode('utf-8')): - self.get_or_create_user(conf, result[1][login_attr][0], result) + results = self.get_ldap_entry_dicts(conf) + for result in results: + user_id = self.get_or_create_user( + cr, uid, conf, result[1][login_attr][0], result) + # this happens if something goes wrong while creating the user + # or fetching information from ldap + if not user_id: + deactivate_unknown = False + known_user_ids.append(user_id) - users_no_after = users_pool.search_count([]) + users_no_after = users_pool.search( + cr, uid, [], context=context, count=True) users_created = users_no_after - users_no_before + + deactivated_users_count = 0 + if deactivate_unknown: + deactivated_users_count = self.do_deactivate_unknown_users( + cr, uid, ids, known_user_ids, context=context) + logger.debug("%d users created", users_created) - return users_created + logger.debug("%d users deactivated", deactivated_users_count) + return users_created, deactivated_users_count + + def do_deactivate_unknown_users( + self, cr, uid, ids, known_user_ids, context=None): + """ + Deactivate users not found in last populate run + """ + res_users = self.pool.get('res.users') + unknown_user_ids = [] + for unknown_user in res_users.read( + cr, uid, + res_users.search( + cr, uid, + [('id', 'not in', known_user_ids)], + context=context), + ['login'], + context=context): + present_in_ldap = False + for conf in self.get_ldap_dicts(cr, ids): + present_in_ldap |= bool(self.get_ldap_entry_dicts( + conf, user_name=unknown_user['login'])) + if not present_in_ldap: + res_users.write( + cr, uid, unknown_user['id'], {'active': False}, + context=context) + unknown_user_ids.append(unknown_user['id']) + + return len(unknown_user_ids) + + def get_ldap_entry_dicts(self, conf, user_name='*'): + """ + Execute ldap query as defined in conf + + Don't call self.query because it supresses possible exceptions + """ + ldap_filter = filter_format(conf['ldap_filter'] % user_name, ()) + conn = self.connect(conf) + conn.simple_bind_s(conf['ldap_binddn'] or '', + conf['ldap_password'] or '') + results = conn.search_st(conf['ldap_base'], ldap.SCOPE_SUBTREE, + ldap_filter.encode('utf8'), None, + timeout=60) + conn.unbind() + + return results - @api.multi - def populate_wizard(self): + def populate_wizard(self, cr, uid, ids, context=None): """ GUI wrapper for the populate method that reports back the number of users created. """ - if not self: + if not ids: return - wizard_obj = self.env['res.company.ldap.populate_wizard'] - res_id = wizard_obj.create({'ldap_id': self.id}).id + if isinstance(ids, (int, float)): + ids = [ids] + wizard_obj = self.pool.get('res.company.ldap.populate_wizard') + res_id = wizard_obj.create( + cr, uid, {'ldap_id': ids[0]}, context=context) return { 'name': wizard_obj._description, @@ -90,7 +184,7 @@ class CompanyLDAP(models.Model): 'view_mode': 'form', 'res_model': wizard_obj._name, 'domain': [], - 'context': self.env.context, + 'context': context, 'type': 'ir.actions.act_window', 'target': 'new', 'res_id': res_id, diff --git a/users_ldap_populate/tests/__init__.py b/users_ldap_populate/tests/__init__.py new file mode 100644 index 000000000..1da05de48 --- /dev/null +++ b/users_ldap_populate/tests/__init__.py @@ -0,0 +1,4 @@ +# -*- coding: utf-8 -*- +# © 2016 Therp BV +# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html). +from . import test_users_ldap_populate diff --git a/users_ldap_populate/tests/test_users_ldap_populate.py b/users_ldap_populate/tests/test_users_ldap_populate.py new file mode 100644 index 000000000..befbdf6c1 --- /dev/null +++ b/users_ldap_populate/tests/test_users_ldap_populate.py @@ -0,0 +1,70 @@ +# -*- coding: utf-8 -*- +# © 2016 Therp BV +# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html). +from openerp.tests.common import TransactionCase +from contextlib import contextmanager + + +class patch_ldap_connection(object): + def __init__(self, results): + self.results = results + + def simple_bind_s(self, user, password): + return True + + def search_st(self, base, scope, ldap_filter, attributes, timeout=None): + if ldap_filter == '(uid=*)': + return self.results + else: + return [] + + def unbind(self): + return True + + +@contextmanager +def patch_ldap(self, results): + """ defuse ldap functions to return fake entries instead of talking to a + server. Use this in your own ldap related tests """ + import ldap + original_initialize = ldap.initialize + + def initialize(uri): + return patch_ldap_connection(results) + ldap.initialize = initialize + yield + ldap.initialize = original_initialize + + +def get_fake_ldap(self): + company = self.env.ref('base.main_company') + company.write({ + 'ldaps': [(0, 0, { + 'ldap_server': 'fake', + 'ldap_port': 'fake', + 'ldap_filter': '(uid=%s)', + 'ldap_base': 'fake', + 'deactivate_unknown_users': True, + 'no_deactivate_user_ids': [(6, 0, [ + self.env.ref('base.user_root').id, + ])], + })], + }) + return company.ldaps.filtered( + lambda x: x.ldap_server == 'fake' + ) + + +class TestUsersLdapPopulate(TransactionCase): + def test_users_ldap_populate(self): + with patch_ldap(self, [('DN=fake', { + 'cn': ['fake'], + 'uid': ['fake'], + 'mail': ['fake@fakery.com'], + })]): + get_fake_ldap(self).populate_wizard() + self.assertFalse(self.env.ref('base.user_demo').active) + self.assertTrue(self.env.ref('base.user_root').active) + self.assertTrue(self.env['res.users'].search([ + ('login', '=', 'fake') + ])) diff --git a/users_ldap_populate/view/populate_wizard.xml b/users_ldap_populate/view/populate_wizard.xml index 766e49f6d..083ec3b0e 100644 --- a/users_ldap_populate/view/populate_wizard.xml +++ b/users_ldap_populate/view/populate_wizard.xml @@ -2,12 +2,12 @@ - Add populate button to ldap view res.company.ldap.populate_wizard
+