Browse Source
[REF] auth_brute_force: Cover all auth entrypoints (#1219)
[REF] auth_brute_force: Cover all auth entrypoints (#1219)
To fix https://github.com/OCA/server-tools/issues/1125 I needed to refactor the addon. To whitelist IPs now you use a config parameter, which renders res.banned.remote model unneeded. The fix is affected by https://github.com/odoo/odoo/issues/24183 and will not work until it gets fixed upstream due to the technical limitations implied.pull/1250/head
Jairo Llopis
7 years ago
committed by
GitHub
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 838 additions and 294 deletions
-
6.travis.yml
-
53auth_brute_force/README.rst
-
3auth_brute_force/__init__.py
-
13auth_brute_force/__manifest__.py
-
3auth_brute_force/controllers/__init__.py
-
76auth_brute_force/controllers/main.py
-
15auth_brute_force/data/ir_config_parameter.xml
-
50auth_brute_force/migrations/10.0.2.0.0/pre-migrate.py
-
2auth_brute_force/models/__init__.py
-
180auth_brute_force/models/res_authentication_attempt.py
-
45auth_brute_force/models/res_banned_remote.py
-
155auth_brute_force/models/res_users.py
-
14auth_brute_force/security/ir_model_access.yml
-
BINauth_brute_force/static/description/screenshot_custom_ban.png
-
3auth_brute_force/tests/__init__.py
-
361auth_brute_force/tests/test_brute_force.py
-
7auth_brute_force/views/action.xml
-
4auth_brute_force/views/menu.xml
-
92auth_brute_force/views/view.xml
@ -1,4 +1,3 @@ |
|||||
# -*- encoding: utf-8 -*- |
|
||||
|
# -*- coding: utf-8 -*- |
||||
|
|
||||
from . import models |
from . import models |
||||
from . import controllers |
|
@ -1,3 +0,0 @@ |
|||||
# -*- coding: utf-8 -*- |
|
||||
|
|
||||
from . import main |
|
@ -1,76 +0,0 @@ |
|||||
# -*- coding: utf-8 -*- |
|
||||
# Copyright 2015 GRAP - Sylvain LE GAL |
|
||||
# Copyright 2017 Tecnativa - David Vidal |
|
||||
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html). |
|
||||
|
|
||||
import logging |
|
||||
|
|
||||
from odoo import fields, http, registry, SUPERUSER_ID |
|
||||
from odoo.api import Environment |
|
||||
from odoo.http import request |
|
||||
from odoo.addons.web.controllers.main import Home, ensure_db |
|
||||
|
|
||||
_logger = logging.getLogger(__name__) |
|
||||
|
|
||||
|
|
||||
class LoginController(Home): |
|
||||
|
|
||||
@http.route() |
|
||||
def web_login(self, redirect=None, **kw): |
|
||||
if request.httprequest.method == 'POST': |
|
||||
ensure_db() |
|
||||
remote = request.httprequest.remote_addr |
|
||||
# Get registry and cursor |
|
||||
with registry(request.session.db).cursor() as cursor: |
|
||||
env = Environment(cursor, SUPERUSER_ID, {}) |
|
||||
config_obj = env['ir.config_parameter'] |
|
||||
attempt_obj = env['res.authentication.attempt'] |
|
||||
banned_remote_obj = env['res.banned.remote'] |
|
||||
# Get Settings |
|
||||
max_attempts_qty = int(config_obj.get_param( |
|
||||
'auth_brute_force.max_attempt_qty')) |
|
||||
# Test if remote user is banned |
|
||||
banned = banned_remote_obj.search([('remote', '=', remote)]) |
|
||||
if banned: |
|
||||
request.params['password'] = '' |
|
||||
_logger.warning( |
|
||||
"Authentication tried from remote '%s'. The request " |
|
||||
"has been ignored because the remote has been banned " |
|
||||
"after %d attempts without success. Login tried : '%s'" |
|
||||
"." % (remote, max_attempts_qty, |
|
||||
request.params['login'])) |
|
||||
else: |
|
||||
# Try to authenticate |
|
||||
result = request.session.authenticate( |
|
||||
request.session.db, request.params['login'], |
|
||||
request.params['password']) |
|
||||
# Log attempt |
|
||||
attempt_obj.create({ |
|
||||
'attempt_date': fields.Datetime.now(), |
|
||||
'login': request.params['login'], |
|
||||
'remote': remote, |
|
||||
'result': banned and 'banned' or ( |
|
||||
result and 'successfull' or 'failed'), |
|
||||
}) |
|
||||
cursor.commit() |
|
||||
if not banned and not result: |
|
||||
# Get last bad attempts quantity |
|
||||
attempts_qty = len(attempt_obj.search_last_failed(remote)) |
|
||||
if max_attempts_qty <= attempts_qty: |
|
||||
# We ban the remote |
|
||||
_logger.warning( |
|
||||
"Authentication failed from remote '%s'. " |
|
||||
"The remote has been banned. Login tried : '%s'" |
|
||||
"." % (remote, request.params['login'])) |
|
||||
banned_remote_obj.sudo().create({ |
|
||||
'remote': remote, |
|
||||
'ban_date': fields.Datetime.now(), |
|
||||
}) |
|
||||
cursor.commit() |
|
||||
else: |
|
||||
_logger.warning( |
|
||||
"Authentication failed from remote '%s'." |
|
||||
" Login tried : '%s'. Attempt %d / %d." % ( |
|
||||
remote, request.params['login'], attempts_qty, |
|
||||
max_attempts_qty)) |
|
||||
return super(LoginController, self).web_login(redirect=redirect, **kw) |
|
@ -1,15 +0,0 @@ |
|||||
<?xml version="1.0" encoding="UTF-8"?> |
|
||||
<!-- Copyright 2015 GRAP -Sylvain LE GAL |
|
||||
License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl). --> |
|
||||
<odoo noupdate="1"> |
|
||||
|
|
||||
<data> |
|
||||
|
|
||||
<record id="max_attempt_qty" model="ir.config_parameter"> |
|
||||
<field name="key">auth_brute_force.max_attempt_qty</field> |
|
||||
<field name="value">10</field> |
|
||||
</record> |
|
||||
|
|
||||
</data> |
|
||||
|
|
||||
</odoo> |
|
@ -0,0 +1,50 @@ |
|||||
|
# -*- coding: utf-8 -*- |
||||
|
# Copyright 2018 Tecnativa - Jairo Llopis |
||||
|
# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl). |
||||
|
|
||||
|
from psycopg2 import IntegrityError |
||||
|
|
||||
|
|
||||
|
def migrate(cr, version): |
||||
|
# Fix typo across DB |
||||
|
cr.execute( |
||||
|
""" UPDATE res_authentication_attempt |
||||
|
SET result = 'successful' |
||||
|
WHERE result = 'successfull'""", |
||||
|
) |
||||
|
# Store whitelist IPs in new format |
||||
|
cr.execute( |
||||
|
""" SELECT remote |
||||
|
FROM res_banned_remote |
||||
|
WHERE active IS FALSE""", |
||||
|
) |
||||
|
remotes = {record[0] for record in cr.fetchall()} |
||||
|
try: |
||||
|
with cr.savepoint(): |
||||
|
cr.execute( |
||||
|
"INSERT INTO ir_config_parameter (key, value) VALUES (%s, %s)", |
||||
|
( |
||||
|
"auth_brute_force.whitelist_remotes", |
||||
|
",".join(remotes), |
||||
|
), |
||||
|
) |
||||
|
except IntegrityError: |
||||
|
# Parameter already exists |
||||
|
cr.execute( |
||||
|
"SELECT value FROM ir_config_parameter WHERE key = %s", |
||||
|
("auth_brute_force.whitelist_remotes",) |
||||
|
) |
||||
|
current = set(cr.fetchall()[0][0].split(",")) |
||||
|
cr.execute( |
||||
|
"UPDATE ir_config_parameter SET value = %s WHERE key = %s", |
||||
|
(",".join(current | remotes), |
||||
|
"auth_brute_force.whitelist_remotes"), |
||||
|
) |
||||
|
# Update the configured IP limit parameter |
||||
|
cr.execute( |
||||
|
"UPDATE ir_config_parameter SET key = %s WHERE key = %s", |
||||
|
( |
||||
|
"auth_brute_force.whitelist_remotes", |
||||
|
"auth_brute_force.max_by_ip", |
||||
|
) |
||||
|
) |
@ -1,4 +1,4 @@ |
|||||
# -*- encoding: utf-8 -*- |
# -*- encoding: utf-8 -*- |
||||
|
|
||||
from . import res_banned_remote |
|
||||
from . import res_authentication_attempt |
from . import res_authentication_attempt |
||||
|
from . import res_users |
@ -1,45 +0,0 @@ |
|||||
# -*- coding: utf-8 -*- |
|
||||
# Copyright 2015 GRAP - Sylvain LE GAL |
|
||||
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html). |
|
||||
|
|
||||
import urllib |
|
||||
import json |
|
||||
|
|
||||
from odoo import api, fields, models |
|
||||
|
|
||||
|
|
||||
class ResBannedRemote(models.Model): |
|
||||
_name = 'res.banned.remote' |
|
||||
_rec_name = 'remote' |
|
||||
|
|
||||
_GEOLOCALISATION_URL = "http://ip-api.com/json/{}" |
|
||||
|
|
||||
# Column Section |
|
||||
description = fields.Text( |
|
||||
string='Description', compute='_compute_description', store=True) |
|
||||
ban_date = fields.Datetime( |
|
||||
string='Ban Date', required=True, default=fields.Datetime.now) |
|
||||
remote = fields.Char(string='Remote ID', required=True) |
|
||||
active = fields.Boolean( |
|
||||
string='Active', help="Uncheck this box to unban the remote", |
|
||||
default=True) |
|
||||
attempt_ids = fields.Many2many( |
|
||||
comodel_name='res.authentication.attempt', string='Attempts', |
|
||||
compute='_compute_attempt_ids') |
|
||||
|
|
||||
# Compute Section |
|
||||
@api.multi |
|
||||
@api.depends('remote') |
|
||||
def _compute_description(self): |
|
||||
for item in self: |
|
||||
url = self._GEOLOCALISATION_URL.format(item.remote) |
|
||||
res = json.loads(urllib.urlopen(url).read()) |
|
||||
item.description = '' |
|
||||
for k, v in res.iteritems(): |
|
||||
item.description += '%s : %s\n' % (k, v) |
|
||||
|
|
||||
@api.multi |
|
||||
def _compute_attempt_ids(self): |
|
||||
for item in self: |
|
||||
attempt_obj = self.env['res.authentication.attempt'] |
|
||||
item.attempt_ids = attempt_obj.search_last_failed(item.remote) |
|
@ -0,0 +1,155 @@ |
|||||
|
# -*- coding: utf-8 -*- |
||||
|
# Copyright 2017 Tecnativa - Jairo Llopis |
||||
|
# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl). |
||||
|
|
||||
|
import logging |
||||
|
from contextlib import contextmanager |
||||
|
from threading import current_thread |
||||
|
from odoo import api, models, SUPERUSER_ID |
||||
|
from odoo.exceptions import AccessDenied |
||||
|
from odoo.service import wsgi_server |
||||
|
|
||||
|
_logger = logging.getLogger(__name__) |
||||
|
|
||||
|
|
||||
|
class ResUsers(models.Model): |
||||
|
_inherit = "res.users" |
||||
|
|
||||
|
# HACK https://github.com/odoo/odoo/issues/24183 |
||||
|
# TODO Remove in v12, and use normal odoo.http.request to get details |
||||
|
@api.model_cr |
||||
|
def _register_hook(self): |
||||
|
"""🐒-patch XML-RPC controller to know remote address.""" |
||||
|
original_fn = wsgi_server.application_unproxied |
||||
|
|
||||
|
def _patch(environ, start_response): |
||||
|
current_thread().environ = environ |
||||
|
return original_fn(environ, start_response) |
||||
|
|
||||
|
wsgi_server.application_unproxied = _patch |
||||
|
|
||||
|
# Helpers to track authentication attempts |
||||
|
@classmethod |
||||
|
@contextmanager |
||||
|
def _auth_attempt(cls, login): |
||||
|
"""Start an authentication attempt and track its state.""" |
||||
|
try: |
||||
|
# Check if this call is nested |
||||
|
attempt_id = current_thread().auth_attempt_id |
||||
|
except AttributeError: |
||||
|
# Not nested; create a new attempt |
||||
|
attempt_id = cls._auth_attempt_new(login) |
||||
|
if not attempt_id: |
||||
|
# No attempt was created, so there's nothing to do here |
||||
|
yield |
||||
|
return |
||||
|
try: |
||||
|
current_thread().auth_attempt_id = attempt_id |
||||
|
result = "successful" |
||||
|
try: |
||||
|
yield |
||||
|
except AccessDenied as error: |
||||
|
result = getattr(error, "reason", "failed") |
||||
|
raise |
||||
|
finally: |
||||
|
cls._auth_attempt_update({"result": result}) |
||||
|
finally: |
||||
|
try: |
||||
|
del current_thread().auth_attempt_id |
||||
|
except AttributeError: |
||||
|
pass # It was deleted already |
||||
|
|
||||
|
@classmethod |
||||
|
def _auth_attempt_force_raise(cls, login, method): |
||||
|
"""Force a method to raise an AccessDenied on falsey return.""" |
||||
|
try: |
||||
|
with cls._auth_attempt(login): |
||||
|
result = method() |
||||
|
if not result: |
||||
|
# Force exception to record auth failure |
||||
|
raise AccessDenied() |
||||
|
except AccessDenied: |
||||
|
pass # `_auth_attempt()` did the hard part already |
||||
|
return result |
||||
|
|
||||
|
@classmethod |
||||
|
def _auth_attempt_new(cls, login): |
||||
|
"""Store one authentication attempt, not knowing the result.""" |
||||
|
# Get the right remote address |
||||
|
try: |
||||
|
remote_addr = current_thread().environ["REMOTE_ADDR"] |
||||
|
except (KeyError, AttributeError): |
||||
|
remote_addr = False |
||||
|
# Exit if it doesn't make sense to store this attempt |
||||
|
if not remote_addr: |
||||
|
return False |
||||
|
# Use a separate cursor to keep changes always |
||||
|
with cls.pool.cursor() as cr: |
||||
|
env = api.Environment(cr, SUPERUSER_ID, {}) |
||||
|
attempt = env["res.authentication.attempt"].create({ |
||||
|
"login": login, |
||||
|
"remote": remote_addr, |
||||
|
}) |
||||
|
return attempt.id |
||||
|
|
||||
|
@classmethod |
||||
|
def _auth_attempt_update(cls, values): |
||||
|
"""Update a given auth attempt if we still ignore its result.""" |
||||
|
auth_id = getattr(current_thread(), "auth_attempt_id", False) |
||||
|
if not auth_id: |
||||
|
return {} # No running auth attempt; nothing to do |
||||
|
# Use a separate cursor to keep changes always |
||||
|
with cls.pool.cursor() as cr: |
||||
|
env = api.Environment(cr, SUPERUSER_ID, {}) |
||||
|
attempt = env["res.authentication.attempt"].browse(auth_id) |
||||
|
# Update only on 1st call |
||||
|
if not attempt.result: |
||||
|
attempt.write(values) |
||||
|
return attempt.copy_data()[0] if attempt else {} |
||||
|
|
||||
|
# Override all auth-related core methods |
||||
|
@classmethod |
||||
|
def _login(cls, db, login, password): |
||||
|
return cls._auth_attempt_force_raise( |
||||
|
login, |
||||
|
lambda: super(ResUsers, cls)._login(db, login, password), |
||||
|
) |
||||
|
|
||||
|
@classmethod |
||||
|
def authenticate(cls, db, login, password, user_agent_env): |
||||
|
return cls._auth_attempt_force_raise( |
||||
|
login, |
||||
|
lambda: super(ResUsers, cls).authenticate( |
||||
|
db, login, password, user_agent_env), |
||||
|
) |
||||
|
|
||||
|
@classmethod |
||||
|
def check(cls, db, uid, passwd): |
||||
|
with cls._auth_attempt(uid): |
||||
|
return super(ResUsers, cls).check(db, uid, passwd) |
||||
|
|
||||
|
@api.model |
||||
|
def check_credentials(self, password): |
||||
|
"""This is the most important and specific auth check method. |
||||
|
|
||||
|
When we get here, it means that Odoo already checked the user exists |
||||
|
in this database. |
||||
|
|
||||
|
Other auth methods usually plug here. |
||||
|
""" |
||||
|
login = self.env.user.login |
||||
|
with self._auth_attempt(login): |
||||
|
# Update login, just in case we stored the UID before |
||||
|
attempt = self._auth_attempt_update({"login": login}) |
||||
|
remote = attempt.get("remote") |
||||
|
# Fail if the remote is banned |
||||
|
trusted = self.env["res.authentication.attempt"]._trusted( |
||||
|
remote, |
||||
|
login, |
||||
|
) |
||||
|
if not trusted: |
||||
|
error = AccessDenied() |
||||
|
error.reason = "banned" |
||||
|
raise error |
||||
|
# Continue with other auth systems |
||||
|
return super(ResUsers, self).check_credentials(password) |
Before Width: 601 | Height: 331 | Size: 31 KiB |
@ -0,0 +1,3 @@ |
|||||
|
# -*- coding: utf-8 -*- |
||||
|
|
||||
|
from . import test_brute_force |
@ -0,0 +1,361 @@ |
|||||
|
# -*- coding: utf-8 -*- |
||||
|
# Copyright 2017 Tecnativa - Jairo Llopis |
||||
|
# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl). |
||||
|
|
||||
|
from threading import current_thread |
||||
|
from unittest import skipUnless |
||||
|
from urllib import urlencode |
||||
|
|
||||
|
from mock import patch |
||||
|
from werkzeug.utils import redirect |
||||
|
|
||||
|
from odoo import http |
||||
|
from odoo.tests.common import at_install, can_import, HttpCase, post_install |
||||
|
from odoo.tools import mute_logger |
||||
|
|
||||
|
from ..models import res_authentication_attempt, res_users |
||||
|
|
||||
|
|
||||
|
GARBAGE_LOGGERS = ( |
||||
|
"werkzeug", |
||||
|
res_authentication_attempt.__name__, |
||||
|
res_users.__name__, |
||||
|
) |
||||
|
|
||||
|
|
||||
|
@at_install(False) |
||||
|
@post_install(True) |
||||
|
# Skip CSRF validation on tests |
||||
|
@patch(http.__name__ + ".WebRequest.validate_csrf", return_value=True) |
||||
|
# Skip specific browser forgery on redirections |
||||
|
@patch(http.__name__ + ".redirect_with_hash", side_effect=redirect) |
||||
|
# Faster tests without calls to geolocation API |
||||
|
@patch(res_authentication_attempt.__name__ + ".urlopen", return_value="") |
||||
|
class BruteForceCase(HttpCase): |
||||
|
def setUp(self): |
||||
|
super(BruteForceCase, self).setUp() |
||||
|
# Some tests could retain environ from last test and produce fake |
||||
|
# results without this patch |
||||
|
# HACK https://github.com/odoo/odoo/issues/24183 |
||||
|
# TODO Remove in v12 |
||||
|
try: |
||||
|
del current_thread().environ |
||||
|
except AttributeError: |
||||
|
pass |
||||
|
# Complex password to avoid conflicts with `password_security` |
||||
|
self.good_password = "Admin$%02584" |
||||
|
self.data_demo = { |
||||
|
"login": "demo", |
||||
|
"password": "Demo%&/(908409**", |
||||
|
} |
||||
|
with self.cursor() as cr: |
||||
|
env = self.env(cr) |
||||
|
env["ir.config_parameter"].set_param( |
||||
|
"auth_brute_force.max_by_ip_user", 3) |
||||
|
env["ir.config_parameter"].set_param( |
||||
|
"auth_brute_force.max_by_ip", 4) |
||||
|
# Clean attempts to be able to count in tests |
||||
|
env["res.authentication.attempt"].search([]).unlink() |
||||
|
# Make sure involved users have good passwords |
||||
|
env.user.password = self.good_password |
||||
|
env["res.users"].search([ |
||||
|
("login", "=", self.data_demo["login"]), |
||||
|
]).password = self.data_demo["password"] |
||||
|
|
||||
|
@skipUnless(can_import("odoo.addons.web"), "Needs web addon") |
||||
|
@mute_logger(*GARBAGE_LOGGERS) |
||||
|
def test_web_login_existing(self, *args): |
||||
|
"""Remote is banned with real user on web login form.""" |
||||
|
data1 = { |
||||
|
"login": "admin", |
||||
|
"password": "1234", # Wrong |
||||
|
} |
||||
|
# Make sure user is logged out |
||||
|
self.url_open("/web/session/logout", timeout=30) |
||||
|
# Fail 3 times |
||||
|
for n in range(3): |
||||
|
response = self.url_open("/web/login", bytes(urlencode(data1)), 30) |
||||
|
# If you fail, you get /web/login again |
||||
|
self.assertTrue( |
||||
|
response.geturl().endswith("/web/login"), |
||||
|
"Unexpected URL %s" % response.geturl(), |
||||
|
) |
||||
|
# Admin banned, demo not |
||||
|
with self.cursor() as cr: |
||||
|
env = self.env(cr) |
||||
|
self.assertFalse( |
||||
|
env["res.authentication.attempt"]._trusted( |
||||
|
"127.0.0.1", |
||||
|
data1["login"], |
||||
|
), |
||||
|
) |
||||
|
self.assertTrue( |
||||
|
env["res.authentication.attempt"]._trusted( |
||||
|
"127.0.0.1", |
||||
|
"demo", |
||||
|
), |
||||
|
) |
||||
|
# Now I know the password, but login is rejected too |
||||
|
data1["password"] = self.good_password |
||||
|
response = self.url_open("/web/login", bytes(urlencode(data1)), 30) |
||||
|
self.assertTrue( |
||||
|
response.geturl().endswith("/web/login"), |
||||
|
"Unexpected URL %s" % response.geturl(), |
||||
|
) |
||||
|
# IP has been banned, demo user cannot login |
||||
|
with self.cursor() as cr: |
||||
|
env = self.env(cr) |
||||
|
self.assertFalse( |
||||
|
env["res.authentication.attempt"]._trusted( |
||||
|
"127.0.0.1", |
||||
|
"demo", |
||||
|
), |
||||
|
) |
||||
|
# Attempts recorded |
||||
|
with self.cursor() as cr: |
||||
|
env = self.env(cr) |
||||
|
failed = env["res.authentication.attempt"].search([ |
||||
|
("result", "=", "failed"), |
||||
|
("login", "=", data1["login"]), |
||||
|
("remote", "=", "127.0.0.1"), |
||||
|
]) |
||||
|
self.assertEqual(len(failed), 3) |
||||
|
banned = env["res.authentication.attempt"].search([ |
||||
|
("result", "=", "banned"), |
||||
|
("remote", "=", "127.0.0.1"), |
||||
|
]) |
||||
|
self.assertEqual(len(banned), 1) |
||||
|
# Unban |
||||
|
banned.action_whitelist_add() |
||||
|
# Try good login, it should work now |
||||
|
response = self.url_open("/web/login", bytes(urlencode(data1)), 30) |
||||
|
self.assertTrue(response.geturl().endswith("/web")) |
||||
|
|
||||
|
@skipUnless(can_import("odoo.addons.web"), "Needs web addon") |
||||
|
@mute_logger(*GARBAGE_LOGGERS) |
||||
|
def test_web_login_unexisting(self, *args): |
||||
|
"""Remote is banned with fake user on web login form.""" |
||||
|
data1 = { |
||||
|
"login": "administrator", # Wrong |
||||
|
"password": self.good_password, |
||||
|
} |
||||
|
# Make sure user is logged out |
||||
|
self.url_open("/web/session/logout", timeout=30) |
||||
|
# Fail 3 times |
||||
|
for n in range(3): |
||||
|
response = self.url_open("/web/login", bytes(urlencode(data1)), 30) |
||||
|
# If you fail, you get /web/login again |
||||
|
self.assertTrue( |
||||
|
response.geturl().endswith("/web/login"), |
||||
|
"Unexpected URL %s" % response.geturl(), |
||||
|
) |
||||
|
# Admin banned, demo not |
||||
|
with self.cursor() as cr: |
||||
|
env = self.env(cr) |
||||
|
self.assertFalse( |
||||
|
env["res.authentication.attempt"]._trusted( |
||||
|
"127.0.0.1", |
||||
|
data1["login"], |
||||
|
), |
||||
|
) |
||||
|
self.assertTrue( |
||||
|
env["res.authentication.attempt"]._trusted( |
||||
|
"127.0.0.1", |
||||
|
self.data_demo["login"], |
||||
|
), |
||||
|
) |
||||
|
# Demo user can login |
||||
|
response = self.url_open( |
||||
|
"/web/login", |
||||
|
bytes(urlencode(self.data_demo)), |
||||
|
30, |
||||
|
) |
||||
|
# If you pass, you get /web |
||||
|
self.assertTrue( |
||||
|
response.geturl().endswith("/web"), |
||||
|
"Unexpected URL %s" % response.geturl(), |
||||
|
) |
||||
|
self.url_open("/web/session/logout", timeout=30) |
||||
|
# Attempts recorded |
||||
|
with self.cursor() as cr: |
||||
|
env = self.env(cr) |
||||
|
failed = env["res.authentication.attempt"].search([ |
||||
|
("result", "=", "failed"), |
||||
|
("login", "=", data1["login"]), |
||||
|
("remote", "=", "127.0.0.1"), |
||||
|
]) |
||||
|
self.assertEqual(len(failed), 3) |
||||
|
banned = env["res.authentication.attempt"].search([ |
||||
|
("result", "=", "banned"), |
||||
|
("login", "=", data1["login"]), |
||||
|
("remote", "=", "127.0.0.1"), |
||||
|
]) |
||||
|
self.assertEqual(len(banned), 0) |
||||
|
|
||||
|
@mute_logger(*GARBAGE_LOGGERS) |
||||
|
def test_xmlrpc_login_existing(self, *args): |
||||
|
"""Remote is banned with real user on XML-RPC login.""" |
||||
|
data1 = { |
||||
|
"login": "admin", |
||||
|
"password": "1234", # Wrong |
||||
|
} |
||||
|
# Fail 3 times |
||||
|
for n in range(3): |
||||
|
self.assertFalse(self.xmlrpc_common.authenticate( |
||||
|
self.env.cr.dbname, data1["login"], data1["password"], {})) |
||||
|
# Admin banned, demo not |
||||
|
with self.cursor() as cr: |
||||
|
env = self.env(cr) |
||||
|
self.assertFalse( |
||||
|
env["res.authentication.attempt"]._trusted( |
||||
|
"127.0.0.1", |
||||
|
data1["login"], |
||||
|
), |
||||
|
) |
||||
|
self.assertTrue( |
||||
|
env["res.authentication.attempt"]._trusted( |
||||
|
"127.0.0.1", |
||||
|
"demo", |
||||
|
), |
||||
|
) |
||||
|
# Now I know the password, but login is rejected too |
||||
|
data1["password"] = self.good_password |
||||
|
self.assertFalse(self.xmlrpc_common.authenticate( |
||||
|
self.env.cr.dbname, data1["login"], data1["password"], {})) |
||||
|
# IP has been banned, demo user cannot login |
||||
|
with self.cursor() as cr: |
||||
|
env = self.env(cr) |
||||
|
self.assertFalse( |
||||
|
env["res.authentication.attempt"]._trusted( |
||||
|
"127.0.0.1", |
||||
|
"demo", |
||||
|
), |
||||
|
) |
||||
|
# Attempts recorded |
||||
|
with self.cursor() as cr: |
||||
|
env = self.env(cr) |
||||
|
failed = env["res.authentication.attempt"].search([ |
||||
|
("result", "=", "failed"), |
||||
|
("login", "=", data1["login"]), |
||||
|
("remote", "=", "127.0.0.1"), |
||||
|
]) |
||||
|
self.assertEqual(len(failed), 3) |
||||
|
banned = env["res.authentication.attempt"].search([ |
||||
|
("result", "=", "banned"), |
||||
|
("remote", "=", "127.0.0.1"), |
||||
|
]) |
||||
|
self.assertEqual(len(banned), 1) |
||||
|
# Unban |
||||
|
banned.action_whitelist_add() |
||||
|
# Try good login, it should work now |
||||
|
self.assertTrue(self.xmlrpc_common.authenticate( |
||||
|
self.env.cr.dbname, data1["login"], data1["password"], {})) |
||||
|
|
||||
|
@mute_logger(*GARBAGE_LOGGERS) |
||||
|
def test_xmlrpc_login_unexisting(self, *args): |
||||
|
"""Remote is banned with fake user on XML-RPC login.""" |
||||
|
data1 = { |
||||
|
"login": "administrator", # Wrong |
||||
|
"password": self.good_password, |
||||
|
} |
||||
|
# Fail 3 times |
||||
|
for n in range(3): |
||||
|
self.assertFalse(self.xmlrpc_common.authenticate( |
||||
|
self.env.cr.dbname, data1["login"], data1["password"], {})) |
||||
|
# Admin banned, demo not |
||||
|
with self.cursor() as cr: |
||||
|
env = self.env(cr) |
||||
|
self.assertFalse( |
||||
|
env["res.authentication.attempt"]._trusted( |
||||
|
"127.0.0.1", |
||||
|
data1["login"], |
||||
|
), |
||||
|
) |
||||
|
self.assertTrue( |
||||
|
env["res.authentication.attempt"]._trusted( |
||||
|
"127.0.0.1", |
||||
|
self.data_demo["login"], |
||||
|
), |
||||
|
) |
||||
|
# Demo user can login |
||||
|
self.assertTrue(self.xmlrpc_common.authenticate( |
||||
|
self.env.cr.dbname, |
||||
|
self.data_demo["login"], |
||||
|
self.data_demo["password"], |
||||
|
{}, |
||||
|
)) |
||||
|
# Attempts recorded |
||||
|
with self.cursor() as cr: |
||||
|
env = self.env(cr) |
||||
|
failed = env["res.authentication.attempt"].search([ |
||||
|
("result", "=", "failed"), |
||||
|
("login", "=", data1["login"]), |
||||
|
("remote", "=", "127.0.0.1"), |
||||
|
]) |
||||
|
self.assertEqual(len(failed), 3) |
||||
|
banned = env["res.authentication.attempt"].search([ |
||||
|
("result", "=", "banned"), |
||||
|
("login", "=", data1["login"]), |
||||
|
("remote", "=", "127.0.0.1"), |
||||
|
]) |
||||
|
self.assertEqual(len(banned), 0) |
||||
|
|
||||
|
@mute_logger(*GARBAGE_LOGGERS) |
||||
|
def test_orm_login_existing(self, *args): |
||||
|
"""No bans on ORM login with an existing user.""" |
||||
|
data1 = { |
||||
|
"login": "admin", |
||||
|
"password": "1234", # Wrong |
||||
|
} |
||||
|
with self.cursor() as cr: |
||||
|
env = self.env(cr) |
||||
|
# Fail 3 times |
||||
|
for n in range(3): |
||||
|
self.assertFalse( |
||||
|
env["res.users"].authenticate( |
||||
|
cr.dbname, data1["login"], data1["password"], {})) |
||||
|
self.assertEqual( |
||||
|
env["res.authentication.attempt"].search(count=True, args=[]), |
||||
|
0, |
||||
|
) |
||||
|
self.assertTrue( |
||||
|
env["res.authentication.attempt"]._trusted( |
||||
|
"127.0.0.1", |
||||
|
data1["login"], |
||||
|
), |
||||
|
) |
||||
|
# Now I know the password, and login works |
||||
|
data1["password"] = self.good_password |
||||
|
self.assertTrue( |
||||
|
env["res.users"].authenticate( |
||||
|
cr.dbname, data1["login"], data1["password"], {})) |
||||
|
|
||||
|
@mute_logger(*GARBAGE_LOGGERS) |
||||
|
def test_orm_login_unexisting(self, *args): |
||||
|
"""No bans on ORM login with an unexisting user.""" |
||||
|
data1 = { |
||||
|
"login": "administrator", # Wrong |
||||
|
"password": self.good_password, |
||||
|
} |
||||
|
with self.cursor() as cr: |
||||
|
env = self.env(cr) |
||||
|
# Fail 3 times |
||||
|
for n in range(3): |
||||
|
self.assertFalse( |
||||
|
env["res.users"].authenticate( |
||||
|
cr.dbname, data1["login"], data1["password"], {})) |
||||
|
self.assertEqual( |
||||
|
env["res.authentication.attempt"].search(count=True, args=[]), |
||||
|
0, |
||||
|
) |
||||
|
self.assertTrue( |
||||
|
env["res.authentication.attempt"]._trusted( |
||||
|
"127.0.0.1", |
||||
|
data1["login"], |
||||
|
), |
||||
|
) |
||||
|
# Now I know the user, and login works |
||||
|
data1["login"] = "admin" |
||||
|
self.assertTrue( |
||||
|
env["res.users"].authenticate( |
||||
|
cr.dbname, data1["login"], data1["password"], {})) |
Write
Preview
Loading…
Cancel
Save
Reference in new issue