Browse Source

[ADD] users_ldap_populate: migrate functionality added to 6.1 after 7.0 port (#408)

* [ADD] possibility to deactivate users not found in ldap while populating

* [IMP] search in ldap for every possibly unknown user to be really sure it
actually is not present there

* [FIX] refactoring mistake

* [IMP] don't use self.query() to be sure to be stopped if any error occurs

* [IMP] remove superfluous check as exceptions are not supressed any more

* [FIX] typo in variable name
[FIX] handle unicode characters in search filter
[FIX] search for user's login, not her name

* [FIX] don't pass user_name as assertion_value

* [FIX] don't deactivate users if we got a non-existent ldap configuration

* [FIX] flake8

* [FIX] more flake8

* [FIX] make form usable

* [FIX] name clash between function and field

* [ADD] test
pull/931/head
Holger Brunn 8 years ago
committed by Damien Crier
parent
commit
668bb64134
  1. 37
      users_ldap_populate/model/populate_wizard.py
  2. 144
      users_ldap_populate/model/users_ldap.py
  3. 4
      users_ldap_populate/tests/__init__.py
  4. 70
      users_ldap_populate/tests/test_users_ldap_populate.py
  5. 2
      users_ldap_populate/view/populate_wizard.xml
  6. 7
      users_ldap_populate/view/users_ldap.xml

37
users_ldap_populate/model/populate_wizard.py

@ -18,28 +18,27 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
##############################################################################
from openerp.osv import orm, fields # pylint: disable=W0402
from openerp import models, fields, api
class CompanyLDAPPopulateWizard(models.TransientModel):
class CompanyLDAPPopulateWizard(orm.TransientModel):
_name = 'res.company.ldap.populate_wizard'
_description = 'Populate users from LDAP'
_columns = {
'name': fields.char('Name', size=16),
'ldap_id': fields.many2one(
'res.company.ldap', 'LDAP Configuration'),
'users_created': fields.integer(
'Number of users created', readonly=True),
'users_deactivated': fields.integer(
'Number of users deactivated', readonly=True),
}
name = fields.Char('Name', size=16)
ldap_id = fields.Many2one(
'res.company.ldap',
'LDAP Configuration'
)
users_created = fields.Integer(
'Number of users created',
readonly=True
)
@api.model
@api.returns('self', lambda value: value.id)
def create(self, vals):
def create(self, cr, uid, vals, context=None):
ldap_pool = self.pool.get('res.company.ldap')
if 'ldap_id' in vals:
ldap = self.env['res.company.ldap'].browse(vals['ldap_id'])
vals['users_created'] = ldap.action_populate()
return super(CompanyLDAPPopulateWizard, self).create(vals)
vals['users_created'], vals['users_deactivated'] =\
ldap_pool.action_populate(
cr, uid, vals['ldap_id'], context=context)
return super(CompanyLDAPPopulateWizard, self).create(
cr, uid, vals, context=None)

144
users_ldap_populate/model/users_ldap.py

@ -20,9 +20,9 @@
##############################################################################
import re
from openerp import models, api, _
from openerp.exceptions import UserError
from openerp.osv import orm, fields # pylint: disable=W0402
import ldap
from openerp import SUPERUSER_ID
import logging
_logger = logging.getLogger(__name__)
@ -33,11 +33,26 @@ except ImportError:
_logger.debug('Can not `from ldap.filter import filter_format`.')
class CompanyLDAP(models.Model):
class CompanyLDAP(orm.Model):
_inherit = 'res.company.ldap'
@api.multi
def action_populate(self):
_columns = {
'no_deactivate_user_ids': fields.many2many(
'res.users', 'res_company_ldap_no_deactivate_user_rel',
'ldap_id', 'user_id',
'Users never to deactivate',
help='List users who never should be deactivated by'
' the deactivation wizard'),
'deactivate_unknown_users': fields.boolean(
'Deactivate unknown users'),
}
_defaults = {
'no_deactivate_user_ids': [(6, 0, [SUPERUSER_ID])],
'deactivate_unknown_users': False,
}
def action_populate(self, cr, uid, ids, context=None):
"""
Prepopulate the user table from one or more LDAP resources.
@ -46,13 +61,32 @@ class CompanyLDAP(models.Model):
Return the number of users created (as far as we can tell).
"""
users_pool = self.env['res.users']
users_no_before = users_pool.search_count([])
if isinstance(ids, (int, float)):
ids = [ids]
users_pool = self.pool.get('res.users')
users_no_before = users_pool.search(
cr, uid, [], context=context, count=True)
logger = logging.getLogger('orm.ldap')
logger.debug("action_populate called on res.company.ldap ids %s",
self.ids)
logger.debug("action_populate called on res.company.ldap ids %s", ids)
deactivate_unknown = None
known_user_ids = [uid]
for this in self.read(cr, uid, ids,
[
'no_deactivate_user_ids',
'deactivate_unknown_users',
],
context=context, load='_classic_write'):
if deactivate_unknown is None:
deactivate_unknown = True
known_user_ids.extend(this['no_deactivate_user_ids'])
deactivate_unknown &= this['deactivate_unknown_users']
for conf in self.get_ldap_dicts():
if deactivate_unknown:
logger.debug("will deactivate unknown users")
for conf in self.get_ldap_dicts(cr, ids):
if not conf['create_user']:
continue
attribute_match = re.search(
@ -60,29 +94,89 @@ class CompanyLDAP(models.Model):
if attribute_match:
login_attr = attribute_match.group(1)
else:
raise UserError(
_("No login attribute found: "
"Could not extract login attribute from filter %s") %
raise orm.except_orm(
"No login attribute found",
"Could not extract login attribute from filter %s" %
conf['ldap_filter'])
ldap_filter = filter_format(conf['ldap_filter'] % '*', ())
for result in self.query(conf, ldap_filter.encode('utf-8')):
self.get_or_create_user(conf, result[1][login_attr][0], result)
results = self.get_ldap_entry_dicts(conf)
for result in results:
user_id = self.get_or_create_user(
cr, uid, conf, result[1][login_attr][0], result)
# this happens if something goes wrong while creating the user
# or fetching information from ldap
if not user_id:
deactivate_unknown = False
known_user_ids.append(user_id)
users_no_after = users_pool.search_count([])
users_no_after = users_pool.search(
cr, uid, [], context=context, count=True)
users_created = users_no_after - users_no_before
deactivated_users_count = 0
if deactivate_unknown:
deactivated_users_count = self.do_deactivate_unknown_users(
cr, uid, ids, known_user_ids, context=context)
logger.debug("%d users created", users_created)
return users_created
logger.debug("%d users deactivated", deactivated_users_count)
return users_created, deactivated_users_count
def do_deactivate_unknown_users(
self, cr, uid, ids, known_user_ids, context=None):
"""
Deactivate users not found in last populate run
"""
res_users = self.pool.get('res.users')
unknown_user_ids = []
for unknown_user in res_users.read(
cr, uid,
res_users.search(
cr, uid,
[('id', 'not in', known_user_ids)],
context=context),
['login'],
context=context):
present_in_ldap = False
for conf in self.get_ldap_dicts(cr, ids):
present_in_ldap |= bool(self.get_ldap_entry_dicts(
conf, user_name=unknown_user['login']))
if not present_in_ldap:
res_users.write(
cr, uid, unknown_user['id'], {'active': False},
context=context)
unknown_user_ids.append(unknown_user['id'])
return len(unknown_user_ids)
def get_ldap_entry_dicts(self, conf, user_name='*'):
"""
Execute ldap query as defined in conf
Don't call self.query because it supresses possible exceptions
"""
ldap_filter = filter_format(conf['ldap_filter'] % user_name, ())
conn = self.connect(conf)
conn.simple_bind_s(conf['ldap_binddn'] or '',
conf['ldap_password'] or '')
results = conn.search_st(conf['ldap_base'], ldap.SCOPE_SUBTREE,
ldap_filter.encode('utf8'), None,
timeout=60)
conn.unbind()
return results
@api.multi
def populate_wizard(self):
def populate_wizard(self, cr, uid, ids, context=None):
"""
GUI wrapper for the populate method that reports back
the number of users created.
"""
if not self:
if not ids:
return
wizard_obj = self.env['res.company.ldap.populate_wizard']
res_id = wizard_obj.create({'ldap_id': self.id}).id
if isinstance(ids, (int, float)):
ids = [ids]
wizard_obj = self.pool.get('res.company.ldap.populate_wizard')
res_id = wizard_obj.create(
cr, uid, {'ldap_id': ids[0]}, context=context)
return {
'name': wizard_obj._description,
@ -90,7 +184,7 @@ class CompanyLDAP(models.Model):
'view_mode': 'form',
'res_model': wizard_obj._name,
'domain': [],
'context': self.env.context,
'context': context,
'type': 'ir.actions.act_window',
'target': 'new',
'res_id': res_id,

4
users_ldap_populate/tests/__init__.py

@ -0,0 +1,4 @@
# -*- coding: utf-8 -*-
# © 2016 Therp BV <http://therp.nl>
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html).
from . import test_users_ldap_populate

70
users_ldap_populate/tests/test_users_ldap_populate.py

@ -0,0 +1,70 @@
# -*- coding: utf-8 -*-
# © 2016 Therp BV <http://therp.nl>
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html).
from openerp.tests.common import TransactionCase
from contextlib import contextmanager
class patch_ldap_connection(object):
def __init__(self, results):
self.results = results
def simple_bind_s(self, user, password):
return True
def search_st(self, base, scope, ldap_filter, attributes, timeout=None):
if ldap_filter == '(uid=*)':
return self.results
else:
return []
def unbind(self):
return True
@contextmanager
def patch_ldap(self, results):
""" defuse ldap functions to return fake entries instead of talking to a
server. Use this in your own ldap related tests """
import ldap
original_initialize = ldap.initialize
def initialize(uri):
return patch_ldap_connection(results)
ldap.initialize = initialize
yield
ldap.initialize = original_initialize
def get_fake_ldap(self):
company = self.env.ref('base.main_company')
company.write({
'ldaps': [(0, 0, {
'ldap_server': 'fake',
'ldap_port': 'fake',
'ldap_filter': '(uid=%s)',
'ldap_base': 'fake',
'deactivate_unknown_users': True,
'no_deactivate_user_ids': [(6, 0, [
self.env.ref('base.user_root').id,
])],
})],
})
return company.ldaps.filtered(
lambda x: x.ldap_server == 'fake'
)
class TestUsersLdapPopulate(TransactionCase):
def test_users_ldap_populate(self):
with patch_ldap(self, [('DN=fake', {
'cn': ['fake'],
'uid': ['fake'],
'mail': ['fake@fakery.com'],
})]):
get_fake_ldap(self).populate_wizard()
self.assertFalse(self.env.ref('base.user_demo').active)
self.assertTrue(self.env.ref('base.user_root').active)
self.assertTrue(self.env['res.users'].search([
('login', '=', 'fake')
]))

2
users_ldap_populate/view/populate_wizard.xml

@ -2,12 +2,12 @@
<openerp>
<data>
<record model="ir.ui.view" id="populate_wizard_view">
<field name="name">Add populate button to ldap view</field>
<field name="model">res.company.ldap.populate_wizard</field>
<field name="arch" type="xml">
<form string="Add populate button to ldap view">
<group>
<field name="users_created"/>
<field name="users_deactivated"/>
<button icon="gtk-ok" string="OK" special="cancel"/>
</group>
</form>

7
users_ldap_populate/view/users_ldap.xml

@ -6,8 +6,11 @@
<field name="model">res.company</field>
<field name="inherit_id" ref="auth_ldap.company_form_view"/>
<field name="arch" type="xml">
<xpath name="populate_ldap" expr="//field[@name='ldaps']/form" position="inside">
<separator string="Populate user database" colspan="4"/>
<xpath expr="//form[@string='LDAP Configuration']" position="inside">
<group string="Populate user database">
<field name="deactivate_unknown_users"/>
<field name="no_deactivate_user_ids" attrs="{'invisible': [('deactivate_unknown_users', '=', False)]}" widget="many2many_tags" />
</group>
<button name="populate_wizard"
string="Populate"
type="object"

Loading…
Cancel
Save