diff --git a/auth_brute_force/README.rst b/auth_brute_force/README.rst index 2ad67486a..35dd52da0 100644 --- a/auth_brute_force/README.rst +++ b/auth_brute_force/README.rst @@ -22,9 +22,18 @@ extra information about remote IP. Configuration ============= -Once installed, you can change the ir.config_parameter value for the key -'auth_brute_force.max_attempt_qty' (10 by default) that define the max number -of attempts allowed before the user was banned. +You can use these configuration parameters that control this addon behavior: + +* ``auth_brute_force.whitelist_remotes`` is a comma-separated list of + whitelisted IPs. Failures from these remotes are ignored. + +* ``auth_brute_force.max_by_ip`` defaults to 50, and indicates the maximum + successive failures allowed for an IP. After hitting the limit, the IP gets + banned. + +* ``auth_brute_force.max_by_ip_user`` defaults to 10, and indicates the + maximum successive failures allowed for any IP and user combination. + After hitting the limit, that user and IP combination is banned. Usage ===== @@ -34,17 +43,14 @@ Admin user have the possibility to unblock a banned IP. Logging ------- -This module generates some WARNING logs, in the three following cases: +This module generates some WARNING logs, in the following cases: -* Authentication failed from remote '127.0.0.1'. Login tried : 'admin'. - Attempt 1 / 10. +* When the IP limit is reached: *Authentication failed from remote 'x.x.x.x'. + The remote has been banned. Login tried: xxxx.* -* Authentication failed from remote '127.0.0.1'. The remote has been banned. - Login tried : 'admin'. - -* Authentication tried from remote '127.0.0.1'. The request has been ignored - because the remote has been banned after 10 attempts without success. Login - tried : 'admin'. +* When the IP+user combination limit is reached: + *Authentication failed from remote 'x.x.x.x'. + The remote and login combination has been banned. Login tried: xxxx.* Screenshot ---------- @@ -53,13 +59,9 @@ Screenshot .. image:: /auth_brute_force/static/description/screenshot_attempts_list.png -**Detail of a banned IP** - -.. image:: /auth_brute_force/static/description/screenshot_custom_ban.png - .. image:: https://odoo-community.org/website/image/ir.attachment/5784_f2813bd/datas -:alt: Try me on Runbot + :alt: Try me on Runbot :target: https://runbot.odoo-community.org/runbot/149/10.0 For further information, please visit: @@ -69,14 +71,18 @@ For further information, please visit: Known issues / Roadmap ====================== -* The ID used to identify a remote request is the IP provided in the request - (key 'REMOTE_ADDR'). +* Remove 🐒 patch for https://github.com/odoo/odoo/issues/24183 in v12. + * Depending of server and / or user network configuration, the idenfication of the user can be wrong, and mainly in the following cases: -* If the Odoo server is behind an Apache / NGinx proxy without redirection, - all the request will be have the value '127.0.0.1' for the REMOTE_ADDR key; -* If some users are behind the same Internet Service Provider, if a user is - banned, all the other users will be banned too; + + * If the Odoo server is behind an Apache / NGinx proxy and it is not properly + configured, all requests will use the same IP address. Blocking such IP + could render Odoo unusable for all users! **Make sure your logs output the + correct IP for werkzeug traffic before installing this addon.** + +* The IP metadata retrieval should use a better system. `See details here + `_. Bug Tracker =========== @@ -94,6 +100,7 @@ Contributors * Sylvain LE GAL (https://twitter.com/legalsylvain) * David Vidal +* Jairo Llopis Maintainer ---------- diff --git a/auth_brute_force/__init__.py b/auth_brute_force/__init__.py index 1f9880145..cde864bae 100644 --- a/auth_brute_force/__init__.py +++ b/auth_brute_force/__init__.py @@ -1,4 +1,3 @@ -# -*- encoding: utf-8 -*- +# -*- coding: utf-8 -*- from . import models -from . import controllers diff --git a/auth_brute_force/__manifest__.py b/auth_brute_force/__manifest__.py index 1af88faed..628385fe2 100644 --- a/auth_brute_force/__manifest__.py +++ b/auth_brute_force/__manifest__.py @@ -3,22 +3,21 @@ # Copyright 2017 Tecnativa - David Vidal # License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html). { - 'name': 'Authentification - Brute-force Attack', - 'version': '10.0.1.0.0', + 'name': 'Authentification - Brute-Force Filter', + 'version': '10.0.2.0.0', 'category': 'Tools', - 'summary': "Tracks Authentication Attempts and Prevents Brute-force" - " Attacks module", + 'summary': "Track Authentication Attempts and Prevent Brute-force Attacks", 'author': "GRAP, " "Tecnativa, " "Odoo Community Association (OCA)", - 'website': 'http://www.grap.coop', + 'website': 'https://github.com/OCA/server-tools', 'license': 'AGPL-3', 'depends': [ - 'web', - ], + # If we don't depend on it, it would inhibit this addon + "auth_crypt", + ], 'data': [ 'security/ir_model_access.yml', - 'data/ir_config_parameter.xml', 'views/view.xml', 'views/action.xml', 'views/menu.xml', diff --git a/auth_brute_force/controllers/__init__.py b/auth_brute_force/controllers/__init__.py deleted file mode 100644 index 65a8c1201..000000000 --- a/auth_brute_force/controllers/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -# -*- coding: utf-8 -*- - -from . import main diff --git a/auth_brute_force/controllers/main.py b/auth_brute_force/controllers/main.py deleted file mode 100644 index 222a62bdb..000000000 --- a/auth_brute_force/controllers/main.py +++ /dev/null @@ -1,76 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2015 GRAP - Sylvain LE GAL -# Copyright 2017 Tecnativa - David Vidal -# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html). - -import logging - -from odoo import fields, http, registry, SUPERUSER_ID -from odoo.api import Environment -from odoo.http import request -from odoo.addons.web.controllers.main import Home, ensure_db - -_logger = logging.getLogger(__name__) - - -class LoginController(Home): - - @http.route() - def web_login(self, redirect=None, **kw): - if request.httprequest.method == 'POST': - ensure_db() - remote = request.httprequest.remote_addr - # Get registry and cursor - with registry(request.session.db).cursor() as cursor: - env = Environment(cursor, SUPERUSER_ID, {}) - config_obj = env['ir.config_parameter'] - attempt_obj = env['res.authentication.attempt'] - banned_remote_obj = env['res.banned.remote'] - # Get Settings - max_attempts_qty = int(config_obj.get_param( - 'auth_brute_force.max_attempt_qty')) - # Test if remote user is banned - banned = banned_remote_obj.search([('remote', '=', remote)]) - if banned: - request.params['password'] = '' - _logger.warning( - "Authentication tried from remote '%s'. The request " - "has been ignored because the remote has been banned " - "after %d attempts without success. Login tried : '%s'" - "." % (remote, max_attempts_qty, - request.params['login'])) - else: - # Try to authenticate - result = request.session.authenticate( - request.session.db, request.params['login'], - request.params['password']) - # Log attempt - attempt_obj.create({ - 'attempt_date': fields.Datetime.now(), - 'login': request.params['login'], - 'remote': remote, - 'result': banned and 'banned' or ( - result and 'successfull' or 'failed'), - }) - cursor.commit() - if not banned and not result: - # Get last bad attempts quantity - attempts_qty = len(attempt_obj.search_last_failed(remote)) - if max_attempts_qty <= attempts_qty: - # We ban the remote - _logger.warning( - "Authentication failed from remote '%s'. " - "The remote has been banned. Login tried : '%s'" - "." % (remote, request.params['login'])) - banned_remote_obj.sudo().create({ - 'remote': remote, - 'ban_date': fields.Datetime.now(), - }) - cursor.commit() - else: - _logger.warning( - "Authentication failed from remote '%s'." - " Login tried : '%s'. Attempt %d / %d." % ( - remote, request.params['login'], attempts_qty, - max_attempts_qty)) - return super(LoginController, self).web_login(redirect=redirect, **kw) diff --git a/auth_brute_force/data/ir_config_parameter.xml b/auth_brute_force/data/ir_config_parameter.xml deleted file mode 100644 index 4fe744f32..000000000 --- a/auth_brute_force/data/ir_config_parameter.xml +++ /dev/null @@ -1,15 +0,0 @@ - - - - - - - - auth_brute_force.max_attempt_qty - 10 - - - - - diff --git a/auth_brute_force/migrations/10.0.2.0.0/pre-migrate.py b/auth_brute_force/migrations/10.0.2.0.0/pre-migrate.py new file mode 100644 index 000000000..fc497cc60 --- /dev/null +++ b/auth_brute_force/migrations/10.0.2.0.0/pre-migrate.py @@ -0,0 +1,50 @@ +# -*- coding: utf-8 -*- +# Copyright 2018 Tecnativa - Jairo Llopis +# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl). + +from psycopg2 import IntegrityError + + +def migrate(cr, version): + # Fix typo across DB + cr.execute( + """ UPDATE res_authentication_attempt + SET result = 'successful' + WHERE result = 'successfull'""", + ) + # Store whitelist IPs in new format + cr.execute( + """ SELECT remote + FROM res_banned_remote + WHERE active IS FALSE""", + ) + remotes = {record[0] for record in cr.fetchall()} + try: + with cr.savepoint(): + cr.execute( + "INSERT INTO ir_config_parameter (key, value) VALUES (%s, %s)", + ( + "auth_brute_force.whitelist_remotes", + ",".join(remotes), + ), + ) + except IntegrityError: + # Parameter already exists + cr.execute( + "SELECT value FROM ir_config_parameter WHERE key = %s", + ("auth_brute_force.whitelist_remotes",) + ) + current = set(cr.fetchall()[0][0].split(",")) + cr.execute( + "UPDATE ir_config_parameter SET value = %s WHERE key = %s", + (",".join(current | remotes), + "auth_brute_force.whitelist_remotes"), + ) + # Update the configured IP limit parameter + cr.execute( + "UPDATE ir_config_parameter SET key = %s WHERE key = %s", + ( + "auth_brute_force.whitelist_remotes", + "auth_brute_force.max_by_ip", + ) + ) diff --git a/auth_brute_force/models/__init__.py b/auth_brute_force/models/__init__.py index f5bb7766e..75bc8dd03 100644 --- a/auth_brute_force/models/__init__.py +++ b/auth_brute_force/models/__init__.py @@ -1,4 +1,4 @@ # -*- encoding: utf-8 -*- -from . import res_banned_remote from . import res_authentication_attempt +from . import res_users diff --git a/auth_brute_force/models/res_authentication_attempt.py b/auth_brute_force/models/res_authentication_attempt.py index a75542816..ca5060f0e 100644 --- a/auth_brute_force/models/res_authentication_attempt.py +++ b/auth_brute_force/models/res_authentication_attempt.py @@ -2,35 +2,171 @@ # Copyright 2015 GRAP - Sylvain LE GAL # License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html). -from odoo import _, api, fields, models +import json +import logging +from urllib2 import urlopen +from odoo import api, fields, models + +GEOLOCALISATION_URL = u"http://ip-api.com/json/{}" + +_logger = logging.getLogger(__name__) class ResAuthenticationAttempt(models.Model): _name = 'res.authentication.attempt' - _order = 'attempt_date desc' - - _ATTEMPT_RESULT = [ - ('successfull', _('Successfull')), - ('failed', _('Failed')), - ('banned', _('Banned')), - ] - - # Column Section - attempt_date = fields.Datetime(string='Attempt Date') - login = fields.Char(string='Tried Login') - remote = fields.Char(string='Remote ID') + _order = 'create_date desc' + + login = fields.Char(string='Tried Login', index=True) + remote = fields.Char(string='Remote IP', index=True) result = fields.Selection( - selection=_ATTEMPT_RESULT, string='Authentication Result') + string='Authentication Result', + selection=[ + ('successful', 'Successful'), + ('failed', 'Failed'), + ('banned', 'Banned'), + ], + index=True, + ) + remote_metadata = fields.Text( + string="Remote IP metadata", + compute='_compute_metadata', + store=True, + help="Metadata publicly available for remote IP", + ) + whitelisted = fields.Boolean( + compute="_compute_whitelisted", + ) + + @api.multi + @api.depends('remote') + def _compute_metadata(self): + for item in self: + url = GEOLOCALISATION_URL.format(item.remote) + try: + res = json.loads(urlopen(url, timeout=5).read()) + except Exception: + _logger.warning( + "Couldn't fetch details from %s", + url, + exc_info=True, + ) + else: + item.remote_metadata = "\n".join( + '%s: %s' % pair for pair in res.items()) + + @api.multi + def _compute_whitelisted(self): + whitelist = self._whitelist_remotes() + for one in self: + one.whitelisted = one.remote in whitelist - # Custom Section @api.model - def search_last_failed(self, remote): + def _hits_limit(self, limit, remote, login=None): + """Know if a given remote hits a given limit. + + :param int limit: + The maximum amount of failures allowed. + + :param str remote: + The remote IP to search for. + + :param str login: + If you want to check the IP+login combination limit, supply the + login. + """ + domain = [ + ("remote", "=", remote), + ] + if login is not None: + domain += [("login", "=", login)] + # Find last successful login last_ok = self.search( - [('result', '=', 'successfull'), ('remote', '=', remote)], - order='attempt_date desc', limit=1) + domain + [("result", "=", "successful")], + order='create_date desc', + limit=1, + ) if last_ok: - return self.search([ - ('remote', '=', remote), - ('attempt_date', '>', last_ok.attempt_date)]) - else: - return self.search([('remote', '=', remote)]) + domain += [("create_date", ">", last_ok.create_date)] + # Count failures since last success, if any + recent_failures = self.search_count( + domain + [("result", "!=", "successful")], + ) + # Did we hit the limit? + return recent_failures >= limit + + @api.model + def _trusted(self, remote, login): + """Checks if any the remote or remote+login are trusted. + + :param str remote: + Remote IP from which the login attempt is taking place. + + :param str login: + User login that is being tried. + + :return bool: + ``True`` means it is trusted. ``False`` means that it is banned. + """ + # Cannot ban without remote + if not remote: + return True + get_param = self.env["ir.config_parameter"].sudo().get_param + # Whitelisted remotes always pass + if remote in self._whitelist_remotes(): + return True + # Check if remote is banned + limit = int(get_param("auth_brute_force.max_by_ip", 50)) + if self._hits_limit(limit, remote): + _logger.warning( + "Authentication failed from remote '%s'. " + "The remote has been banned. " + "Login tried: %r.", + remote, + login, + ) + return False + # Check if remote + login combination is banned + limit = int(get_param("auth_brute_force.max_by_ip_user", 10)) + if self._hits_limit(limit, remote, login): + _logger.warning( + "Authentication failed from remote '%s'. " + "The remote and login combination has been banned. " + "Login tried: %r.", + remote, + login, + ) + return False + # If you get here, you are a good boy (for now) + return True + + def _whitelist_remotes(self): + """Get whitelisted remotes. + + :return set: + Remote IPs that are whitelisted currently. + """ + whitelist = self.env["ir.config_parameter"].sudo().get_param( + "auth_brute_force.whitelist_remotes", + "", + ) + return set(whitelist.split(",")) + + @api.multi + def action_whitelist_add(self): + """Add current remotes to whitelist.""" + whitelist = self._whitelist_remotes() + whitelist |= set(self.mapped("remote")) + self.env["ir.config_parameter"].set_param( + "auth_brute_force.whitelist_remotes", + ",".join(whitelist), + ) + + @api.multi + def action_whitelist_remove(self): + """Remove current remotes from whitelist.""" + whitelist = self._whitelist_remotes() + whitelist -= set(self.mapped("remote")) + self.env["ir.config_parameter"].set_param( + "auth_brute_force.whitelist_remotes", + ",".join(whitelist), + ) diff --git a/auth_brute_force/models/res_banned_remote.py b/auth_brute_force/models/res_banned_remote.py deleted file mode 100644 index a10caad47..000000000 --- a/auth_brute_force/models/res_banned_remote.py +++ /dev/null @@ -1,45 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2015 GRAP - Sylvain LE GAL -# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html). - -import urllib -import json - -from odoo import api, fields, models - - -class ResBannedRemote(models.Model): - _name = 'res.banned.remote' - _rec_name = 'remote' - - _GEOLOCALISATION_URL = "http://ip-api.com/json/{}" - - # Column Section - description = fields.Text( - string='Description', compute='_compute_description', store=True) - ban_date = fields.Datetime( - string='Ban Date', required=True, default=fields.Datetime.now) - remote = fields.Char(string='Remote ID', required=True) - active = fields.Boolean( - string='Active', help="Uncheck this box to unban the remote", - default=True) - attempt_ids = fields.Many2many( - comodel_name='res.authentication.attempt', string='Attempts', - compute='_compute_attempt_ids') - - # Compute Section - @api.multi - @api.depends('remote') - def _compute_description(self): - for item in self: - url = self._GEOLOCALISATION_URL.format(item.remote) - res = json.loads(urllib.urlopen(url).read()) - item.description = '' - for k, v in res.iteritems(): - item.description += '%s : %s\n' % (k, v) - - @api.multi - def _compute_attempt_ids(self): - for item in self: - attempt_obj = self.env['res.authentication.attempt'] - item.attempt_ids = attempt_obj.search_last_failed(item.remote) diff --git a/auth_brute_force/models/res_users.py b/auth_brute_force/models/res_users.py new file mode 100644 index 000000000..bf9ba88f9 --- /dev/null +++ b/auth_brute_force/models/res_users.py @@ -0,0 +1,155 @@ +# -*- coding: utf-8 -*- +# Copyright 2017 Tecnativa - Jairo Llopis +# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl). + +import logging +from contextlib import contextmanager +from threading import current_thread +from odoo import api, models, SUPERUSER_ID +from odoo.exceptions import AccessDenied +from odoo.service import wsgi_server + +_logger = logging.getLogger(__name__) + + +class ResUsers(models.Model): + _inherit = "res.users" + + # HACK https://github.com/odoo/odoo/issues/24183 + # TODO Remove in v12, and use normal odoo.http.request to get details + @api.model_cr + def _register_hook(self): + """🐒-patch XML-RPC controller to know remote address.""" + original_fn = wsgi_server.application_unproxied + + def _patch(environ, start_response): + current_thread().environ = environ + return original_fn(environ, start_response) + + wsgi_server.application_unproxied = _patch + + # Helpers to track authentication attempts + @classmethod + @contextmanager + def _auth_attempt(cls, login): + """Start an authentication attempt and track its state.""" + try: + # Check if this call is nested + attempt_id = current_thread().auth_attempt_id + except AttributeError: + # Not nested; create a new attempt + attempt_id = cls._auth_attempt_new(login) + if not attempt_id: + # No attempt was created, so there's nothing to do here + yield + return + try: + current_thread().auth_attempt_id = attempt_id + result = "successful" + try: + yield + except AccessDenied as error: + result = getattr(error, "reason", "failed") + raise + finally: + cls._auth_attempt_update({"result": result}) + finally: + try: + del current_thread().auth_attempt_id + except AttributeError: + pass # It was deleted already + + @classmethod + def _auth_attempt_force_raise(cls, login, method): + """Force a method to raise an AccessDenied on falsey return.""" + try: + with cls._auth_attempt(login): + result = method() + if not result: + # Force exception to record auth failure + raise AccessDenied() + except AccessDenied: + pass # `_auth_attempt()` did the hard part already + return result + + @classmethod + def _auth_attempt_new(cls, login): + """Store one authentication attempt, not knowing the result.""" + # Get the right remote address + try: + remote_addr = current_thread().environ["REMOTE_ADDR"] + except (KeyError, AttributeError): + remote_addr = False + # Exit if it doesn't make sense to store this attempt + if not remote_addr: + return False + # Use a separate cursor to keep changes always + with cls.pool.cursor() as cr: + env = api.Environment(cr, SUPERUSER_ID, {}) + attempt = env["res.authentication.attempt"].create({ + "login": login, + "remote": remote_addr, + }) + return attempt.id + + @classmethod + def _auth_attempt_update(cls, values): + """Update a given auth attempt if we still ignore its result.""" + auth_id = getattr(current_thread(), "auth_attempt_id", False) + if not auth_id: + return {} # No running auth attempt; nothing to do + # Use a separate cursor to keep changes always + with cls.pool.cursor() as cr: + env = api.Environment(cr, SUPERUSER_ID, {}) + attempt = env["res.authentication.attempt"].browse(auth_id) + # Update only on 1st call + if not attempt.result: + attempt.write(values) + return attempt.copy_data()[0] if attempt else {} + + # Override all auth-related core methods + @classmethod + def _login(cls, db, login, password): + return cls._auth_attempt_force_raise( + login, + lambda: super(ResUsers, cls)._login(db, login, password), + ) + + @classmethod + def authenticate(cls, db, login, password, user_agent_env): + return cls._auth_attempt_force_raise( + login, + lambda: super(ResUsers, cls).authenticate( + db, login, password, user_agent_env), + ) + + @classmethod + def check(cls, db, uid, passwd): + with cls._auth_attempt(uid): + return super(ResUsers, cls).check(db, uid, passwd) + + @api.model + def check_credentials(self, password): + """This is the most important and specific auth check method. + + When we get here, it means that Odoo already checked the user exists + in this database. + + Other auth methods usually plug here. + """ + login = self.env.user.login + with self._auth_attempt(login): + # Update login, just in case we stored the UID before + attempt = self._auth_attempt_update({"login": login}) + remote = attempt.get("remote") + # Fail if the remote is banned + trusted = self.env["res.authentication.attempt"]._trusted( + remote, + login, + ) + if not trusted: + error = AccessDenied() + error.reason = "banned" + raise error + # Continue with other auth systems + return super(ResUsers, self).check_credentials(password) diff --git a/auth_brute_force/security/ir_model_access.yml b/auth_brute_force/security/ir_model_access.yml index 57919b774..5dcb53021 100644 --- a/auth_brute_force/security/ir_model_access.yml +++ b/auth_brute_force/security/ir_model_access.yml @@ -4,11 +4,6 @@ name: Authentication Attempt All Users perm_read: true -- !record {model: ir.model.access, id: access_res_banned_remote_all}: - model_id: model_res_banned_remote - name: Banned Remote All Users - perm_read: true - - !record {model: ir.model.access, id: access_res_authentication_attempt_manager}: group_id: base.group_system model_id: model_res_authentication_attempt @@ -17,12 +12,3 @@ perm_read: true perm_write: true perm_unlink: true - -- !record {model: ir.model.access, id: access_res_banned_remote_manager}: - group_id: base.group_system - model_id: model_res_banned_remote - name: Banned Remote Manager - perm_create: true - perm_read: true - perm_write: true - perm_unlink: true diff --git a/auth_brute_force/static/description/screenshot_custom_ban.png b/auth_brute_force/static/description/screenshot_custom_ban.png deleted file mode 100644 index 8607f6402..000000000 Binary files a/auth_brute_force/static/description/screenshot_custom_ban.png and /dev/null differ diff --git a/auth_brute_force/tests/__init__.py b/auth_brute_force/tests/__init__.py new file mode 100644 index 000000000..5757d0102 --- /dev/null +++ b/auth_brute_force/tests/__init__.py @@ -0,0 +1,3 @@ +# -*- coding: utf-8 -*- + +from . import test_brute_force diff --git a/auth_brute_force/tests/test_brute_force.py b/auth_brute_force/tests/test_brute_force.py new file mode 100644 index 000000000..d4a3fb712 --- /dev/null +++ b/auth_brute_force/tests/test_brute_force.py @@ -0,0 +1,361 @@ +# -*- coding: utf-8 -*- +# Copyright 2017 Tecnativa - Jairo Llopis +# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl). + +from threading import current_thread +from unittest import skipUnless +from urllib import urlencode + +from mock import patch +from werkzeug.utils import redirect + +from odoo import http +from odoo.tests.common import at_install, can_import, HttpCase, post_install +from odoo.tools import mute_logger + +from ..models import res_authentication_attempt, res_users + + +GARBAGE_LOGGERS = ( + "werkzeug", + res_authentication_attempt.__name__, + res_users.__name__, +) + + +@at_install(False) +@post_install(True) +# Skip CSRF validation on tests +@patch(http.__name__ + ".WebRequest.validate_csrf", return_value=True) +# Skip specific browser forgery on redirections +@patch(http.__name__ + ".redirect_with_hash", side_effect=redirect) +# Faster tests without calls to geolocation API +@patch(res_authentication_attempt.__name__ + ".urlopen", return_value="") +class BruteForceCase(HttpCase): + def setUp(self): + super(BruteForceCase, self).setUp() + # Some tests could retain environ from last test and produce fake + # results without this patch + # HACK https://github.com/odoo/odoo/issues/24183 + # TODO Remove in v12 + try: + del current_thread().environ + except AttributeError: + pass + # Complex password to avoid conflicts with `password_security` + self.good_password = "Admin$%02584" + self.data_demo = { + "login": "demo", + "password": "Demo%&/(908409**", + } + with self.cursor() as cr: + env = self.env(cr) + env["ir.config_parameter"].set_param( + "auth_brute_force.max_by_ip_user", 3) + env["ir.config_parameter"].set_param( + "auth_brute_force.max_by_ip", 4) + # Clean attempts to be able to count in tests + env["res.authentication.attempt"].search([]).unlink() + # Make sure involved users have good passwords + env.user.password = self.good_password + env["res.users"].search([ + ("login", "=", self.data_demo["login"]), + ]).password = self.data_demo["password"] + + @skipUnless(can_import("odoo.addons.web"), "Needs web addon") + @mute_logger(*GARBAGE_LOGGERS) + def test_web_login_existing(self, *args): + """Remote is banned with real user on web login form.""" + data1 = { + "login": "admin", + "password": "1234", # Wrong + } + # Make sure user is logged out + self.url_open("/web/session/logout", timeout=30) + # Fail 3 times + for n in range(3): + response = self.url_open("/web/login", bytes(urlencode(data1)), 30) + # If you fail, you get /web/login again + self.assertTrue( + response.geturl().endswith("/web/login"), + "Unexpected URL %s" % response.geturl(), + ) + # Admin banned, demo not + with self.cursor() as cr: + env = self.env(cr) + self.assertFalse( + env["res.authentication.attempt"]._trusted( + "127.0.0.1", + data1["login"], + ), + ) + self.assertTrue( + env["res.authentication.attempt"]._trusted( + "127.0.0.1", + "demo", + ), + ) + # Now I know the password, but login is rejected too + data1["password"] = self.good_password + response = self.url_open("/web/login", bytes(urlencode(data1)), 30) + self.assertTrue( + response.geturl().endswith("/web/login"), + "Unexpected URL %s" % response.geturl(), + ) + # IP has been banned, demo user cannot login + with self.cursor() as cr: + env = self.env(cr) + self.assertFalse( + env["res.authentication.attempt"]._trusted( + "127.0.0.1", + "demo", + ), + ) + # Attempts recorded + with self.cursor() as cr: + env = self.env(cr) + failed = env["res.authentication.attempt"].search([ + ("result", "=", "failed"), + ("login", "=", data1["login"]), + ("remote", "=", "127.0.0.1"), + ]) + self.assertEqual(len(failed), 3) + banned = env["res.authentication.attempt"].search([ + ("result", "=", "banned"), + ("remote", "=", "127.0.0.1"), + ]) + self.assertEqual(len(banned), 1) + # Unban + banned.action_whitelist_add() + # Try good login, it should work now + response = self.url_open("/web/login", bytes(urlencode(data1)), 30) + self.assertTrue(response.geturl().endswith("/web")) + + @skipUnless(can_import("odoo.addons.web"), "Needs web addon") + @mute_logger(*GARBAGE_LOGGERS) + def test_web_login_unexisting(self, *args): + """Remote is banned with fake user on web login form.""" + data1 = { + "login": "administrator", # Wrong + "password": self.good_password, + } + # Make sure user is logged out + self.url_open("/web/session/logout", timeout=30) + # Fail 3 times + for n in range(3): + response = self.url_open("/web/login", bytes(urlencode(data1)), 30) + # If you fail, you get /web/login again + self.assertTrue( + response.geturl().endswith("/web/login"), + "Unexpected URL %s" % response.geturl(), + ) + # Admin banned, demo not + with self.cursor() as cr: + env = self.env(cr) + self.assertFalse( + env["res.authentication.attempt"]._trusted( + "127.0.0.1", + data1["login"], + ), + ) + self.assertTrue( + env["res.authentication.attempt"]._trusted( + "127.0.0.1", + self.data_demo["login"], + ), + ) + # Demo user can login + response = self.url_open( + "/web/login", + bytes(urlencode(self.data_demo)), + 30, + ) + # If you pass, you get /web + self.assertTrue( + response.geturl().endswith("/web"), + "Unexpected URL %s" % response.geturl(), + ) + self.url_open("/web/session/logout", timeout=30) + # Attempts recorded + with self.cursor() as cr: + env = self.env(cr) + failed = env["res.authentication.attempt"].search([ + ("result", "=", "failed"), + ("login", "=", data1["login"]), + ("remote", "=", "127.0.0.1"), + ]) + self.assertEqual(len(failed), 3) + banned = env["res.authentication.attempt"].search([ + ("result", "=", "banned"), + ("login", "=", data1["login"]), + ("remote", "=", "127.0.0.1"), + ]) + self.assertEqual(len(banned), 0) + + @mute_logger(*GARBAGE_LOGGERS) + def test_xmlrpc_login_existing(self, *args): + """Remote is banned with real user on XML-RPC login.""" + data1 = { + "login": "admin", + "password": "1234", # Wrong + } + # Fail 3 times + for n in range(3): + self.assertFalse(self.xmlrpc_common.authenticate( + self.env.cr.dbname, data1["login"], data1["password"], {})) + # Admin banned, demo not + with self.cursor() as cr: + env = self.env(cr) + self.assertFalse( + env["res.authentication.attempt"]._trusted( + "127.0.0.1", + data1["login"], + ), + ) + self.assertTrue( + env["res.authentication.attempt"]._trusted( + "127.0.0.1", + "demo", + ), + ) + # Now I know the password, but login is rejected too + data1["password"] = self.good_password + self.assertFalse(self.xmlrpc_common.authenticate( + self.env.cr.dbname, data1["login"], data1["password"], {})) + # IP has been banned, demo user cannot login + with self.cursor() as cr: + env = self.env(cr) + self.assertFalse( + env["res.authentication.attempt"]._trusted( + "127.0.0.1", + "demo", + ), + ) + # Attempts recorded + with self.cursor() as cr: + env = self.env(cr) + failed = env["res.authentication.attempt"].search([ + ("result", "=", "failed"), + ("login", "=", data1["login"]), + ("remote", "=", "127.0.0.1"), + ]) + self.assertEqual(len(failed), 3) + banned = env["res.authentication.attempt"].search([ + ("result", "=", "banned"), + ("remote", "=", "127.0.0.1"), + ]) + self.assertEqual(len(banned), 1) + # Unban + banned.action_whitelist_add() + # Try good login, it should work now + self.assertTrue(self.xmlrpc_common.authenticate( + self.env.cr.dbname, data1["login"], data1["password"], {})) + + @mute_logger(*GARBAGE_LOGGERS) + def test_xmlrpc_login_unexisting(self, *args): + """Remote is banned with fake user on XML-RPC login.""" + data1 = { + "login": "administrator", # Wrong + "password": self.good_password, + } + # Fail 3 times + for n in range(3): + self.assertFalse(self.xmlrpc_common.authenticate( + self.env.cr.dbname, data1["login"], data1["password"], {})) + # Admin banned, demo not + with self.cursor() as cr: + env = self.env(cr) + self.assertFalse( + env["res.authentication.attempt"]._trusted( + "127.0.0.1", + data1["login"], + ), + ) + self.assertTrue( + env["res.authentication.attempt"]._trusted( + "127.0.0.1", + self.data_demo["login"], + ), + ) + # Demo user can login + self.assertTrue(self.xmlrpc_common.authenticate( + self.env.cr.dbname, + self.data_demo["login"], + self.data_demo["password"], + {}, + )) + # Attempts recorded + with self.cursor() as cr: + env = self.env(cr) + failed = env["res.authentication.attempt"].search([ + ("result", "=", "failed"), + ("login", "=", data1["login"]), + ("remote", "=", "127.0.0.1"), + ]) + self.assertEqual(len(failed), 3) + banned = env["res.authentication.attempt"].search([ + ("result", "=", "banned"), + ("login", "=", data1["login"]), + ("remote", "=", "127.0.0.1"), + ]) + self.assertEqual(len(banned), 0) + + @mute_logger(*GARBAGE_LOGGERS) + def test_orm_login_existing(self, *args): + """No bans on ORM login with an existing user.""" + data1 = { + "login": "admin", + "password": "1234", # Wrong + } + with self.cursor() as cr: + env = self.env(cr) + # Fail 3 times + for n in range(3): + self.assertFalse( + env["res.users"].authenticate( + cr.dbname, data1["login"], data1["password"], {})) + self.assertEqual( + env["res.authentication.attempt"].search(count=True, args=[]), + 0, + ) + self.assertTrue( + env["res.authentication.attempt"]._trusted( + "127.0.0.1", + data1["login"], + ), + ) + # Now I know the password, and login works + data1["password"] = self.good_password + self.assertTrue( + env["res.users"].authenticate( + cr.dbname, data1["login"], data1["password"], {})) + + @mute_logger(*GARBAGE_LOGGERS) + def test_orm_login_unexisting(self, *args): + """No bans on ORM login with an unexisting user.""" + data1 = { + "login": "administrator", # Wrong + "password": self.good_password, + } + with self.cursor() as cr: + env = self.env(cr) + # Fail 3 times + for n in range(3): + self.assertFalse( + env["res.users"].authenticate( + cr.dbname, data1["login"], data1["password"], {})) + self.assertEqual( + env["res.authentication.attempt"].search(count=True, args=[]), + 0, + ) + self.assertTrue( + env["res.authentication.attempt"]._trusted( + "127.0.0.1", + data1["login"], + ), + ) + # Now I know the user, and login works + data1["login"] = "admin" + self.assertTrue( + env["res.users"].authenticate( + cr.dbname, data1["login"], data1["password"], {})) diff --git a/auth_brute_force/views/action.xml b/auth_brute_force/views/action.xml index ea7ac4862..de0684c99 100644 --- a/auth_brute_force/views/action.xml +++ b/auth_brute_force/views/action.xml @@ -11,11 +11,4 @@ {"search_default_filter_no_success":1} - - Banned Remotes - res.banned.remote - form - tree,form - - diff --git a/auth_brute_force/views/menu.xml b/auth_brute_force/views/menu.xml index cd246ae33..107d69d39 100644 --- a/auth_brute_force/views/menu.xml +++ b/auth_brute_force/views/menu.xml @@ -3,12 +3,8 @@ License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl). --> - - - diff --git a/auth_brute_force/views/view.xml b/auth_brute_force/views/view.xml index 4865978d7..bd534f3f2 100644 --- a/auth_brute_force/views/view.xml +++ b/auth_brute_force/views/view.xml @@ -3,78 +3,74 @@ License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl). --> - - - res.authentication.attempt - - - - - - - - - + + + res.authentication.attempt + + + + + + + + + - - res.authentication.attempt - - - - - - - + + res.authentication.attempt + +
+
+
+ + + + + + + + + +
+
+
- - res.authentication.attempt - - - - - - - - - - + + res.authentication.attempt + + + + + + + - - - res.banned.remote - - - - - - - - - - - res.banned.remote - -
- - - - - - - - - -
-
-
- - - res.banned.remote - - - - - - + + res.authentication.attempt + + + + + + + + + +