Browse Source
Module auth_session_timeout: Pluggability (#887)
Module auth_session_timeout: Pluggability (#887)
* Module auth_session_timeout: --------------------------- * Refactor to allow other modules to inherit and augment or override the following: ** Session expiry time (deadline) calculation ** Ignored URLs ** Final session expiry (with possibility to late-abort) * Re-ordered functionality to remove unnecessary work, as this code is called very often. * Do not expire a session if delay gets set to zero (or unset / false) * WIP * Fixed flake8 lint errors * Fixed flake8 lint errors * WIP * WIP * WIP * WIP * WIP * WIP * Module: auth-session-timeout: Refactor ResUser tests to use `unittest.mock` patching * Module: auth_session_timeout: Fixed flake8 lint errors * Module: auth_session_timeout: Fixed flake8 lint errorspull/580/head
jmorgannz
7 years ago
committed by
Dave Lasley
No known key found for this signature in database
GPG Key ID: 7DDBA4BA81B934CF
6 changed files with 211 additions and 54 deletions
-
1auth_session_timeout/README.rst
-
1auth_session_timeout/__manifest__.py
-
42auth_session_timeout/models/ir_config_parameter.py
-
115auth_session_timeout/models/res_users.py
-
75auth_session_timeout/tests/test_ir_config_parameter.py
-
31auth_session_timeout/tests/test_res_users.py
@ -1,41 +1,110 @@ |
|||
# -*- coding: utf-8 -*- |
|||
# (c) 2015 ACSONE SA/NV, Dhinesh D |
|||
|
|||
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html). |
|||
|
|||
import logging |
|||
|
|||
from odoo import models |
|||
|
|||
from odoo.http import root |
|||
from odoo.http import request |
|||
|
|||
from os import utime |
|||
from os.path import getmtime |
|||
from time import time |
|||
|
|||
from odoo import models, http |
|||
_logger = logging.getLogger(__name__) |
|||
|
|||
|
|||
class ResUsers(models.Model): |
|||
_inherit = 'res.users' |
|||
|
|||
@classmethod |
|||
def _check_session_validity(cls, db, uid, passwd): |
|||
if not http.request: |
|||
def _auth_timeout_ignoredurls_get(self): |
|||
"""Pluggable method for calculating ignored urls |
|||
Defaults to stored config param |
|||
""" |
|||
param_model = self.pool['ir.config_parameter'] |
|||
return param_model._auth_timeout_get_parameter_ignoredurls() |
|||
|
|||
def _auth_timeout_deadline_calculate(self): |
|||
"""Pluggable method for calculating timeout deadline |
|||
Defaults to current time minus delay using delay stored as config param |
|||
""" |
|||
param_model = self.pool['ir.config_parameter'] |
|||
delay = param_model._auth_timeout_get_parameter_delay() |
|||
if delay is False or delay <= 0: |
|||
return False |
|||
return time() - delay |
|||
|
|||
def _auth_timeout_session_terminate(self, session): |
|||
"""Pluggable method for terminating a timed-out session |
|||
|
|||
This is a late stage where a session timeout can be aborted. |
|||
Useful if you want to do some heavy checking, as it won't be |
|||
called unless the session inactivity deadline has been reached. |
|||
|
|||
Return: |
|||
True: session terminated |
|||
False: session timeout cancelled |
|||
""" |
|||
if session.db and session.uid: |
|||
session.logout(keep_db=True) |
|||
return True |
|||
|
|||
def _auth_timeout_check(self): |
|||
if not request: |
|||
return |
|||
session = http.request.session |
|||
session_store = http.root.session_store |
|||
ConfigParam = http.request.env['ir.config_parameter'] |
|||
delay, urls = ConfigParam.get_session_parameters() |
|||
deadline = time() - delay |
|||
path = session_store.get_session_filename(session.sid) |
|||
try: |
|||
if getmtime(path) < deadline: |
|||
if session.db and session.uid: |
|||
session.logout(keep_db=True) |
|||
elif http.request.httprequest.path not in urls: |
|||
# the session is not expired, update the last modification |
|||
# and access time. |
|||
|
|||
session = request.session |
|||
|
|||
# Calculate deadline |
|||
deadline = self._auth_timeout_deadline_calculate() |
|||
|
|||
# Check if past deadline |
|||
expired = False |
|||
if deadline is not False: |
|||
path = root.session_store.get_session_filename(session.sid) |
|||
try: |
|||
expired = getmtime(path) < deadline |
|||
except OSError as e: |
|||
_logger.warning( |
|||
'Exception reading session file modified time: %s' |
|||
% e |
|||
) |
|||
pass |
|||
|
|||
# Try to terminate the session |
|||
terminated = False |
|||
if expired: |
|||
terminated = self._auth_timeout_session_terminate(session) |
|||
|
|||
# If session terminated, all done |
|||
if terminated: |
|||
return |
|||
|
|||
# Else, conditionally update session modified and access times |
|||
ignoredurls = self._auth_timeout_ignoredurls_get() |
|||
|
|||
if request.httprequest.path not in ignoredurls: |
|||
if 'path' not in locals(): |
|||
path = root.session_store.get_session_filename(session.sid) |
|||
try: |
|||
utime(path, None) |
|||
except OSError: |
|||
pass |
|||
except OSError as e: |
|||
_logger.warning( |
|||
'Exception updating session file access/modified times: %s' |
|||
% e |
|||
) |
|||
pass |
|||
|
|||
return |
|||
|
|||
@classmethod |
|||
def check(cls, db, uid, passwd): |
|||
res = super(ResUsers, cls).check(db, uid, passwd) |
|||
cls._check_session_validity(db, uid, passwd) |
|||
def _check_session_validity(self, db, uid, passwd): |
|||
"""Adaptor method for backward compatibility""" |
|||
return self._auth_timeout_check() |
|||
|
|||
def check(self, db, uid, passwd): |
|||
res = super(ResUsers, self).check(db, uid, passwd) |
|||
self._check_session_validity(db, uid, passwd) |
|||
return res |
@ -1,32 +1,71 @@ |
|||
# -*- coding: utf-8 -*- |
|||
# (c) 2015 ACSONE SA/NV, Dhinesh D |
|||
# Copyright 2016 LasLabs Inc. |
|||
|
|||
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html). |
|||
|
|||
from odoo.tests.common import TransactionCase |
|||
from odoo.tests import common |
|||
|
|||
|
|||
class TestIrConfigParameter(TransactionCase): |
|||
class TestIrConfigParameter(common.TransactionCase): |
|||
|
|||
def setUp(self): |
|||
super(TestIrConfigParameter, self).setUp() |
|||
self.db = self.env.cr.dbname |
|||
self.param_obj = self.env['ir.config_parameter'] |
|||
self.data_obj = self.env['ir.model.data'] |
|||
self.delay = self.env.ref( |
|||
'auth_session_timeout.inactive_session_time_out_delay' |
|||
) |
|||
self.url = self.env.ref( |
|||
'auth_session_timeout.inactive_session_time_out_ignored_url' |
|||
) |
|||
self.urls = ['url1', 'url2'] |
|||
self.url.value = ','.join(self.urls) |
|||
|
|||
def test_get_session_parameters_delay(self): |
|||
""" It should return the proper delay """ |
|||
delay, _ = self.param_obj.get_session_parameters() |
|||
'auth_session_timeout.inactive_session_time_out_delay') |
|||
|
|||
def test_check_session_params(self): |
|||
delay, urls = self.param_obj.get_session_parameters(self.db) |
|||
self.assertEqual(delay, int(self.delay.value)) |
|||
self.assertIsInstance(delay, int) |
|||
self.assertIsInstance(urls, list) |
|||
|
|||
def test_check_session_param_delay(self): |
|||
delay = self.param_obj._auth_timeout_get_parameter_delay() |
|||
self.assertEqual(delay, int(self.delay.value)) |
|||
self.assertIsInstance(delay, int) |
|||
|
|||
def test_check_session_param_urls(self): |
|||
urls = self.param_obj._auth_timeout_get_parameter_ignoredurls() |
|||
self.assertIsInstance(urls, list) |
|||
|
|||
|
|||
class TestIrConfigParameterCaching(common.TransactionCase): |
|||
|
|||
def setUp(self): |
|||
super(TestIrConfigParameterCaching, self).setUp() |
|||
self.db = self.env.cr.dbname |
|||
self.param_obj = self.env['ir.config_parameter'] |
|||
self.get_param_called = False |
|||
test = self |
|||
|
|||
def get_param(*args, **kwargs): |
|||
test.get_param_called = True |
|||
return orig_get_param(args[3], args[4]) |
|||
orig_get_param = self.param_obj.get_param |
|||
self.param_obj._patch_method( |
|||
'get_param', |
|||
get_param) |
|||
|
|||
def tearDown(self): |
|||
super(TestIrConfigParameterCaching, self).tearDown() |
|||
self.param_obj._revert_method('get_param') |
|||
|
|||
def test_check_param_cache_working(self): |
|||
self.get_param_called = False |
|||
delay, urls = self.param_obj.get_session_parameters(self.db) |
|||
self.assertTrue(self.get_param_called) |
|||
self.get_param_called = False |
|||
delay, urls = self.param_obj.get_session_parameters(self.db) |
|||
self.assertFalse(self.get_param_called) |
|||
|
|||
def test_get_session_parameters_url(self): |
|||
""" It should return URIs split by comma """ |
|||
_, urls = self.param_obj.get_session_parameters() |
|||
self.assertEqual(urls, self.urls) |
|||
def test_check_param_writes_clear_cache(self): |
|||
self.get_param_called = False |
|||
delay, urls = self.param_obj.get_session_parameters(self.db) |
|||
self.assertTrue(self.get_param_called) |
|||
self.get_param_called = False |
|||
self.param_obj.set_param('inactive_session_time_out_delay', 7201) |
|||
delay, urls = self.param_obj.get_session_parameters(self.db) |
|||
self.assertTrue(self.get_param_called) |
Write
Preview
Loading…
Cancel
Save
Reference in new issue