Browse Source

[IMP] Module 'auditlog' - Performing logs on 'read' operations + Some bugfixes to log inherited fields and dummy fields such as 'in_group_X' in 'res.users' model + Unit tests updated

pull/148/head
sebalix 10 years ago
parent
commit
7c72a70d20
  1. 1
      auditlog/README.rst
  2. 124
      auditlog/models/rule.py
  3. 36
      auditlog/tests/test_auditlog.py
  4. 2
      auditlog/views/auditlog_view.xml

1
auditlog/README.rst

@ -21,7 +21,6 @@ For further information, please visit:
Known issues / Roadmap Known issues / Roadmap
====================== ======================
* log ``read`` operations
* log only operations triggered by some users (currently it logs all users) * log only operations triggered by some users (currently it logs all users)
* group logs by HTTP query (thanks to werzeug)? * group logs by HTTP query (thanks to werzeug)?
* group HTTP query by user session? * group HTTP query by user session?

124
auditlog/models/rule.py

@ -19,7 +19,7 @@
# #
############################################################################## ##############################################################################
from openerp import models, fields, api, modules, _, SUPERUSER_ID
from openerp import models, fields, api, modules, _, SUPERUSER_ID, sql_db
FIELDS_BLACKLIST = [ FIELDS_BLACKLIST = [
'id', 'create_uid', 'create_date', 'write_uid', 'write_date', 'id', 'create_uid', 'create_date', 'write_uid', 'write_date',
@ -215,41 +215,30 @@ class auditlog_rule(models.Model):
def _make_read(self): def _make_read(self):
"""Instanciate a read method that log its calls.""" """Instanciate a read method that log its calls."""
# FIXME: read() seems a bit tricky, improve to handle old/new api
# @api.v7
# def read(self, cr, user, ids, fields=None, context=None,
# load='_classic_read', **kwargs):
# print "LOG READ", fields, load, kwargs
# # avoid loops
# if self.env.context.get('auditlog_method_intercepted'):
# return read.origin(
# self, cr, user, ids, fields, context, load, **kwargs)
# # call original method with a modified context
# context = dict(
# self.env.context, auditlog_method_intercepted=True)
# result = read.origin(
# self.with_context(context),
# cr, user, ids, fields, context, load, **kwargs)
# print "RESULT", result
# return result
# @api.v8
# def read(self, fields=None, load='_classic_read', **kwargs):
# print "LOG READ", fields, load, kwargs
# # avoid loops
# if self.env.context.get('auditlog_method_intercepted'):
# return read.origin(self, fields, load, **kwargs)
# # call original method with a modified context
# context = dict(
# self.env.context, auditlog_method_intercepted=True)
# result = read.origin(
# self.with_context(context), fields, load, **kwargs)
# print "RESULT", result
# return result
def read(self, *args, **kwargs): def read(self, *args, **kwargs):
result = read.origin(self, *args, **kwargs) result = read.origin(self, *args, **kwargs)
# Sometimes the result is not a list but a dictionary
try:
read_values = dict((d['id'], d) for d in result)
except TypeError:
read_values = dict((d['id'], d) for d in [result])
# Old API
if args and isinstance(args[0], sql_db.Cursor):
cr, uid, ids = args[0], args[1], args[2]
if isinstance(ids, (int, long)):
ids = [ids]
env = api.Environment(cr, uid, {})
rule_model = env['auditlog.rule']
rule_model.sudo().create_logs(
env.uid, self._name, ids,
'read', read_values)
# New API
else:
rule_model = self.env['auditlog.rule']
rule_model.sudo().create_logs(
self.env.uid, self._name, self.ids,
'read', read_values)
return result return result
return read return read
@ -294,9 +283,13 @@ class auditlog_rule(models.Model):
log_model = self.env['auditlog.log'] log_model = self.env['auditlog.log']
for res_id in res_ids: for res_id in res_ids:
model_model = self.env[res_model] model_model = self.env[res_model]
res_name = model_model.browse(res_id).name_get()
# Avoid recursivity with the 'read' method called by 'name_get()'
res_name = "%s,%s" % (res_model, res_id)
if method is not 'read':
name = model_model.browse(res_id).name_get()
res_name = name and name[0] and name[0][1] or res_name
vals = { vals = {
'name': res_name and res_name[0] and res_name[0][1] or False,
'name': res_name,
'model_id': self.pool._auditlog_model_cache[res_model], 'model_id': self.pool._auditlog_model_cache[res_model],
'res_id': res_id, 'res_id': res_id,
'method': method, 'method': method,
@ -307,23 +300,68 @@ class auditlog_rule(models.Model):
diff = DictDiffer( diff = DictDiffer(
new_values.get(res_id, EMPTY_DICT), new_values.get(res_id, EMPTY_DICT),
old_values.get(res_id, EMPTY_DICT)) old_values.get(res_id, EMPTY_DICT))
self._create_log_line_on_write(
log, diff.changed(), old_values, new_values)
self._create_log_line_on_create(log, diff.added(), new_values)
if method is 'create':
self._create_log_line_on_create(log, diff.added(), new_values)
elif method is 'read':
self._create_log_line_on_read(
log, old_values.get(res_id, EMPTY_DICT).keys(), old_values)
elif method is 'write':
self._create_log_line_on_write(
log, diff.changed(), old_values, new_values)
def _get_field(self, model, field_name): def _get_field(self, model, field_name):
cache = self.pool._auditlog_field_cache cache = self.pool._auditlog_field_cache
if field_name not in cache.get(model.model, {}): if field_name not in cache.get(model.model, {}):
cache.setdefault(model.model, {}) cache.setdefault(model.model, {})
# We use 'search()' then 'read()' instead of the 'search_read()'
# to take advantage of the 'classic_write' loading
# - we use 'search()' then 'read()' instead of the 'search_read()'
# to take advantage of the 'classic_write' loading
# - search the field in the current model and those it inherits
field_model = self.env['ir.model.fields'] field_model = self.env['ir.model.fields']
all_model_ids = [model.id]
all_model_ids.extend(
inherited.id for inherited in model.inherited_model_ids)
field = field_model.search( field = field_model.search(
[('model_id', '=', model.id), ('name', '=', field_name)])
field_data = field.read(load='_classic_write')[0]
cache[model.model][field_name] = field_data
[('model_id', 'in', all_model_ids), ('name', '=', field_name)])
# The field can be a dummy one, like 'in_group_X' on 'res.users'
# As such we can't log it (field_id is required to create a log)
if not field:
cache[model.model][field_name] = False
else:
field_data = field.read(load='_classic_write')[0]
cache[model.model][field_name] = field_data
return cache[model.model][field_name] return cache[model.model][field_name]
def _create_log_line_on_read(
self, log, fields_list, read_values):
"""Log field filled on a 'read' operation."""
log_line_model = self.env['auditlog.log.line']
for field_name in fields_list:
if field_name in FIELDS_BLACKLIST:
continue
field = self._get_field(log.model_id, field_name)
if field:
log_vals = self._prepare_log_line_vals_on_read(
log, field, read_values)
log_line_model.create(log_vals)
def _prepare_log_line_vals_on_read(self, log, field, read_values):
"""Prepare the dictionary of values used to create a log line on a
'read' operation.
"""
vals = {
'field_id': field['id'],
'log_id': log.id,
'old_value': read_values[log.res_id][field['name']],
'old_value_text': read_values[log.res_id][field['name']],
'new_value': False,
'new_value_text': False,
}
if field['relation'] and '2many' in field['ttype']:
old_value_text = self.env[field['relation']].browse(
vals['old_value']).name_get()
vals['old_value_text'] = old_value_text
return vals
def _create_log_line_on_write( def _create_log_line_on_write(
self, log, fields_list, old_values, new_values): self, log, fields_list, old_values, new_values):
"""Log field updated on a 'write' operation.""" """Log field updated on a 'write' operation."""

36
auditlog/tests/test_auditlog.py

@ -29,6 +29,7 @@ class TestAuditlog(TransactionCase):
self.env['auditlog.rule'].create({ self.env['auditlog.rule'].create({
'name': 'testrule for groups', 'name': 'testrule for groups',
'model_id': groups_model_id, 'model_id': groups_model_id,
'log_read': True,
'log_create': True, 'log_create': True,
'log_write': True, 'log_write': True,
'log_unlink': True, 'log_unlink': True,
@ -75,3 +76,38 @@ class TestAuditlog(TransactionCase):
'implied_ids': [(4, testgroup3.id)], 'implied_ids': [(4, testgroup3.id)],
}) })
testgroup4.write({'implied_ids': [(2, testgroup3.id)]}) testgroup4.write({'implied_ids': [(2, testgroup3.id)]})
def test_LogInheritedField(self):
"""Check the log works well when updating an inherited field
(e.g. field 'lang' on 'res.users' inherited from 'res.partner').
"""
auditlog_log = self.env['auditlog.log']
users_model_id = self.env.ref('base.model_res_users').id
self.env['auditlog.rule'].create({
'name': 'testrule for users',
'model_id': users_model_id,
'log_read': True,
'log_create': True,
'log_write': True,
'log_unlink': True,
'state': 'subscribed',
})
# Log 'create'
user = self.env['res.users'].create({
'name': 'testuser_inheritedfield',
'login': 'testuser.inheritedfield@company.com',
'lang': 'en_US', # field inherited from 'res.partner'
})
self.assertTrue(auditlog_log.search([
('model_id', '=', users_model_id),
('method', '=', 'create'),
('res_id', '=', user.id),
]))
# Log 'read'
data = user.read()[0]
self.assertIn('lang', data)
self.assertTrue(auditlog_log.search([
('model_id', '=', users_model_id),
('method', '=', 'read'),
('res_id', '=', user.id),
]))

2
auditlog/views/auditlog_view.xml

@ -29,7 +29,7 @@
<field name="action_id" readonly="1" groups="base.group_no_one"/> <field name="action_id" readonly="1" groups="base.group_no_one"/>
</group> </group>
<group colspan="1"> <group colspan="1">
<field name="log_read" invisible="1"/>
<field name="log_read"/>
<field name="log_write"/> <field name="log_write"/>
<field name="log_unlink"/> <field name="log_unlink"/>
<field name="log_create"/> <field name="log_create"/>

Loading…
Cancel
Save