You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

155 lines
6.5 KiB

  1. ###################################################################################
  2. #
  3. # Copyright (c) 2017-2019 MuK IT GmbH.
  4. #
  5. # This file is part of MuK Utils
  6. # (see https://mukit.at).
  7. #
  8. # This program is free software: you can redistribute it and/or modify
  9. # it under the terms of the GNU Lesser General Public License as published by
  10. # the Free Software Foundation, either version 3 of the License, or
  11. # (at your option) any later version.
  12. #
  13. # This program is distributed in the hope that it will be useful,
  14. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  15. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  16. # GNU Lesser General Public License for more details.
  17. #
  18. # You should have received a copy of the GNU Lesser General Public License
  19. # along with this program. If not, see <http://www.gnu.org/licenses/>.
  20. #
  21. ###################################################################################
  22. import os
  23. import time
  24. import hmac
  25. import hashlib
  26. import logging
  27. import functools
  28. import threading
  29. import traceback
  30. from odoo.tests import common, HOST, PORT
  31. _path = os.path.dirname(os.path.dirname(__file__))
  32. _logger = logging.getLogger(__name__)
  33. #----------------------------------------------------------
  34. # Decorators
  35. #----------------------------------------------------------
  36. def multi_users(users=[['base.user_root', True], ['base.user_admin', True]], reset=True, raise_exception=True):
  37. def decorator(func):
  38. @functools.wraps(func)
  39. def wrapper(self, *args, **kwargs):
  40. user_list = users(self) if callable(users) else users
  41. test_results = []
  42. for user in user_list:
  43. self.cr.execute('SAVEPOINT test_multi_users')
  44. try:
  45. if not isinstance(user[0], int):
  46. self.uid = self.ref(user[0])
  47. else:
  48. self.uid = user[0]
  49. func(self, *args, **kwargs)
  50. except Exception as error:
  51. test_results.append({
  52. 'user': user[0],
  53. 'expect': user[1],
  54. 'result': False,
  55. 'error': error,
  56. })
  57. else:
  58. test_results.append({
  59. 'user': user[0],
  60. 'expect': user[1],
  61. 'result': True,
  62. 'error': None,
  63. })
  64. if reset:
  65. self.env.cache.invalidate()
  66. self.registry.clear_caches()
  67. self.registry.reset_changes()
  68. self.cr.execute('ROLLBACK TO SAVEPOINT test_multi_users')
  69. else:
  70. self._cr.execute('RELEASE SAVEPOINT test_multi_users')
  71. test_fails = []
  72. for result in test_results:
  73. if result['expect'] != result['result']:
  74. message = "Test (%s) with user (%s) failed!"
  75. _logger.info(message % (func.__name__, result['user']))
  76. if result['error']:
  77. _logger.error(result['error'], exc_info=True)
  78. test_fails.append(result)
  79. if test_fails:
  80. message = "%s out of %s tests failed" % (len(test_fails), len(test_results))
  81. if raise_exception:
  82. raise test_fails[0]['error']
  83. else:
  84. _logger.info(message)
  85. return test_results
  86. return wrapper
  87. return decorator
  88. def track_function(max_query_count=None, max_query_time=None, max_time=None, return_tracking=False):
  89. def decorator(func):
  90. @functools.wraps(func)
  91. def wrapper(*args, **kwargs):
  92. tracking_parameters = [func.__name__]
  93. threading.current_thread().query_time = 0
  94. threading.current_thread().query_count = 0
  95. threading.current_thread().perf_t0 = time.time()
  96. result = func(*args, **kwargs)
  97. message = "%s" % func.__name__
  98. if args and hasattr(args[0], "uid"):
  99. message = " (%s)" % args[0].uid
  100. if hasattr(threading.current_thread(), "query_count"):
  101. query_count = threading.current_thread().query_count
  102. query_time = threading.current_thread().query_time
  103. perf_t0 = threading.current_thread().perf_t0
  104. remaining_time = time.time() - perf_t0 - query_time
  105. time_taken = query_time + remaining_time
  106. message += " - %s Q %.3fs QT %.3fs OT %.3fs TT" % (
  107. query_count, query_time, remaining_time, time_taken
  108. )
  109. tracking_parameters += [
  110. query_count, query_time, remaining_time, time_taken
  111. ]
  112. if max_query_count and query_count > max_query_count:
  113. raise AssertionError("More than %s queries" % max_query_count)
  114. if max_query_time and query_time > max_query_time:
  115. raise AssertionError("Queries took longer than %.3fs" % max_query_time)
  116. if max_time and time_taken > max_time:
  117. raise AssertionError("Function took longer than %.3fs" % max_time)
  118. if not return_tracking:
  119. _logger.info(message)
  120. if return_tracking:
  121. return result, tracking_parameters
  122. return result
  123. return wrapper
  124. return decorator
  125. #----------------------------------------------------------
  126. # Test Cases
  127. #----------------------------------------------------------
  128. class HttpCase(common.HttpCase):
  129. def csrf_token(self, time_limit=3600):
  130. token = self.session.sid
  131. max_ts = '' if not time_limit else int(time.time() + time_limit)
  132. msg = '%s%s' % (token, max_ts)
  133. secret = self.env['ir.config_parameter'].sudo().get_param('database.secret')
  134. assert secret, "CSRF protection requires a configured database secret"
  135. hm = hmac.new(secret.encode('ascii'), msg.encode('utf-8'), hashlib.sha1).hexdigest()
  136. return '%so%s' % (hm, max_ts)
  137. def url_open(self, url, data=None, timeout=10, csrf=False):
  138. if url.startswith('/'):
  139. url = "http://%s:%s%s" % (HOST, PORT, url)
  140. if data:
  141. if csrf:
  142. data.update({'csrf_token': self.csrf_token()})
  143. return self.opener.post(url, data=data, timeout=timeout)
  144. return self.opener.get(url, timeout=timeout)