# -*- coding: utf-8 -*- # Copyright 2017 Tecnativa - Jairo Llopis # License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl). import logging from contextlib import contextmanager from threading import current_thread from odoo import api, models, SUPERUSER_ID from odoo.exceptions import AccessDenied from odoo.service import wsgi_server _logger = logging.getLogger(__name__) class ResUsers(models.Model): _inherit = "res.users" # HACK https://github.com/odoo/odoo/issues/24183 # TODO Remove in v12, and use normal odoo.http.request to get details @api.model_cr def _register_hook(self): """🐒-patch XML-RPC controller to know remote address.""" original_fn = wsgi_server.application_unproxied def _patch(environ, start_response): current_thread().environ = environ return original_fn(environ, start_response) wsgi_server.application_unproxied = _patch # Helpers to track authentication attempts @classmethod @contextmanager def _auth_attempt(cls, login): """Start an authentication attempt and track its state.""" try: # Check if this call is nested attempt_id = current_thread().auth_attempt_id except AttributeError: # Not nested; create a new attempt attempt_id = cls._auth_attempt_new(login) if not attempt_id: # No attempt was created, so there's nothing to do here yield return try: current_thread().auth_attempt_id = attempt_id result = "successful" try: yield except AccessDenied as error: result = getattr(error, "reason", "failed") raise finally: cls._auth_attempt_update({"result": result}) finally: try: del current_thread().auth_attempt_id except AttributeError: pass # It was deleted already @classmethod def _auth_attempt_force_raise(cls, login, method): """Force a method to raise an AccessDenied on falsey return.""" try: with cls._auth_attempt(login): result = method() if not result: # Force exception to record auth failure raise AccessDenied() except AccessDenied: pass # `_auth_attempt()` did the hard part already return result @classmethod def _auth_attempt_new(cls, login): """Store one authentication attempt, not knowing the result.""" # Get the right remote address try: remote_addr = current_thread().environ["REMOTE_ADDR"] except (KeyError, AttributeError): remote_addr = False # Exit if it doesn't make sense to store this attempt if not remote_addr: return False # Use a separate cursor to keep changes always with cls.pool.cursor() as cr: env = api.Environment(cr, SUPERUSER_ID, {}) attempt = env["res.authentication.attempt"].create({ "login": login, "remote": remote_addr, }) return attempt.id @classmethod def _auth_attempt_update(cls, values): """Update a given auth attempt if we still ignore its result.""" auth_id = getattr(current_thread(), "auth_attempt_id", False) if not auth_id: return {} # No running auth attempt; nothing to do # Use a separate cursor to keep changes always with cls.pool.cursor() as cr: env = api.Environment(cr, SUPERUSER_ID, {}) attempt = env["res.authentication.attempt"].browse(auth_id) # Update only on 1st call if not attempt.result: attempt.write(values) return attempt.copy_data()[0] if attempt else {} # Override all auth-related core methods @classmethod def _login(cls, db, login, password): return cls._auth_attempt_force_raise( login, lambda: super(ResUsers, cls)._login(db, login, password), ) @classmethod def authenticate(cls, db, login, password, user_agent_env): return cls._auth_attempt_force_raise( login, lambda: super(ResUsers, cls).authenticate( db, login, password, user_agent_env), ) @api.model def check_credentials(self, password): """This is the most important and specific auth check method. When we get here, it means that Odoo already checked the user exists in this database. Other auth methods usually plug here. """ login = self.env.user.login with self._auth_attempt(login): # Update login, just in case we stored the UID before attempt = self._auth_attempt_update({"login": login}) remote = attempt.get("remote") # Fail if the remote is banned trusted = self.env["res.authentication.attempt"]._trusted( remote, login, ) if not trusted: error = AccessDenied() error.reason = "banned" raise error # Continue with other auth systems return super(ResUsers, self).check_credentials(password)