From d547b3d7d17467e14cb71a80359dd8c7b775fae0 Mon Sep 17 00:00:00 2001 From: Stefan Rijnhart Date: Thu, 7 Mar 2019 18:30:20 +0100 Subject: [PATCH] [RFR] Contextmanager --- auth_brute_force/tests/test_brute_force.py | 144 +++++++++------------ 1 file changed, 63 insertions(+), 81 deletions(-) diff --git a/auth_brute_force/tests/test_brute_force.py b/auth_brute_force/tests/test_brute_force.py index 673f2e64a..19da05482 100644 --- a/auth_brute_force/tests/test_brute_force.py +++ b/auth_brute_force/tests/test_brute_force.py @@ -46,6 +46,65 @@ def skip_unless_addons_installed(*addons): return _wrapper +def patch_cursor(func): + """ Decorator that patches the current TestCursor for nested savepoint + support """ + def acquire(cursor): + cursor._depth += 1 + cursor._lock.acquire() + cursor.execute("SAVEPOINT test_cursor%d" % cursor._depth) + + def release(cursor): + cursor.execute("RELEASE SAVEPOINT test_cursor%d" % cursor._depth) + cursor._depth -= 1 + cursor._lock.release() + + def close(cursor): + cursor.release() + + def commit(cursor): + cursor.execute("RELEASE SAVEPOINT test_cursor%d" % cursor._depth) + cursor.execute("SAVEPOINT test_cursor%d" % cursor._depth) + + def rollback(cursor): + cursor.execute( + "ROLLBACK TO SAVEPOINT test_cursor%d" % cursor._depth) + cursor.execute("SAVEPOINT test_cursor%d" % cursor._depth) + + def wrap(func, *args): + + def wrapped_function(self, *args): + with self.cursor() as cursor: + cursor.execute("SAVEPOINT test_cursor0") + cursor._depth = 1 + cursor.execute("SAVEPOINT test_cursor%d" % cursor._depth) + + cursor.__acquire = cursor.acquire + cursor.__release = cursor.release + cursor.__commit = cursor.commit + cursor.__rollback = cursor.rollback + cursor.__close = cursor.close + cursor.acquire = lambda: acquire(cursor) + cursor.release = lambda: release(cursor) + cursor.commit = lambda: commit(cursor) + cursor.rollback = lambda: rollback(cursor) + cursor.close = lambda: close(cursor) + + try: + func(self, *args) + finally: + with self.cursor() as cursor: + cursor.acquire = cursor.__acquire + cursor.release = cursor.__release + cursor.commit = cursor.__commit + cursor.rollback = cursor.__rollback + cursor.close = cursor.__close + + return wrapped_function + + return wrap + + @at_install(False) @post_install(True) # Skip CSRF validation on tests @@ -94,68 +153,10 @@ class BruteForceCase(HttpCase): ]) return set(addons) - set(found.mapped("name")) - def patch_cursor(self): - """ Monkeypatch Holger's https://github.com/odoo/odoo/pull/20033 - onto the current TestCursor so that we can fully test this module's - functionality """ - - def acquire(cursor): - cursor._depth += 1 - cursor._lock.acquire() - cursor.execute("SAVEPOINT test_cursor%d" % cursor._depth) - - def release(cursor): - cursor.execute("RELEASE SAVEPOINT test_cursor%d" % cursor._depth) - cursor._depth -= 1 - cursor._lock.release() - - def close(cursor): - # Do not rollback the cursor on close - cursor.release() - - def commit(cursor): - cursor.execute("RELEASE SAVEPOINT test_cursor%d" % cursor._depth) - cursor.execute("SAVEPOINT test_cursor%d" % cursor._depth) - - def rollback(cursor): - cursor.execute( - "ROLLBACK TO SAVEPOINT test_cursor%d" % cursor._depth) - cursor.execute("SAVEPOINT test_cursor%d" % cursor._depth) - - with self.cursor() as cursor: - cursor.execute("SAVEPOINT test_cursor0") - cursor._depth = 1 - cursor.execute("SAVEPOINT test_cursor%d" % cursor._depth) - - cursor.__acquire = cursor.acquire - cursor.__release = cursor.release - cursor.__commit = cursor.commit - cursor.__rollback = cursor.rollback - cursor.__close = cursor.close - cursor.acquire = lambda: acquire(cursor) - cursor.release = lambda: release(cursor) - cursor.commit = lambda: commit(cursor) - cursor.rollback = lambda: rollback(cursor) - cursor.close = lambda: close(cursor) - - def unpatch_cursor(self): - with self.cursor() as cursor: - cursor.acquire = cursor.__acquire - cursor.release = cursor.__release - cursor.commit = cursor.__commit - cursor.rollback = cursor.__rollback - cursor.close = cursor.__close - @skip_unless_addons_installed("web") @mute_logger(*GARBAGE_LOGGERS) + @patch_cursor def test_web_login_existing(self, *args): - self.patch_cursor() - try: - self._test_web_login_existing(*args) - finally: - self.unpatch_cursor() - - def _test_web_login_existing(self, *args): """Remote is banned with real user on web login form.""" data1 = { "login": "admin", @@ -224,14 +225,8 @@ class BruteForceCase(HttpCase): @skip_unless_addons_installed("web") @mute_logger(*GARBAGE_LOGGERS) + @patch_cursor def test_web_login_unexisting(self, *args): - self.patch_cursor() - try: - self._test_web_login_unexisting(*args) - finally: - self._unpatch_cursor() - - def _test_web_login_unexisting(self, *args): """Remote is banned with fake user on web login form.""" data1 = { "login": "administrator", # Wrong @@ -291,15 +286,8 @@ class BruteForceCase(HttpCase): self.assertEqual(len(banned), 0) @mute_logger(*GARBAGE_LOGGERS) + @patch_cursor def test_xmlrpc_login_existing(self, *args): - self.patch_cursor() - try: - self._test_xmlrpc_login_existing(*args) - finally: - self.unpatch_cursor() - - @mute_logger(*GARBAGE_LOGGERS) - def _test_xmlrpc_login_existing(self, *args): """Remote is banned with real user on XML-RPC login.""" data1 = { "login": "admin", @@ -358,14 +346,8 @@ class BruteForceCase(HttpCase): self.env.cr.dbname, data1["login"], data1["password"], {})) @mute_logger(*GARBAGE_LOGGERS) + @patch_cursor def test_xmlrpc_login_unexisting(self, *args): - self.patch_cursor() - try: - self._test_xmlrpc_login_unexisting(*args) - finally: - self.unpatch_cursor() - - def _test_xmlrpc_login_unexisting(self, *args): """Remote is banned with fake user on XML-RPC login.""" data1 = { "login": "administrator", # Wrong