Browse Source

Module auth_session_timeout: Pluggability (#887)

* Module auth_session_timeout:
---------------------------

* Refactor to allow other modules to inherit and augment or override the following:
** Session expiry time (deadline) calculation
** Ignored URLs
** Final session expiry (with possibility to late-abort)
* Re-ordered functionality to remove unnecessary work, as this code is called very often.
* Do not expire a session if delay gets set to zero (or unset / false)

* WIP

* Fixed flake8 lint errors

* Fixed flake8 lint errors

* WIP

* WIP

* WIP

* WIP

* WIP

* WIP

* Module: auth-session-timeout: Refactor ResUser tests to use `unittest.mock` patching

* Module: auth_session_timeout: Fixed flake8 lint errors

* Module: auth_session_timeout: Fixed flake8 lint errors
pull/979/head
jmorgannz 7 years ago
committed by Dave Lasley
parent
commit
df414dfeb1
  1. 1
      auth_session_timeout/README.rst
  2. 3
      auth_session_timeout/__openerp__.py
  3. 15
      auth_session_timeout/models/ir_config_parameter.py
  4. 98
      auth_session_timeout/models/res_users.py
  5. 1
      auth_session_timeout/tests/__init__.py
  6. 57
      auth_session_timeout/tests/test_ir_config_parameter.py
  7. 43
      auth_session_timeout/tests/test_res_user.py

1
auth_session_timeout/README.rst

@ -32,6 +32,7 @@ Contributors
* Cédric Pigeon <cedric.pigeon@acsone.eu>
* Dhinesh D <dvdhinesh.mail@gmail.com>
* Jesse Morgan <jmorgan.nz@gmail.com>
Maintainer
----------

3
auth_session_timeout/__openerp__.py

@ -9,7 +9,8 @@
'summary': """
This module disable all inactive sessions since a given delay""",
'author': "ACSONE SA/NV, Dhinesh D, Odoo Community Association (OCA)",
'author': "ACSONE SA/NV, Dhinesh D, Jesse Morgan, \
Odoo Community Association (OCA)",
'maintainer': 'Odoo Community Association (OCA)',
'website': "http://acsone.eu",

15
auth_session_timeout/models/ir_config_parameter.py

@ -5,7 +5,6 @@
from openerp import models, api, tools, SUPERUSER_ID
DELAY_KEY = 'inactive_session_time_out_delay'
IGNORED_PATH_KEY = 'inactive_session_time_out_ignored_url'
@ -13,7 +12,7 @@ IGNORED_PATH_KEY = 'inactive_session_time_out_ignored_url'
class IrConfigParameter(models.Model):
_inherit = 'ir.config_parameter'
@tools.ormcache(skiparg=0)
@tools.ormcache('db')
def get_session_parameters(self, db):
param_model = self.pool['ir.config_parameter']
cr = self.pool.cursor()
@ -28,9 +27,19 @@ class IrConfigParameter(models.Model):
cr.close()
return delay, urls
def _auth_timeout_get_parameter_delay(self):
delay, urls = self.get_session_parameters(self.pool.db_name)
return delay
def _auth_timeout_get_parameter_ignoredurls(self):
delay, urls = self.get_session_parameters(self.pool.db_name)
return urls
@api.multi
def write(self, vals, context=None):
res = super(IrConfigParameter, self).write(vals)
if self.key in [DELAY_KEY, IGNORED_PATH_KEY]:
if self.key == DELAY_KEY:
self.get_session_parameters.clear_cache(self)
elif self.key == IGNORED_PATH_KEY:
self.get_session_parameters.clear_cache(self)
return res

98
auth_session_timeout/models/res_users.py

@ -3,8 +3,9 @@
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html).
import logging
from openerp import models
from openerp import http
from openerp.http import root
from openerp.http import request
@ -13,31 +14,96 @@ from os import utime
from os.path import getmtime
from time import time
_logger = logging.getLogger(__name__)
class ResUsers(models.Model):
_inherit = 'res.users'
def _check_session_validity(self, db, uid, passwd):
def _auth_timeout_ignoredurls_get(self):
"""Pluggable method for calculating ignored urls
Defaults to stored config param
"""
param_model = self.pool['ir.config_parameter']
return param_model._auth_timeout_get_parameter_ignoredurls()
def _auth_timeout_deadline_calculate(self):
"""Pluggable method for calculating timeout deadline
Defaults to current time minus delay using delay stored as config param
"""
param_model = self.pool['ir.config_parameter']
delay = param_model._auth_timeout_get_parameter_delay()
if delay is False or delay <= 0:
return False
return time() - delay
def _auth_timeout_session_terminate(self, session):
"""Pluggable method for terminating a timed-out session
This is a late stage where a session timeout can be aborted.
Useful if you want to do some heavy checking, as it won't be
called unless the session inactivity deadline has been reached.
Return:
True: session terminated
False: session timeout cancelled
"""
if session.db and session.uid:
session.logout(keep_db=True)
return True
def _auth_timeout_check(self):
if not request:
return
session = request.session
session_store = root.session_store
param_obj = self.pool['ir.config_parameter']
delay, urls = param_obj.get_session_parameters(db)
deadline = time() - delay
path = session_store.get_session_filename(session.sid)
try:
if getmtime(path) < deadline:
if session.db and session.uid:
session.logout(keep_db=True)
elif http.request.httprequest.path not in urls:
# the session is not expired, update the last modification
# and access time.
# Calculate deadline
deadline = self._auth_timeout_deadline_calculate()
# Check if past deadline
expired = False
if deadline is not False:
path = root.session_store.get_session_filename(session.sid)
try:
expired = getmtime(path) < deadline
except OSError as e:
_logger.warning(
'Exception reading session file modified time: %s'
% e
)
pass
# Try to terminate the session
terminated = False
if expired:
terminated = self._auth_timeout_session_terminate(session)
# If session terminated, all done
if terminated:
return
# Else, conditionally update session modified and access times
ignoredurls = self._auth_timeout_ignoredurls_get()
if request.httprequest.path not in ignoredurls:
if 'path' not in locals():
path = root.session_store.get_session_filename(session.sid)
try:
utime(path, None)
except OSError:
pass
except OSError as e:
_logger.warning(
'Exception updating session file access/modified times: %s'
% e
)
pass
return
def _check_session_validity(self, db, uid, passwd):
"""Adaptor method for backward compatibility"""
return self._auth_timeout_check()
def check(self, db, uid, passwd):
res = super(ResUsers, self).check(db, uid, passwd)
self._check_session_validity(db, uid, passwd)

1
auth_session_timeout/tests/__init__.py

@ -4,3 +4,4 @@
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html).
from . import test_ir_config_parameter
from . import test_res_user

57
auth_session_timeout/tests/test_ir_config_parameter.py

@ -3,26 +3,69 @@
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html).
import threading
from openerp.tests import common
import openerp
class TestIrConfigParameter(common.TransactionCase):
def setUp(self):
super(TestIrConfigParameter, self).setUp()
self.db = openerp.tools.config['db_name']
if not self.db and hasattr(threading.current_thread(), 'dbname'):
self.db = threading.current_thread().dbname
self.db = self.env.cr.dbname
self.param_obj = self.env['ir.config_parameter']
self.data_obj = self.env['ir.model.data']
self.delay = self.env.ref(
'auth_session_timeout.inactive_session_time_out_delay')
def test_check_delay(self):
def test_check_session_params(self):
delay, urls = self.param_obj.get_session_parameters(self.db)
self.assertEqual(delay, int(self.delay.value))
self.assertIsInstance(delay, int)
self.assertIsInstance(urls, list)
def test_check_session_param_delay(self):
delay = self.param_obj._auth_timeout_get_parameter_delay()
self.assertEqual(delay, int(self.delay.value))
self.assertIsInstance(delay, int)
def test_check_session_param_urls(self):
urls = self.param_obj._auth_timeout_get_parameter_ignoredurls()
self.assertIsInstance(urls, list)
class TestIrConfigParameterCaching(common.TransactionCase):
def setUp(self):
super(TestIrConfigParameterCaching, self).setUp()
self.db = self.env.cr.dbname
self.param_obj = self.env['ir.config_parameter']
self.get_param_called = False
test = self
def get_param(*args, **kwargs):
test.get_param_called = True
return orig_get_param(args[3], args[4])
orig_get_param = self.param_obj.get_param
self.param_obj._patch_method(
'get_param',
get_param)
def tearDown(self):
super(TestIrConfigParameterCaching, self).tearDown()
self.param_obj._revert_method('get_param')
def test_check_param_cache_working(self):
self.get_param_called = False
delay, urls = self.param_obj.get_session_parameters(self.db)
self.assertTrue(self.get_param_called)
self.get_param_called = False
delay, urls = self.param_obj.get_session_parameters(self.db)
self.assertFalse(self.get_param_called)
def test_check_param_writes_clear_cache(self):
self.get_param_called = False
delay, urls = self.param_obj.get_session_parameters(self.db)
self.assertTrue(self.get_param_called)
self.get_param_called = False
self.param_obj.set_param('inactive_session_time_out_delay', 7201)
delay, urls = self.param_obj.get_session_parameters(self.db)
self.assertTrue(self.get_param_called)

43
auth_session_timeout/tests/test_res_user.py

@ -0,0 +1,43 @@
# -*- coding: utf-8 -*-
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html).
import mock
from os import strerror
from errno import ENOENT
from openerp.tests import common
_packagepath = 'openerp.addons.auth_session_timeout'
class ResUsers(common.TransactionCase):
def setUp(self):
super(ResUsers, self).setUp()
self.resusers_obj = self.env['res.users']
@mock.patch(_packagepath + '.models.res_users.request')
@mock.patch(_packagepath + '.models.res_users.root')
@mock.patch(_packagepath + '.models.res_users.getmtime')
def test_on_timeout_session_loggedout(self, mock_getmtime,
mock_root, mock_request):
mock_getmtime.return_value = 0
mock_request.session.uid = self.env.uid
mock_request.session.dbname = self.env.cr.dbname
mock_request.session.sid = 123
mock_request.session.logout = mock.Mock()
self.resusers_obj._auth_timeout_check()
self.assertTrue(mock_request.session.logout.called)
@mock.patch(_packagepath + '.models.res_users.request')
@mock.patch(_packagepath + '.models.res_users.root')
@mock.patch(_packagepath + '.models.res_users.getmtime')
@mock.patch(_packagepath + '.models.res_users.utime')
def test_sessionfile_io_exceptions_managed(self, mock_utime, mock_getmtime,
mock_root, mock_request):
mock_getmtime.side_effect = OSError(
ENOENT, strerror(ENOENT), 'non-existent-filename')
mock_request.session.uid = self.env.uid
mock_request.session.dbname = self.env.cr.dbname
mock_request.session.sid = 123
self.resusers_obj._auth_timeout_check()
Loading…
Cancel
Save