Browse Source
[REF] auth_brute_force: Cover all auth entrypoints (#1219)
[REF] auth_brute_force: Cover all auth entrypoints (#1219)
To fix https://github.com/OCA/server-tools/issues/1125 I needed to refactor the addon. To whitelist IPs now you use a config parameter, which renders res.banned.remote model unneeded. The fix is affected by https://github.com/odoo/odoo/issues/24183 and will not work until it gets fixed upstream due to the technical limitations implied.pull/1255/head
Jairo Llopis
7 years ago
committed by
Jairo Llopis
18 changed files with 834 additions and 292 deletions
-
53auth_brute_force/README.rst
-
3auth_brute_force/__init__.py
-
13auth_brute_force/__manifest__.py
-
3auth_brute_force/controllers/__init__.py
-
76auth_brute_force/controllers/main.py
-
15auth_brute_force/data/ir_config_parameter.xml
-
50auth_brute_force/migrations/10.0.2.0.0/pre-migrate.py
-
2auth_brute_force/models/__init__.py
-
180auth_brute_force/models/res_authentication_attempt.py
-
45auth_brute_force/models/res_banned_remote.py
-
155auth_brute_force/models/res_users.py
-
14auth_brute_force/security/ir_model_access.yml
-
BINauth_brute_force/static/description/screenshot_custom_ban.png
-
3auth_brute_force/tests/__init__.py
-
361auth_brute_force/tests/test_brute_force.py
-
7auth_brute_force/views/action.xml
-
4auth_brute_force/views/menu.xml
-
92auth_brute_force/views/view.xml
@ -1,4 +1,3 @@ |
|||
# -*- encoding: utf-8 -*- |
|||
# -*- coding: utf-8 -*- |
|||
|
|||
from . import models |
|||
from . import controllers |
@ -1,3 +0,0 @@ |
|||
# -*- coding: utf-8 -*- |
|||
|
|||
from . import main |
@ -1,76 +0,0 @@ |
|||
# -*- coding: utf-8 -*- |
|||
# Copyright 2015 GRAP - Sylvain LE GAL |
|||
# Copyright 2017 Tecnativa - David Vidal |
|||
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html). |
|||
|
|||
import logging |
|||
|
|||
from odoo import fields, http, registry, SUPERUSER_ID |
|||
from odoo.api import Environment |
|||
from odoo.http import request |
|||
from odoo.addons.web.controllers.main import Home, ensure_db |
|||
|
|||
_logger = logging.getLogger(__name__) |
|||
|
|||
|
|||
class LoginController(Home): |
|||
|
|||
@http.route() |
|||
def web_login(self, redirect=None, **kw): |
|||
if request.httprequest.method == 'POST': |
|||
ensure_db() |
|||
remote = request.httprequest.remote_addr |
|||
# Get registry and cursor |
|||
with registry(request.session.db).cursor() as cursor: |
|||
env = Environment(cursor, SUPERUSER_ID, {}) |
|||
config_obj = env['ir.config_parameter'] |
|||
attempt_obj = env['res.authentication.attempt'] |
|||
banned_remote_obj = env['res.banned.remote'] |
|||
# Get Settings |
|||
max_attempts_qty = int(config_obj.get_param( |
|||
'auth_brute_force.max_attempt_qty')) |
|||
# Test if remote user is banned |
|||
banned = banned_remote_obj.search([('remote', '=', remote)]) |
|||
if banned: |
|||
request.params['password'] = '' |
|||
_logger.warning( |
|||
"Authentication tried from remote '%s'. The request " |
|||
"has been ignored because the remote has been banned " |
|||
"after %d attempts without success. Login tried : '%s'" |
|||
"." % (remote, max_attempts_qty, |
|||
request.params['login'])) |
|||
else: |
|||
# Try to authenticate |
|||
result = request.session.authenticate( |
|||
request.session.db, request.params['login'], |
|||
request.params['password']) |
|||
# Log attempt |
|||
attempt_obj.create({ |
|||
'attempt_date': fields.Datetime.now(), |
|||
'login': request.params['login'], |
|||
'remote': remote, |
|||
'result': banned and 'banned' or ( |
|||
result and 'successfull' or 'failed'), |
|||
}) |
|||
cursor.commit() |
|||
if not banned and not result: |
|||
# Get last bad attempts quantity |
|||
attempts_qty = len(attempt_obj.search_last_failed(remote)) |
|||
if max_attempts_qty <= attempts_qty: |
|||
# We ban the remote |
|||
_logger.warning( |
|||
"Authentication failed from remote '%s'. " |
|||
"The remote has been banned. Login tried : '%s'" |
|||
"." % (remote, request.params['login'])) |
|||
banned_remote_obj.sudo().create({ |
|||
'remote': remote, |
|||
'ban_date': fields.Datetime.now(), |
|||
}) |
|||
cursor.commit() |
|||
else: |
|||
_logger.warning( |
|||
"Authentication failed from remote '%s'." |
|||
" Login tried : '%s'. Attempt %d / %d." % ( |
|||
remote, request.params['login'], attempts_qty, |
|||
max_attempts_qty)) |
|||
return super(LoginController, self).web_login(redirect=redirect, **kw) |
@ -1,15 +0,0 @@ |
|||
<?xml version="1.0" encoding="UTF-8"?> |
|||
<!-- Copyright 2015 GRAP -Sylvain LE GAL |
|||
License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl). --> |
|||
<odoo noupdate="1"> |
|||
|
|||
<data> |
|||
|
|||
<record id="max_attempt_qty" model="ir.config_parameter"> |
|||
<field name="key">auth_brute_force.max_attempt_qty</field> |
|||
<field name="value">10</field> |
|||
</record> |
|||
|
|||
</data> |
|||
|
|||
</odoo> |
@ -0,0 +1,50 @@ |
|||
# -*- coding: utf-8 -*- |
|||
# Copyright 2018 Tecnativa - Jairo Llopis |
|||
# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl). |
|||
|
|||
from psycopg2 import IntegrityError |
|||
|
|||
|
|||
def migrate(cr, version): |
|||
# Fix typo across DB |
|||
cr.execute( |
|||
""" UPDATE res_authentication_attempt |
|||
SET result = 'successful' |
|||
WHERE result = 'successfull'""", |
|||
) |
|||
# Store whitelist IPs in new format |
|||
cr.execute( |
|||
""" SELECT remote |
|||
FROM res_banned_remote |
|||
WHERE active IS FALSE""", |
|||
) |
|||
remotes = {record[0] for record in cr.fetchall()} |
|||
try: |
|||
with cr.savepoint(): |
|||
cr.execute( |
|||
"INSERT INTO ir_config_parameter (key, value) VALUES (%s, %s)", |
|||
( |
|||
"auth_brute_force.whitelist_remotes", |
|||
",".join(remotes), |
|||
), |
|||
) |
|||
except IntegrityError: |
|||
# Parameter already exists |
|||
cr.execute( |
|||
"SELECT value FROM ir_config_parameter WHERE key = %s", |
|||
("auth_brute_force.whitelist_remotes",) |
|||
) |
|||
current = set(cr.fetchall()[0][0].split(",")) |
|||
cr.execute( |
|||
"UPDATE ir_config_parameter SET value = %s WHERE key = %s", |
|||
(",".join(current | remotes), |
|||
"auth_brute_force.whitelist_remotes"), |
|||
) |
|||
# Update the configured IP limit parameter |
|||
cr.execute( |
|||
"UPDATE ir_config_parameter SET key = %s WHERE key = %s", |
|||
( |
|||
"auth_brute_force.whitelist_remotes", |
|||
"auth_brute_force.max_by_ip", |
|||
) |
|||
) |
@ -1,4 +1,4 @@ |
|||
# -*- encoding: utf-8 -*- |
|||
|
|||
from . import res_banned_remote |
|||
from . import res_authentication_attempt |
|||
from . import res_users |
@ -1,45 +0,0 @@ |
|||
# -*- coding: utf-8 -*- |
|||
# Copyright 2015 GRAP - Sylvain LE GAL |
|||
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html). |
|||
|
|||
import urllib |
|||
import json |
|||
|
|||
from odoo import api, fields, models |
|||
|
|||
|
|||
class ResBannedRemote(models.Model): |
|||
_name = 'res.banned.remote' |
|||
_rec_name = 'remote' |
|||
|
|||
_GEOLOCALISATION_URL = "http://ip-api.com/json/{}" |
|||
|
|||
# Column Section |
|||
description = fields.Text( |
|||
string='Description', compute='_compute_description', store=True) |
|||
ban_date = fields.Datetime( |
|||
string='Ban Date', required=True, default=fields.Datetime.now) |
|||
remote = fields.Char(string='Remote ID', required=True) |
|||
active = fields.Boolean( |
|||
string='Active', help="Uncheck this box to unban the remote", |
|||
default=True) |
|||
attempt_ids = fields.Many2many( |
|||
comodel_name='res.authentication.attempt', string='Attempts', |
|||
compute='_compute_attempt_ids') |
|||
|
|||
# Compute Section |
|||
@api.multi |
|||
@api.depends('remote') |
|||
def _compute_description(self): |
|||
for item in self: |
|||
url = self._GEOLOCALISATION_URL.format(item.remote) |
|||
res = json.loads(urllib.urlopen(url).read()) |
|||
item.description = '' |
|||
for k, v in res.iteritems(): |
|||
item.description += '%s : %s\n' % (k, v) |
|||
|
|||
@api.multi |
|||
def _compute_attempt_ids(self): |
|||
for item in self: |
|||
attempt_obj = self.env['res.authentication.attempt'] |
|||
item.attempt_ids = attempt_obj.search_last_failed(item.remote) |
@ -0,0 +1,155 @@ |
|||
# -*- coding: utf-8 -*- |
|||
# Copyright 2017 Tecnativa - Jairo Llopis |
|||
# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl). |
|||
|
|||
import logging |
|||
from contextlib import contextmanager |
|||
from threading import current_thread |
|||
from odoo import api, models, SUPERUSER_ID |
|||
from odoo.exceptions import AccessDenied |
|||
from odoo.service import wsgi_server |
|||
|
|||
_logger = logging.getLogger(__name__) |
|||
|
|||
|
|||
class ResUsers(models.Model): |
|||
_inherit = "res.users" |
|||
|
|||
# HACK https://github.com/odoo/odoo/issues/24183 |
|||
# TODO Remove in v12, and use normal odoo.http.request to get details |
|||
@api.model_cr |
|||
def _register_hook(self): |
|||
"""🐒-patch XML-RPC controller to know remote address.""" |
|||
original_fn = wsgi_server.application_unproxied |
|||
|
|||
def _patch(environ, start_response): |
|||
current_thread().environ = environ |
|||
return original_fn(environ, start_response) |
|||
|
|||
wsgi_server.application_unproxied = _patch |
|||
|
|||
# Helpers to track authentication attempts |
|||
@classmethod |
|||
@contextmanager |
|||
def _auth_attempt(cls, login): |
|||
"""Start an authentication attempt and track its state.""" |
|||
try: |
|||
# Check if this call is nested |
|||
attempt_id = current_thread().auth_attempt_id |
|||
except AttributeError: |
|||
# Not nested; create a new attempt |
|||
attempt_id = cls._auth_attempt_new(login) |
|||
if not attempt_id: |
|||
# No attempt was created, so there's nothing to do here |
|||
yield |
|||
return |
|||
try: |
|||
current_thread().auth_attempt_id = attempt_id |
|||
result = "successful" |
|||
try: |
|||
yield |
|||
except AccessDenied as error: |
|||
result = getattr(error, "reason", "failed") |
|||
raise |
|||
finally: |
|||
cls._auth_attempt_update({"result": result}) |
|||
finally: |
|||
try: |
|||
del current_thread().auth_attempt_id |
|||
except AttributeError: |
|||
pass # It was deleted already |
|||
|
|||
@classmethod |
|||
def _auth_attempt_force_raise(cls, login, method): |
|||
"""Force a method to raise an AccessDenied on falsey return.""" |
|||
try: |
|||
with cls._auth_attempt(login): |
|||
result = method() |
|||
if not result: |
|||
# Force exception to record auth failure |
|||
raise AccessDenied() |
|||
except AccessDenied: |
|||
pass # `_auth_attempt()` did the hard part already |
|||
return result |
|||
|
|||
@classmethod |
|||
def _auth_attempt_new(cls, login): |
|||
"""Store one authentication attempt, not knowing the result.""" |
|||
# Get the right remote address |
|||
try: |
|||
remote_addr = current_thread().environ["REMOTE_ADDR"] |
|||
except (KeyError, AttributeError): |
|||
remote_addr = False |
|||
# Exit if it doesn't make sense to store this attempt |
|||
if not remote_addr: |
|||
return False |
|||
# Use a separate cursor to keep changes always |
|||
with cls.pool.cursor() as cr: |
|||
env = api.Environment(cr, SUPERUSER_ID, {}) |
|||
attempt = env["res.authentication.attempt"].create({ |
|||
"login": login, |
|||
"remote": remote_addr, |
|||
}) |
|||
return attempt.id |
|||
|
|||
@classmethod |
|||
def _auth_attempt_update(cls, values): |
|||
"""Update a given auth attempt if we still ignore its result.""" |
|||
auth_id = getattr(current_thread(), "auth_attempt_id", False) |
|||
if not auth_id: |
|||
return {} # No running auth attempt; nothing to do |
|||
# Use a separate cursor to keep changes always |
|||
with cls.pool.cursor() as cr: |
|||
env = api.Environment(cr, SUPERUSER_ID, {}) |
|||
attempt = env["res.authentication.attempt"].browse(auth_id) |
|||
# Update only on 1st call |
|||
if not attempt.result: |
|||
attempt.write(values) |
|||
return attempt.copy_data()[0] if attempt else {} |
|||
|
|||
# Override all auth-related core methods |
|||
@classmethod |
|||
def _login(cls, db, login, password): |
|||
return cls._auth_attempt_force_raise( |
|||
login, |
|||
lambda: super(ResUsers, cls)._login(db, login, password), |
|||
) |
|||
|
|||
@classmethod |
|||
def authenticate(cls, db, login, password, user_agent_env): |
|||
return cls._auth_attempt_force_raise( |
|||
login, |
|||
lambda: super(ResUsers, cls).authenticate( |
|||
db, login, password, user_agent_env), |
|||
) |
|||
|
|||
@classmethod |
|||
def check(cls, db, uid, passwd): |
|||
with cls._auth_attempt(uid): |
|||
return super(ResUsers, cls).check(db, uid, passwd) |
|||
|
|||
@api.model |
|||
def check_credentials(self, password): |
|||
"""This is the most important and specific auth check method. |
|||
|
|||
When we get here, it means that Odoo already checked the user exists |
|||
in this database. |
|||
|
|||
Other auth methods usually plug here. |
|||
""" |
|||
login = self.env.user.login |
|||
with self._auth_attempt(login): |
|||
# Update login, just in case we stored the UID before |
|||
attempt = self._auth_attempt_update({"login": login}) |
|||
remote = attempt.get("remote") |
|||
# Fail if the remote is banned |
|||
trusted = self.env["res.authentication.attempt"]._trusted( |
|||
remote, |
|||
login, |
|||
) |
|||
if not trusted: |
|||
error = AccessDenied() |
|||
error.reason = "banned" |
|||
raise error |
|||
# Continue with other auth systems |
|||
return super(ResUsers, self).check_credentials(password) |
Before Width: 601 | Height: 331 | Size: 31 KiB |
@ -0,0 +1,3 @@ |
|||
# -*- coding: utf-8 -*- |
|||
|
|||
from . import test_brute_force |
@ -0,0 +1,361 @@ |
|||
# -*- coding: utf-8 -*- |
|||
# Copyright 2017 Tecnativa - Jairo Llopis |
|||
# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl). |
|||
|
|||
from threading import current_thread |
|||
from unittest import skipUnless |
|||
from urllib import urlencode |
|||
|
|||
from mock import patch |
|||
from werkzeug.utils import redirect |
|||
|
|||
from odoo import http |
|||
from odoo.tests.common import at_install, can_import, HttpCase, post_install |
|||
from odoo.tools import mute_logger |
|||
|
|||
from ..models import res_authentication_attempt, res_users |
|||
|
|||
|
|||
GARBAGE_LOGGERS = ( |
|||
"werkzeug", |
|||
res_authentication_attempt.__name__, |
|||
res_users.__name__, |
|||
) |
|||
|
|||
|
|||
@at_install(False) |
|||
@post_install(True) |
|||
# Skip CSRF validation on tests |
|||
@patch(http.__name__ + ".WebRequest.validate_csrf", return_value=True) |
|||
# Skip specific browser forgery on redirections |
|||
@patch(http.__name__ + ".redirect_with_hash", side_effect=redirect) |
|||
# Faster tests without calls to geolocation API |
|||
@patch(res_authentication_attempt.__name__ + ".urlopen", return_value="") |
|||
class BruteForceCase(HttpCase): |
|||
def setUp(self): |
|||
super(BruteForceCase, self).setUp() |
|||
# Some tests could retain environ from last test and produce fake |
|||
# results without this patch |
|||
# HACK https://github.com/odoo/odoo/issues/24183 |
|||
# TODO Remove in v12 |
|||
try: |
|||
del current_thread().environ |
|||
except AttributeError: |
|||
pass |
|||
# Complex password to avoid conflicts with `password_security` |
|||
self.good_password = "Admin$%02584" |
|||
self.data_demo = { |
|||
"login": "demo", |
|||
"password": "Demo%&/(908409**", |
|||
} |
|||
with self.cursor() as cr: |
|||
env = self.env(cr) |
|||
env["ir.config_parameter"].set_param( |
|||
"auth_brute_force.max_by_ip_user", 3) |
|||
env["ir.config_parameter"].set_param( |
|||
"auth_brute_force.max_by_ip", 4) |
|||
# Clean attempts to be able to count in tests |
|||
env["res.authentication.attempt"].search([]).unlink() |
|||
# Make sure involved users have good passwords |
|||
env.user.password = self.good_password |
|||
env["res.users"].search([ |
|||
("login", "=", self.data_demo["login"]), |
|||
]).password = self.data_demo["password"] |
|||
|
|||
@skipUnless(can_import("odoo.addons.web"), "Needs web addon") |
|||
@mute_logger(*GARBAGE_LOGGERS) |
|||
def test_web_login_existing(self, *args): |
|||
"""Remote is banned with real user on web login form.""" |
|||
data1 = { |
|||
"login": "admin", |
|||
"password": "1234", # Wrong |
|||
} |
|||
# Make sure user is logged out |
|||
self.url_open("/web/session/logout", timeout=30) |
|||
# Fail 3 times |
|||
for n in range(3): |
|||
response = self.url_open("/web/login", bytes(urlencode(data1)), 30) |
|||
# If you fail, you get /web/login again |
|||
self.assertTrue( |
|||
response.geturl().endswith("/web/login"), |
|||
"Unexpected URL %s" % response.geturl(), |
|||
) |
|||
# Admin banned, demo not |
|||
with self.cursor() as cr: |
|||
env = self.env(cr) |
|||
self.assertFalse( |
|||
env["res.authentication.attempt"]._trusted( |
|||
"127.0.0.1", |
|||
data1["login"], |
|||
), |
|||
) |
|||
self.assertTrue( |
|||
env["res.authentication.attempt"]._trusted( |
|||
"127.0.0.1", |
|||
"demo", |
|||
), |
|||
) |
|||
# Now I know the password, but login is rejected too |
|||
data1["password"] = self.good_password |
|||
response = self.url_open("/web/login", bytes(urlencode(data1)), 30) |
|||
self.assertTrue( |
|||
response.geturl().endswith("/web/login"), |
|||
"Unexpected URL %s" % response.geturl(), |
|||
) |
|||
# IP has been banned, demo user cannot login |
|||
with self.cursor() as cr: |
|||
env = self.env(cr) |
|||
self.assertFalse( |
|||
env["res.authentication.attempt"]._trusted( |
|||
"127.0.0.1", |
|||
"demo", |
|||
), |
|||
) |
|||
# Attempts recorded |
|||
with self.cursor() as cr: |
|||
env = self.env(cr) |
|||
failed = env["res.authentication.attempt"].search([ |
|||
("result", "=", "failed"), |
|||
("login", "=", data1["login"]), |
|||
("remote", "=", "127.0.0.1"), |
|||
]) |
|||
self.assertEqual(len(failed), 3) |
|||
banned = env["res.authentication.attempt"].search([ |
|||
("result", "=", "banned"), |
|||
("remote", "=", "127.0.0.1"), |
|||
]) |
|||
self.assertEqual(len(banned), 1) |
|||
# Unban |
|||
banned.action_whitelist_add() |
|||
# Try good login, it should work now |
|||
response = self.url_open("/web/login", bytes(urlencode(data1)), 30) |
|||
self.assertTrue(response.geturl().endswith("/web")) |
|||
|
|||
@skipUnless(can_import("odoo.addons.web"), "Needs web addon") |
|||
@mute_logger(*GARBAGE_LOGGERS) |
|||
def test_web_login_unexisting(self, *args): |
|||
"""Remote is banned with fake user on web login form.""" |
|||
data1 = { |
|||
"login": "administrator", # Wrong |
|||
"password": self.good_password, |
|||
} |
|||
# Make sure user is logged out |
|||
self.url_open("/web/session/logout", timeout=30) |
|||
# Fail 3 times |
|||
for n in range(3): |
|||
response = self.url_open("/web/login", bytes(urlencode(data1)), 30) |
|||
# If you fail, you get /web/login again |
|||
self.assertTrue( |
|||
response.geturl().endswith("/web/login"), |
|||
"Unexpected URL %s" % response.geturl(), |
|||
) |
|||
# Admin banned, demo not |
|||
with self.cursor() as cr: |
|||
env = self.env(cr) |
|||
self.assertFalse( |
|||
env["res.authentication.attempt"]._trusted( |
|||
"127.0.0.1", |
|||
data1["login"], |
|||
), |
|||
) |
|||
self.assertTrue( |
|||
env["res.authentication.attempt"]._trusted( |
|||
"127.0.0.1", |
|||
self.data_demo["login"], |
|||
), |
|||
) |
|||
# Demo user can login |
|||
response = self.url_open( |
|||
"/web/login", |
|||
bytes(urlencode(self.data_demo)), |
|||
30, |
|||
) |
|||
# If you pass, you get /web |
|||
self.assertTrue( |
|||
response.geturl().endswith("/web"), |
|||
"Unexpected URL %s" % response.geturl(), |
|||
) |
|||
self.url_open("/web/session/logout", timeout=30) |
|||
# Attempts recorded |
|||
with self.cursor() as cr: |
|||
env = self.env(cr) |
|||
failed = env["res.authentication.attempt"].search([ |
|||
("result", "=", "failed"), |
|||
("login", "=", data1["login"]), |
|||
("remote", "=", "127.0.0.1"), |
|||
]) |
|||
self.assertEqual(len(failed), 3) |
|||
banned = env["res.authentication.attempt"].search([ |
|||
("result", "=", "banned"), |
|||
("login", "=", data1["login"]), |
|||
("remote", "=", "127.0.0.1"), |
|||
]) |
|||
self.assertEqual(len(banned), 0) |
|||
|
|||
@mute_logger(*GARBAGE_LOGGERS) |
|||
def test_xmlrpc_login_existing(self, *args): |
|||
"""Remote is banned with real user on XML-RPC login.""" |
|||
data1 = { |
|||
"login": "admin", |
|||
"password": "1234", # Wrong |
|||
} |
|||
# Fail 3 times |
|||
for n in range(3): |
|||
self.assertFalse(self.xmlrpc_common.authenticate( |
|||
self.env.cr.dbname, data1["login"], data1["password"], {})) |
|||
# Admin banned, demo not |
|||
with self.cursor() as cr: |
|||
env = self.env(cr) |
|||
self.assertFalse( |
|||
env["res.authentication.attempt"]._trusted( |
|||
"127.0.0.1", |
|||
data1["login"], |
|||
), |
|||
) |
|||
self.assertTrue( |
|||
env["res.authentication.attempt"]._trusted( |
|||
"127.0.0.1", |
|||
"demo", |
|||
), |
|||
) |
|||
# Now I know the password, but login is rejected too |
|||
data1["password"] = self.good_password |
|||
self.assertFalse(self.xmlrpc_common.authenticate( |
|||
self.env.cr.dbname, data1["login"], data1["password"], {})) |
|||
# IP has been banned, demo user cannot login |
|||
with self.cursor() as cr: |
|||
env = self.env(cr) |
|||
self.assertFalse( |
|||
env["res.authentication.attempt"]._trusted( |
|||
"127.0.0.1", |
|||
"demo", |
|||
), |
|||
) |
|||
# Attempts recorded |
|||
with self.cursor() as cr: |
|||
env = self.env(cr) |
|||
failed = env["res.authentication.attempt"].search([ |
|||
("result", "=", "failed"), |
|||
("login", "=", data1["login"]), |
|||
("remote", "=", "127.0.0.1"), |
|||
]) |
|||
self.assertEqual(len(failed), 3) |
|||
banned = env["res.authentication.attempt"].search([ |
|||
("result", "=", "banned"), |
|||
("remote", "=", "127.0.0.1"), |
|||
]) |
|||
self.assertEqual(len(banned), 1) |
|||
# Unban |
|||
banned.action_whitelist_add() |
|||
# Try good login, it should work now |
|||
self.assertTrue(self.xmlrpc_common.authenticate( |
|||
self.env.cr.dbname, data1["login"], data1["password"], {})) |
|||
|
|||
@mute_logger(*GARBAGE_LOGGERS) |
|||
def test_xmlrpc_login_unexisting(self, *args): |
|||
"""Remote is banned with fake user on XML-RPC login.""" |
|||
data1 = { |
|||
"login": "administrator", # Wrong |
|||
"password": self.good_password, |
|||
} |
|||
# Fail 3 times |
|||
for n in range(3): |
|||
self.assertFalse(self.xmlrpc_common.authenticate( |
|||
self.env.cr.dbname, data1["login"], data1["password"], {})) |
|||
# Admin banned, demo not |
|||
with self.cursor() as cr: |
|||
env = self.env(cr) |
|||
self.assertFalse( |
|||
env["res.authentication.attempt"]._trusted( |
|||
"127.0.0.1", |
|||
data1["login"], |
|||
), |
|||
) |
|||
self.assertTrue( |
|||
env["res.authentication.attempt"]._trusted( |
|||
"127.0.0.1", |
|||
self.data_demo["login"], |
|||
), |
|||
) |
|||
# Demo user can login |
|||
self.assertTrue(self.xmlrpc_common.authenticate( |
|||
self.env.cr.dbname, |
|||
self.data_demo["login"], |
|||
self.data_demo["password"], |
|||
{}, |
|||
)) |
|||
# Attempts recorded |
|||
with self.cursor() as cr: |
|||
env = self.env(cr) |
|||
failed = env["res.authentication.attempt"].search([ |
|||
("result", "=", "failed"), |
|||
("login", "=", data1["login"]), |
|||
("remote", "=", "127.0.0.1"), |
|||
]) |
|||
self.assertEqual(len(failed), 3) |
|||
banned = env["res.authentication.attempt"].search([ |
|||
("result", "=", "banned"), |
|||
("login", "=", data1["login"]), |
|||
("remote", "=", "127.0.0.1"), |
|||
]) |
|||
self.assertEqual(len(banned), 0) |
|||
|
|||
@mute_logger(*GARBAGE_LOGGERS) |
|||
def test_orm_login_existing(self, *args): |
|||
"""No bans on ORM login with an existing user.""" |
|||
data1 = { |
|||
"login": "admin", |
|||
"password": "1234", # Wrong |
|||
} |
|||
with self.cursor() as cr: |
|||
env = self.env(cr) |
|||
# Fail 3 times |
|||
for n in range(3): |
|||
self.assertFalse( |
|||
env["res.users"].authenticate( |
|||
cr.dbname, data1["login"], data1["password"], {})) |
|||
self.assertEqual( |
|||
env["res.authentication.attempt"].search(count=True, args=[]), |
|||
0, |
|||
) |
|||
self.assertTrue( |
|||
env["res.authentication.attempt"]._trusted( |
|||
"127.0.0.1", |
|||
data1["login"], |
|||
), |
|||
) |
|||
# Now I know the password, and login works |
|||
data1["password"] = self.good_password |
|||
self.assertTrue( |
|||
env["res.users"].authenticate( |
|||
cr.dbname, data1["login"], data1["password"], {})) |
|||
|
|||
@mute_logger(*GARBAGE_LOGGERS) |
|||
def test_orm_login_unexisting(self, *args): |
|||
"""No bans on ORM login with an unexisting user.""" |
|||
data1 = { |
|||
"login": "administrator", # Wrong |
|||
"password": self.good_password, |
|||
} |
|||
with self.cursor() as cr: |
|||
env = self.env(cr) |
|||
# Fail 3 times |
|||
for n in range(3): |
|||
self.assertFalse( |
|||
env["res.users"].authenticate( |
|||
cr.dbname, data1["login"], data1["password"], {})) |
|||
self.assertEqual( |
|||
env["res.authentication.attempt"].search(count=True, args=[]), |
|||
0, |
|||
) |
|||
self.assertTrue( |
|||
env["res.authentication.attempt"]._trusted( |
|||
"127.0.0.1", |
|||
data1["login"], |
|||
), |
|||
) |
|||
# Now I know the user, and login works |
|||
data1["login"] = "admin" |
|||
self.assertTrue( |
|||
env["res.users"].authenticate( |
|||
cr.dbname, data1["login"], data1["password"], {})) |
Write
Preview
Loading…
Cancel
Save
Reference in new issue