You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
396 lines
14 KiB
396 lines
14 KiB
# -*- coding: utf-8 -*-
|
|
# Copyright 2017 Tecnativa - Jairo Llopis
|
|
# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl).
|
|
|
|
from threading import current_thread
|
|
from urllib import urlencode
|
|
|
|
from decorator import decorator
|
|
from mock import patch
|
|
from werkzeug.utils import redirect
|
|
|
|
from openerp import http
|
|
from openerp.tests.common import at_install, HttpCase, post_install
|
|
from openerp.tools import mute_logger
|
|
from openerp.modules.registry import RegistryManager
|
|
|
|
from ..models import res_authentication_attempt, res_users
|
|
|
|
|
|
GARBAGE_LOGGERS = (
|
|
"werkzeug",
|
|
res_authentication_attempt.__name__,
|
|
res_users.__name__,
|
|
)
|
|
|
|
|
|
# HACK https://github.com/odoo/odoo/pull/24833
|
|
def skip_unless_addons_installed(*addons):
|
|
"""Decorator to skip a test unless some addons are installed.
|
|
|
|
:param *str addons:
|
|
Addon names that should be installed.
|
|
|
|
:param reason:
|
|
Explain why you must skip this test.
|
|
"""
|
|
|
|
@decorator
|
|
def _wrapper(method, self, *args, **kwargs):
|
|
installed = self.addons_installed(*addons)
|
|
if not installed:
|
|
missing = set(addons) - installed
|
|
self.skipTest("Required addons not installed: %s" %
|
|
",".join(sorted(missing)))
|
|
return method(self, *args, **kwargs)
|
|
|
|
return _wrapper
|
|
|
|
|
|
@at_install(False)
|
|
@post_install(True)
|
|
# Skip CSRF validation on tests
|
|
@patch(http.__name__ + ".WebRequest.validate_csrf", return_value=True)
|
|
# Skip specific browser forgery on redirections
|
|
@patch(http.__name__ + ".redirect_with_hash", side_effect=redirect)
|
|
# Faster tests without calls to geolocation API
|
|
@patch(res_authentication_attempt.__name__ + ".urlopen", return_value="")
|
|
class BruteForceCase(HttpCase):
|
|
def setUp(self):
|
|
super(BruteForceCase, self).setUp()
|
|
# Some tests could retain environ from last test and produce fake
|
|
# results without this patch
|
|
# HACK https://github.com/odoo/odoo/issues/24183
|
|
# TODO Remove in v12
|
|
try:
|
|
del current_thread().environ
|
|
except AttributeError:
|
|
pass
|
|
# Complex password to avoid conflicts with `password_security`
|
|
self.good_password = "Admin$%02584"
|
|
self.data_demo = {
|
|
"login": "demo",
|
|
"password": "Demo%&/(908409**",
|
|
}
|
|
with self.cursor() as cr:
|
|
env = self.env(cr)
|
|
env["ir.config_parameter"].set_param(
|
|
"auth_brute_force.max_by_ip_user", 3)
|
|
env["ir.config_parameter"].set_param(
|
|
"auth_brute_force.max_by_ip", 4)
|
|
# Clean attempts to be able to count in tests
|
|
env["res.authentication.attempt"].search([]).unlink()
|
|
# Make sure involved users have good passwords
|
|
env.user.password = self.good_password
|
|
env["res.users"].search([
|
|
("login", "=", self.data_demo["login"]),
|
|
]).password = self.data_demo["password"]
|
|
|
|
# HACK https://github.com/odoo/odoo/pull/24833
|
|
def addons_installed(self, *addons):
|
|
"""Know if the specified addons are installed."""
|
|
found = self.env["ir.module.module"].search([
|
|
("name", "in", addons),
|
|
("state", "not in", ["uninstalled", "uninstallable"]),
|
|
])
|
|
return set(addons) - set(found.mapped("name"))
|
|
|
|
@skip_unless_addons_installed("web")
|
|
@mute_logger(*GARBAGE_LOGGERS)
|
|
def test_web_login_existing(self, *args):
|
|
"""Remote is banned with real user on web login form."""
|
|
data1 = {
|
|
"login": "admin",
|
|
"password": "1234", # Wrong
|
|
}
|
|
# Make sure user is logged out
|
|
self.url_open("/web/session/logout", timeout=30)
|
|
# Fail 3 times
|
|
for n in range(3):
|
|
response = self.url_open("/web/login", bytes(urlencode(data1)), 30)
|
|
# If you fail, you get /web/login again
|
|
self.assertTrue(
|
|
response.geturl().endswith("/web/login"),
|
|
"Unexpected URL %s" % response.geturl(),
|
|
)
|
|
# Admin banned, demo not
|
|
with self.cursor() as cr:
|
|
env = self.env(cr)
|
|
self.assertFalse(
|
|
env["res.authentication.attempt"]._trusted(
|
|
"127.0.0.1",
|
|
data1["login"],
|
|
),
|
|
)
|
|
self.assertTrue(
|
|
env["res.authentication.attempt"]._trusted(
|
|
"127.0.0.1",
|
|
"demo",
|
|
),
|
|
)
|
|
# Now I know the password, but login is rejected too
|
|
data1["password"] = self.good_password
|
|
response = self.url_open("/web/login", bytes(urlencode(data1)), 30)
|
|
self.assertTrue(
|
|
response.geturl().endswith("/web/login"),
|
|
"Unexpected URL %s" % response.geturl(),
|
|
)
|
|
# IP has been banned, demo user cannot login
|
|
with self.cursor() as cr:
|
|
env = self.env(cr)
|
|
self.assertFalse(
|
|
env["res.authentication.attempt"]._trusted(
|
|
"127.0.0.1",
|
|
"demo",
|
|
),
|
|
)
|
|
# Attempts recorded
|
|
with self.cursor() as cr:
|
|
env = self.env(cr)
|
|
failed = env["res.authentication.attempt"].search([
|
|
("result", "=", "failed"),
|
|
("login", "=", data1["login"]),
|
|
("remote", "=", "127.0.0.1"),
|
|
])
|
|
self.assertEqual(len(failed), 3)
|
|
banned = env["res.authentication.attempt"].search([
|
|
("result", "=", "banned"),
|
|
("remote", "=", "127.0.0.1"),
|
|
])
|
|
self.assertEqual(len(banned), 1)
|
|
# Unban
|
|
banned.action_whitelist_add()
|
|
# Try good login, it should work now
|
|
response = self.url_open("/web/login", bytes(urlencode(data1)), 30)
|
|
self.assertTrue(response.geturl().endswith("/web"))
|
|
|
|
@skip_unless_addons_installed("web")
|
|
@mute_logger(*GARBAGE_LOGGERS)
|
|
def test_web_login_unexisting(self, *args):
|
|
"""Remote is banned with fake user on web login form."""
|
|
data1 = {
|
|
"login": "administrator", # Wrong
|
|
"password": self.good_password,
|
|
}
|
|
# Make sure user is logged out
|
|
self.url_open("/web/session/logout", timeout=30)
|
|
# Fail 3 times
|
|
for n in range(3):
|
|
response = self.url_open("/web/login", bytes(urlencode(data1)), 30)
|
|
# If you fail, you get /web/login again
|
|
self.assertTrue(
|
|
response.geturl().endswith("/web/login"),
|
|
"Unexpected URL %s" % response.geturl(),
|
|
)
|
|
# Admin banned, demo not
|
|
with self.cursor() as cr:
|
|
env = self.env(cr)
|
|
self.assertFalse(
|
|
env["res.authentication.attempt"]._trusted(
|
|
"127.0.0.1",
|
|
data1["login"],
|
|
),
|
|
)
|
|
self.assertTrue(
|
|
env["res.authentication.attempt"]._trusted(
|
|
"127.0.0.1",
|
|
self.data_demo["login"],
|
|
),
|
|
)
|
|
# Demo user can login
|
|
response = self.url_open(
|
|
"/web/login",
|
|
bytes(urlencode(self.data_demo)),
|
|
30,
|
|
)
|
|
# If you pass, you get /web
|
|
self.assertTrue(
|
|
response.geturl().endswith("/web"),
|
|
"Unexpected URL %s" % response.geturl(),
|
|
)
|
|
self.url_open("/web/session/logout", timeout=30)
|
|
# Attempts recorded
|
|
with self.cursor() as cr:
|
|
env = self.env(cr)
|
|
failed = env["res.authentication.attempt"].search([
|
|
("result", "=", "failed"),
|
|
("login", "=", data1["login"]),
|
|
("remote", "=", "127.0.0.1"),
|
|
])
|
|
self.assertEqual(len(failed), 3)
|
|
banned = env["res.authentication.attempt"].search([
|
|
("result", "=", "banned"),
|
|
("login", "=", data1["login"]),
|
|
("remote", "=", "127.0.0.1"),
|
|
])
|
|
self.assertEqual(len(banned), 0)
|
|
|
|
@mute_logger(*GARBAGE_LOGGERS)
|
|
def test_xmlrpc_login_existing(self, *args):
|
|
"""Remote is banned with real user on XML-RPC login."""
|
|
data1 = {
|
|
"login": "admin",
|
|
"password": "1234", # Wrong
|
|
}
|
|
# Fail 3 times
|
|
for n in range(3):
|
|
self.assertFalse(self.xmlrpc_common.authenticate(
|
|
self.env.cr.dbname, data1["login"], data1["password"], {}))
|
|
# Admin banned, demo not
|
|
with self.cursor() as cr:
|
|
env = self.env(cr)
|
|
self.assertFalse(
|
|
env["res.authentication.attempt"]._trusted(
|
|
"127.0.0.1",
|
|
data1["login"],
|
|
),
|
|
)
|
|
self.assertTrue(
|
|
env["res.authentication.attempt"]._trusted(
|
|
"127.0.0.1",
|
|
"demo",
|
|
),
|
|
)
|
|
# Now I know the password, but login is rejected too
|
|
data1["password"] = self.good_password
|
|
self.assertFalse(self.xmlrpc_common.authenticate(
|
|
self.env.cr.dbname, data1["login"], data1["password"], {}))
|
|
# IP has been banned, demo user cannot login
|
|
with self.cursor() as cr:
|
|
env = self.env(cr)
|
|
self.assertFalse(
|
|
env["res.authentication.attempt"]._trusted(
|
|
"127.0.0.1",
|
|
"demo",
|
|
),
|
|
)
|
|
# Attempts recorded
|
|
with self.cursor() as cr:
|
|
env = self.env(cr)
|
|
failed = env["res.authentication.attempt"].search([
|
|
("result", "=", "failed"),
|
|
("login", "=", data1["login"]),
|
|
("remote", "=", "127.0.0.1"),
|
|
])
|
|
self.assertEqual(len(failed), 3)
|
|
banned = env["res.authentication.attempt"].search([
|
|
("result", "=", "banned"),
|
|
("remote", "=", "127.0.0.1"),
|
|
])
|
|
self.assertEqual(len(banned), 1)
|
|
# Unban
|
|
banned.action_whitelist_add()
|
|
# Try good login, it should work now
|
|
self.assertTrue(self.xmlrpc_common.authenticate(
|
|
self.env.cr.dbname, data1["login"], data1["password"], {}))
|
|
|
|
@mute_logger(*GARBAGE_LOGGERS)
|
|
def test_xmlrpc_login_unexisting(self, *args):
|
|
"""Remote is banned with fake user on XML-RPC login."""
|
|
data1 = {
|
|
"login": "administrator", # Wrong
|
|
"password": self.good_password,
|
|
}
|
|
# Fail 3 times
|
|
for n in range(3):
|
|
self.assertFalse(self.xmlrpc_common.authenticate(
|
|
self.env.cr.dbname, data1["login"], data1["password"], {}))
|
|
# Admin banned, demo not
|
|
with self.cursor() as cr:
|
|
env = self.env(cr)
|
|
self.assertFalse(
|
|
env["res.authentication.attempt"]._trusted(
|
|
"127.0.0.1",
|
|
data1["login"],
|
|
),
|
|
)
|
|
self.assertTrue(
|
|
env["res.authentication.attempt"]._trusted(
|
|
"127.0.0.1",
|
|
self.data_demo["login"],
|
|
),
|
|
)
|
|
# Demo user can login
|
|
self.assertTrue(self.xmlrpc_common.authenticate(
|
|
self.env.cr.dbname,
|
|
self.data_demo["login"],
|
|
self.data_demo["password"],
|
|
{},
|
|
))
|
|
# Attempts recorded
|
|
with self.cursor() as cr:
|
|
env = self.env(cr)
|
|
failed = env["res.authentication.attempt"].search([
|
|
("result", "=", "failed"),
|
|
("login", "=", data1["login"]),
|
|
("remote", "=", "127.0.0.1"),
|
|
])
|
|
self.assertEqual(len(failed), 3)
|
|
banned = env["res.authentication.attempt"].search([
|
|
("result", "=", "banned"),
|
|
("login", "=", data1["login"]),
|
|
("remote", "=", "127.0.0.1"),
|
|
])
|
|
self.assertEqual(len(banned), 0)
|
|
|
|
@mute_logger(*GARBAGE_LOGGERS)
|
|
def test_orm_login_existing(self, *args):
|
|
"""No bans on ORM login with an existing user."""
|
|
data1 = {
|
|
"login": "admin",
|
|
"password": "1234", # Wrong
|
|
}
|
|
with self.cursor() as cr:
|
|
env = self.env(cr)
|
|
pool = RegistryManager.get(cr.dbname)
|
|
# Fail 3 times
|
|
for n in range(3):
|
|
self.assertFalse(
|
|
pool["res.users"].authenticate(
|
|
cr.dbname, data1["login"], data1["password"], {}))
|
|
self.assertEqual(
|
|
env["res.authentication.attempt"].search(count=True, args=[]),
|
|
0,
|
|
)
|
|
self.assertTrue(
|
|
env["res.authentication.attempt"]._trusted(
|
|
"127.0.0.1",
|
|
data1["login"],
|
|
),
|
|
)
|
|
# Now I know the password, and login works
|
|
data1["password"] = self.good_password
|
|
self.assertTrue(
|
|
pool["res.users"].authenticate(
|
|
cr.dbname, data1["login"], data1["password"], {}))
|
|
|
|
@mute_logger(*GARBAGE_LOGGERS)
|
|
def test_orm_login_unexisting(self, *args):
|
|
"""No bans on ORM login with an unexisting user."""
|
|
data1 = {
|
|
"login": "administrator", # Wrong
|
|
"password": self.good_password,
|
|
}
|
|
with self.cursor() as cr:
|
|
pool = RegistryManager.get(cr.dbname)
|
|
env = self.env(cr)
|
|
# Fail 3 times
|
|
for n in range(3):
|
|
self.assertFalse(
|
|
pool["res.users"].authenticate(
|
|
cr.dbname, data1["login"], data1["password"], {}))
|
|
self.assertEqual(
|
|
env["res.authentication.attempt"].search(count=True, args=[]),
|
|
0,
|
|
)
|
|
self.assertTrue(
|
|
env["res.authentication.attempt"]._trusted(
|
|
"127.0.0.1",
|
|
data1["login"],
|
|
),
|
|
)
|
|
# Now I know the user, and login works
|
|
data1["login"] = "admin"
|
|
self.assertTrue(
|
|
pool["res.users"].authenticate(
|
|
cr.dbname, data1["login"], data1["password"], {}))
|