Browse Source

[MIG] auth_brute_force: Backport from v10

Odoo v9 auth methods usually violate both old and new api; they just only work in old api, so I had to adapt some tests to use it instead.

Normal backport outside of that.
pull/1255/head
Jairo Llopis 7 years ago
parent
commit
883a21fc11
  1. 2
      auth_brute_force/README.rst
  2. 2
      auth_brute_force/__openerp__.py
  3. 50
      auth_brute_force/migrations/10.0.2.0.0/pre-migrate.py
  4. 2
      auth_brute_force/models/res_authentication_attempt.py
  5. 30
      auth_brute_force/models/res_users.py
  6. 17
      auth_brute_force/tests/test_brute_force.py

2
auth_brute_force/README.rst

@ -62,7 +62,7 @@ Screenshot
.. image:: https://odoo-community.org/website/image/ir.attachment/5784_f2813bd/datas
:alt: Try me on Runbot
:target: https://runbot.odoo-community.org/runbot/149/10.0
:target: https://runbot.odoo-community.org/runbot/149/9.0
For further information, please visit:

2
auth_brute_force/__manifest__.py → auth_brute_force/__openerp__.py

@ -4,7 +4,7 @@
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html).
{
'name': 'Authentification - Brute-Force Filter',
'version': '10.0.2.1.1',
'version': '9.0.1.0.0',
'category': 'Tools',
'summary': "Track Authentication Attempts and Prevent Brute-force Attacks",
'author': "GRAP, "

50
auth_brute_force/migrations/10.0.2.0.0/pre-migrate.py

@ -1,50 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright 2018 Tecnativa - Jairo Llopis
# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl).
from psycopg2 import IntegrityError
def migrate(cr, version):
# Fix typo across DB
cr.execute(
""" UPDATE res_authentication_attempt
SET result = 'successful'
WHERE result = 'successfull'""",
)
# Store whitelist IPs in new format
cr.execute(
""" SELECT remote
FROM res_banned_remote
WHERE active IS FALSE""",
)
remotes = {record[0] for record in cr.fetchall()}
try:
with cr.savepoint():
cr.execute(
"INSERT INTO ir_config_parameter (key, value) VALUES (%s, %s)",
(
"auth_brute_force.whitelist_remotes",
",".join(remotes),
),
)
except IntegrityError:
# Parameter already exists
cr.execute(
"SELECT value FROM ir_config_parameter WHERE key = %s",
("auth_brute_force.whitelist_remotes",)
)
current = set(cr.fetchall()[0][0].split(","))
cr.execute(
"UPDATE ir_config_parameter SET value = %s WHERE key = %s",
(",".join(current | remotes),
"auth_brute_force.whitelist_remotes"),
)
# Update the configured IP limit parameter
cr.execute(
"UPDATE ir_config_parameter SET key = %s WHERE key = %s",
(
"auth_brute_force.whitelist_remotes",
"auth_brute_force.max_by_ip",
)
)

2
auth_brute_force/models/res_authentication_attempt.py

@ -5,7 +5,7 @@
import json
import logging
from urllib2 import urlopen
from odoo import api, fields, models
from openerp import api, fields, models
GEOLOCALISATION_URL = u"http://ip-api.com/json/{}"

30
auth_brute_force/models/res_users.py

@ -5,9 +5,9 @@
import logging
from contextlib import contextmanager
from threading import current_thread
from odoo import api, models, SUPERUSER_ID
from odoo.exceptions import AccessDenied
from odoo.service import wsgi_server
from openerp import api, models, SUPERUSER_ID
from openerp.exceptions import AccessDenied
from openerp.service import wsgi_server
_logger = logging.getLogger(__name__)
@ -17,8 +17,7 @@ class ResUsers(models.Model):
# HACK https://github.com/odoo/odoo/issues/24183
# TODO Remove in v12, and use normal odoo.http.request to get details
@api.model_cr
def _register_hook(self):
def _register_hook(self, cr):
"""🐒-patch XML-RPC controller to know remote address."""
original_fn = wsgi_server.application_unproxied
@ -108,25 +107,22 @@ class ResUsers(models.Model):
return attempt.copy_data()[0] if attempt else {}
# Override all auth-related core methods
@classmethod
def _login(cls, db, login, password):
return cls._auth_attempt_force_raise(
def _login(self, db, login, password):
return self._auth_attempt_force_raise(
login,
lambda: super(ResUsers, cls)._login(db, login, password),
lambda: super(ResUsers, self)._login(db, login, password),
)
@classmethod
def authenticate(cls, db, login, password, user_agent_env):
return cls._auth_attempt_force_raise(
def authenticate(self, db, login, password, user_agent_env):
return self._auth_attempt_force_raise(
login,
lambda: super(ResUsers, cls).authenticate(
lambda: super(ResUsers, self).authenticate(
db, login, password, user_agent_env),
)
@classmethod
def check(cls, db, uid, passwd):
with cls._auth_attempt(uid):
return super(ResUsers, cls).check(db, uid, passwd)
def check(self, db, uid, passwd):
with self._auth_attempt(uid):
return super(ResUsers, self).check(db, uid, passwd)
@api.model
def check_credentials(self, password):

17
auth_brute_force/tests/test_brute_force.py

@ -9,9 +9,10 @@ from decorator import decorator
from mock import patch
from werkzeug.utils import redirect
from odoo import http
from odoo.tests.common import at_install, HttpCase, post_install
from odoo.tools import mute_logger
from openerp import http
from openerp.tests.common import at_install, HttpCase, post_install
from openerp.tools import mute_logger
from openerp.modules.registry import RegistryManager
from ..models import res_authentication_attempt, res_users
@ -341,10 +342,11 @@ class BruteForceCase(HttpCase):
}
with self.cursor() as cr:
env = self.env(cr)
pool = RegistryManager.get(cr.dbname)
# Fail 3 times
for n in range(3):
self.assertFalse(
env["res.users"].authenticate(
pool["res.users"].authenticate(
cr.dbname, data1["login"], data1["password"], {}))
self.assertEqual(
env["res.authentication.attempt"].search(count=True, args=[]),
@ -359,7 +361,7 @@ class BruteForceCase(HttpCase):
# Now I know the password, and login works
data1["password"] = self.good_password
self.assertTrue(
env["res.users"].authenticate(
pool["res.users"].authenticate(
cr.dbname, data1["login"], data1["password"], {}))
@mute_logger(*GARBAGE_LOGGERS)
@ -370,11 +372,12 @@ class BruteForceCase(HttpCase):
"password": self.good_password,
}
with self.cursor() as cr:
pool = RegistryManager.get(cr.dbname)
env = self.env(cr)
# Fail 3 times
for n in range(3):
self.assertFalse(
env["res.users"].authenticate(
pool["res.users"].authenticate(
cr.dbname, data1["login"], data1["password"], {}))
self.assertEqual(
env["res.authentication.attempt"].search(count=True, args=[]),
@ -389,5 +392,5 @@ class BruteForceCase(HttpCase):
# Now I know the user, and login works
data1["login"] = "admin"
self.assertTrue(
env["res.users"].authenticate(
pool["res.users"].authenticate(
cr.dbname, data1["login"], data1["password"], {}))
Loading…
Cancel
Save