You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

396 lines
14 KiB

  1. # -*- coding: utf-8 -*-
  2. # Copyright 2017 Tecnativa - Jairo Llopis
  3. # License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl).
  4. from threading import current_thread
  5. from urllib import urlencode
  6. from decorator import decorator
  7. from mock import patch
  8. from werkzeug.utils import redirect
  9. from openerp import http
  10. from openerp.tests.common import at_install, HttpCase, post_install
  11. from openerp.tools import mute_logger
  12. from openerp.modules.registry import RegistryManager
  13. from ..models import res_authentication_attempt, res_users
  14. GARBAGE_LOGGERS = (
  15. "werkzeug",
  16. res_authentication_attempt.__name__,
  17. res_users.__name__,
  18. )
  19. # HACK https://github.com/odoo/odoo/pull/24833
  20. def skip_unless_addons_installed(*addons):
  21. """Decorator to skip a test unless some addons are installed.
  22. :param *str addons:
  23. Addon names that should be installed.
  24. :param reason:
  25. Explain why you must skip this test.
  26. """
  27. @decorator
  28. def _wrapper(method, self, *args, **kwargs):
  29. installed = self.addons_installed(*addons)
  30. if not installed:
  31. missing = set(addons) - installed
  32. self.skipTest("Required addons not installed: %s" %
  33. ",".join(sorted(missing)))
  34. return method(self, *args, **kwargs)
  35. return _wrapper
  36. @at_install(False)
  37. @post_install(True)
  38. # Skip CSRF validation on tests
  39. @patch(http.__name__ + ".WebRequest.validate_csrf", return_value=True)
  40. # Skip specific browser forgery on redirections
  41. @patch(http.__name__ + ".redirect_with_hash", side_effect=redirect)
  42. # Faster tests without calls to geolocation API
  43. @patch(res_authentication_attempt.__name__ + ".urlopen", return_value="")
  44. class BruteForceCase(HttpCase):
  45. def setUp(self):
  46. super(BruteForceCase, self).setUp()
  47. # Some tests could retain environ from last test and produce fake
  48. # results without this patch
  49. # HACK https://github.com/odoo/odoo/issues/24183
  50. # TODO Remove in v12
  51. try:
  52. del current_thread().environ
  53. except AttributeError:
  54. pass
  55. # Complex password to avoid conflicts with `password_security`
  56. self.good_password = "Admin$%02584"
  57. self.data_demo = {
  58. "login": "demo",
  59. "password": "Demo%&/(908409**",
  60. }
  61. with self.cursor() as cr:
  62. env = self.env(cr)
  63. env["ir.config_parameter"].set_param(
  64. "auth_brute_force.max_by_ip_user", 3)
  65. env["ir.config_parameter"].set_param(
  66. "auth_brute_force.max_by_ip", 4)
  67. # Clean attempts to be able to count in tests
  68. env["res.authentication.attempt"].search([]).unlink()
  69. # Make sure involved users have good passwords
  70. env.user.password = self.good_password
  71. env["res.users"].search([
  72. ("login", "=", self.data_demo["login"]),
  73. ]).password = self.data_demo["password"]
  74. # HACK https://github.com/odoo/odoo/pull/24833
  75. def addons_installed(self, *addons):
  76. """Know if the specified addons are installed."""
  77. found = self.env["ir.module.module"].search([
  78. ("name", "in", addons),
  79. ("state", "not in", ["uninstalled", "uninstallable"]),
  80. ])
  81. return set(addons) - set(found.mapped("name"))
  82. @skip_unless_addons_installed("web")
  83. @mute_logger(*GARBAGE_LOGGERS)
  84. def test_web_login_existing(self, *args):
  85. """Remote is banned with real user on web login form."""
  86. data1 = {
  87. "login": "admin",
  88. "password": "1234", # Wrong
  89. }
  90. # Make sure user is logged out
  91. self.url_open("/web/session/logout", timeout=30)
  92. # Fail 3 times
  93. for n in range(3):
  94. response = self.url_open("/web/login", bytes(urlencode(data1)), 30)
  95. # If you fail, you get /web/login again
  96. self.assertTrue(
  97. response.geturl().endswith("/web/login"),
  98. "Unexpected URL %s" % response.geturl(),
  99. )
  100. # Admin banned, demo not
  101. with self.cursor() as cr:
  102. env = self.env(cr)
  103. self.assertFalse(
  104. env["res.authentication.attempt"]._trusted(
  105. "127.0.0.1",
  106. data1["login"],
  107. ),
  108. )
  109. self.assertTrue(
  110. env["res.authentication.attempt"]._trusted(
  111. "127.0.0.1",
  112. "demo",
  113. ),
  114. )
  115. # Now I know the password, but login is rejected too
  116. data1["password"] = self.good_password
  117. response = self.url_open("/web/login", bytes(urlencode(data1)), 30)
  118. self.assertTrue(
  119. response.geturl().endswith("/web/login"),
  120. "Unexpected URL %s" % response.geturl(),
  121. )
  122. # IP has been banned, demo user cannot login
  123. with self.cursor() as cr:
  124. env = self.env(cr)
  125. self.assertFalse(
  126. env["res.authentication.attempt"]._trusted(
  127. "127.0.0.1",
  128. "demo",
  129. ),
  130. )
  131. # Attempts recorded
  132. with self.cursor() as cr:
  133. env = self.env(cr)
  134. failed = env["res.authentication.attempt"].search([
  135. ("result", "=", "failed"),
  136. ("login", "=", data1["login"]),
  137. ("remote", "=", "127.0.0.1"),
  138. ])
  139. self.assertEqual(len(failed), 3)
  140. banned = env["res.authentication.attempt"].search([
  141. ("result", "=", "banned"),
  142. ("remote", "=", "127.0.0.1"),
  143. ])
  144. self.assertEqual(len(banned), 1)
  145. # Unban
  146. banned.action_whitelist_add()
  147. # Try good login, it should work now
  148. response = self.url_open("/web/login", bytes(urlencode(data1)), 30)
  149. self.assertTrue(response.geturl().endswith("/web"))
  150. @skip_unless_addons_installed("web")
  151. @mute_logger(*GARBAGE_LOGGERS)
  152. def test_web_login_unexisting(self, *args):
  153. """Remote is banned with fake user on web login form."""
  154. data1 = {
  155. "login": "administrator", # Wrong
  156. "password": self.good_password,
  157. }
  158. # Make sure user is logged out
  159. self.url_open("/web/session/logout", timeout=30)
  160. # Fail 3 times
  161. for n in range(3):
  162. response = self.url_open("/web/login", bytes(urlencode(data1)), 30)
  163. # If you fail, you get /web/login again
  164. self.assertTrue(
  165. response.geturl().endswith("/web/login"),
  166. "Unexpected URL %s" % response.geturl(),
  167. )
  168. # Admin banned, demo not
  169. with self.cursor() as cr:
  170. env = self.env(cr)
  171. self.assertFalse(
  172. env["res.authentication.attempt"]._trusted(
  173. "127.0.0.1",
  174. data1["login"],
  175. ),
  176. )
  177. self.assertTrue(
  178. env["res.authentication.attempt"]._trusted(
  179. "127.0.0.1",
  180. self.data_demo["login"],
  181. ),
  182. )
  183. # Demo user can login
  184. response = self.url_open(
  185. "/web/login",
  186. bytes(urlencode(self.data_demo)),
  187. 30,
  188. )
  189. # If you pass, you get /web
  190. self.assertTrue(
  191. response.geturl().endswith("/web"),
  192. "Unexpected URL %s" % response.geturl(),
  193. )
  194. self.url_open("/web/session/logout", timeout=30)
  195. # Attempts recorded
  196. with self.cursor() as cr:
  197. env = self.env(cr)
  198. failed = env["res.authentication.attempt"].search([
  199. ("result", "=", "failed"),
  200. ("login", "=", data1["login"]),
  201. ("remote", "=", "127.0.0.1"),
  202. ])
  203. self.assertEqual(len(failed), 3)
  204. banned = env["res.authentication.attempt"].search([
  205. ("result", "=", "banned"),
  206. ("login", "=", data1["login"]),
  207. ("remote", "=", "127.0.0.1"),
  208. ])
  209. self.assertEqual(len(banned), 0)
  210. @mute_logger(*GARBAGE_LOGGERS)
  211. def test_xmlrpc_login_existing(self, *args):
  212. """Remote is banned with real user on XML-RPC login."""
  213. data1 = {
  214. "login": "admin",
  215. "password": "1234", # Wrong
  216. }
  217. # Fail 3 times
  218. for n in range(3):
  219. self.assertFalse(self.xmlrpc_common.authenticate(
  220. self.env.cr.dbname, data1["login"], data1["password"], {}))
  221. # Admin banned, demo not
  222. with self.cursor() as cr:
  223. env = self.env(cr)
  224. self.assertFalse(
  225. env["res.authentication.attempt"]._trusted(
  226. "127.0.0.1",
  227. data1["login"],
  228. ),
  229. )
  230. self.assertTrue(
  231. env["res.authentication.attempt"]._trusted(
  232. "127.0.0.1",
  233. "demo",
  234. ),
  235. )
  236. # Now I know the password, but login is rejected too
  237. data1["password"] = self.good_password
  238. self.assertFalse(self.xmlrpc_common.authenticate(
  239. self.env.cr.dbname, data1["login"], data1["password"], {}))
  240. # IP has been banned, demo user cannot login
  241. with self.cursor() as cr:
  242. env = self.env(cr)
  243. self.assertFalse(
  244. env["res.authentication.attempt"]._trusted(
  245. "127.0.0.1",
  246. "demo",
  247. ),
  248. )
  249. # Attempts recorded
  250. with self.cursor() as cr:
  251. env = self.env(cr)
  252. failed = env["res.authentication.attempt"].search([
  253. ("result", "=", "failed"),
  254. ("login", "=", data1["login"]),
  255. ("remote", "=", "127.0.0.1"),
  256. ])
  257. self.assertEqual(len(failed), 3)
  258. banned = env["res.authentication.attempt"].search([
  259. ("result", "=", "banned"),
  260. ("remote", "=", "127.0.0.1"),
  261. ])
  262. self.assertEqual(len(banned), 1)
  263. # Unban
  264. banned.action_whitelist_add()
  265. # Try good login, it should work now
  266. self.assertTrue(self.xmlrpc_common.authenticate(
  267. self.env.cr.dbname, data1["login"], data1["password"], {}))
  268. @mute_logger(*GARBAGE_LOGGERS)
  269. def test_xmlrpc_login_unexisting(self, *args):
  270. """Remote is banned with fake user on XML-RPC login."""
  271. data1 = {
  272. "login": "administrator", # Wrong
  273. "password": self.good_password,
  274. }
  275. # Fail 3 times
  276. for n in range(3):
  277. self.assertFalse(self.xmlrpc_common.authenticate(
  278. self.env.cr.dbname, data1["login"], data1["password"], {}))
  279. # Admin banned, demo not
  280. with self.cursor() as cr:
  281. env = self.env(cr)
  282. self.assertFalse(
  283. env["res.authentication.attempt"]._trusted(
  284. "127.0.0.1",
  285. data1["login"],
  286. ),
  287. )
  288. self.assertTrue(
  289. env["res.authentication.attempt"]._trusted(
  290. "127.0.0.1",
  291. self.data_demo["login"],
  292. ),
  293. )
  294. # Demo user can login
  295. self.assertTrue(self.xmlrpc_common.authenticate(
  296. self.env.cr.dbname,
  297. self.data_demo["login"],
  298. self.data_demo["password"],
  299. {},
  300. ))
  301. # Attempts recorded
  302. with self.cursor() as cr:
  303. env = self.env(cr)
  304. failed = env["res.authentication.attempt"].search([
  305. ("result", "=", "failed"),
  306. ("login", "=", data1["login"]),
  307. ("remote", "=", "127.0.0.1"),
  308. ])
  309. self.assertEqual(len(failed), 3)
  310. banned = env["res.authentication.attempt"].search([
  311. ("result", "=", "banned"),
  312. ("login", "=", data1["login"]),
  313. ("remote", "=", "127.0.0.1"),
  314. ])
  315. self.assertEqual(len(banned), 0)
  316. @mute_logger(*GARBAGE_LOGGERS)
  317. def test_orm_login_existing(self, *args):
  318. """No bans on ORM login with an existing user."""
  319. data1 = {
  320. "login": "admin",
  321. "password": "1234", # Wrong
  322. }
  323. with self.cursor() as cr:
  324. env = self.env(cr)
  325. pool = RegistryManager.get(cr.dbname)
  326. # Fail 3 times
  327. for n in range(3):
  328. self.assertFalse(
  329. pool["res.users"].authenticate(
  330. cr.dbname, data1["login"], data1["password"], {}))
  331. self.assertEqual(
  332. env["res.authentication.attempt"].search(count=True, args=[]),
  333. 0,
  334. )
  335. self.assertTrue(
  336. env["res.authentication.attempt"]._trusted(
  337. "127.0.0.1",
  338. data1["login"],
  339. ),
  340. )
  341. # Now I know the password, and login works
  342. data1["password"] = self.good_password
  343. self.assertTrue(
  344. pool["res.users"].authenticate(
  345. cr.dbname, data1["login"], data1["password"], {}))
  346. @mute_logger(*GARBAGE_LOGGERS)
  347. def test_orm_login_unexisting(self, *args):
  348. """No bans on ORM login with an unexisting user."""
  349. data1 = {
  350. "login": "administrator", # Wrong
  351. "password": self.good_password,
  352. }
  353. with self.cursor() as cr:
  354. pool = RegistryManager.get(cr.dbname)
  355. env = self.env(cr)
  356. # Fail 3 times
  357. for n in range(3):
  358. self.assertFalse(
  359. pool["res.users"].authenticate(
  360. cr.dbname, data1["login"], data1["password"], {}))
  361. self.assertEqual(
  362. env["res.authentication.attempt"].search(count=True, args=[]),
  363. 0,
  364. )
  365. self.assertTrue(
  366. env["res.authentication.attempt"]._trusted(
  367. "127.0.0.1",
  368. data1["login"],
  369. ),
  370. )
  371. # Now I know the user, and login works
  372. data1["login"] = "admin"
  373. self.assertTrue(
  374. pool["res.users"].authenticate(
  375. cr.dbname, data1["login"], data1["password"], {}))