You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

168 lines
7.5 KiB

  1. ###################################################################################
  2. #
  3. # Copyright (C) 2017 MuK IT GmbH
  4. #
  5. # Odoo Proprietary License v1.0
  6. #
  7. # This software and associated files (the "Software") may only be used
  8. # (executed, modified, executed after modifications) if you have
  9. # purchased a valid license from the authors, typically via Odoo Apps,
  10. # or if you have received a written agreement from the authors of the
  11. # Software (see the COPYRIGHT file).
  12. #
  13. # You may develop Odoo modules that use the Software as a library
  14. # (typically by depending on it, importing it and using its resources),
  15. # but without copying any source code or material from the Software.
  16. # You may distribute those modules under the license of your choice,
  17. # provided that this license is compatible with the terms of the Odoo
  18. # Proprietary License (For example: LGPL, MIT, or proprietary licenses
  19. # similar to this one).
  20. #
  21. # It is forbidden to publish, distribute, sublicense, or sell copies of
  22. # the Software or modified copies of the Software.
  23. #
  24. # The above copyright notice and this permission notice must be included
  25. # in all copies or substantial portions of the Software.
  26. #
  27. # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
  28. # OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  29. # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
  30. # THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  31. # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
  32. # FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
  33. # DEALINGS IN THE SOFTWARE.
  34. #
  35. ###################################################################################
  36. import os
  37. import time
  38. import hmac
  39. import hashlib
  40. import logging
  41. import functools
  42. import threading
  43. import traceback
  44. from odoo.tests import common, HOST, PORT
  45. _path = os.path.dirname(os.path.dirname(__file__))
  46. _logger = logging.getLogger(__name__)
  47. #----------------------------------------------------------
  48. # Decorators
  49. #----------------------------------------------------------
  50. def multi_users(users=[['base.user_root', True], ['base.user_admin', True]], reset=True, raise_exception=True):
  51. def decorator(func):
  52. @functools.wraps(func)
  53. def wrapper(self, *args, **kwargs):
  54. user_list = users(self) if callable(users) else users
  55. test_results = []
  56. for user in user_list:
  57. self.cr.execute('SAVEPOINT test_multi_users')
  58. try:
  59. if not isinstance(user[0], int):
  60. self.uid = self.ref(user[0])
  61. else:
  62. self.uid = user[0]
  63. func(self, *args, **kwargs)
  64. except Exception as error:
  65. test_results.append({
  66. 'user': user[0],
  67. 'expect': user[1],
  68. 'result': False,
  69. 'error': error,
  70. })
  71. else:
  72. test_results.append({
  73. 'user': user[0],
  74. 'expect': user[1],
  75. 'result': True,
  76. 'error': None,
  77. })
  78. if reset:
  79. self.env.cache.invalidate()
  80. self.registry.clear_caches()
  81. self.registry.reset_changes()
  82. self.cr.execute('ROLLBACK TO SAVEPOINT test_multi_users')
  83. else:
  84. self._cr.execute('RELEASE SAVEPOINT test_multi_users')
  85. test_fails = []
  86. for result in test_results:
  87. if result['expect'] != result['result']:
  88. message = "Test (%s) with user (%s) failed!"
  89. _logger.info(message % (func.__name__, result['user']))
  90. if result['error']:
  91. _logger.error(result['error'], exc_info=True)
  92. test_fails.append(result)
  93. if test_fails:
  94. message = "%s out of %s tests failed" % (len(test_fails), len(test_results))
  95. if raise_exception:
  96. raise test_fails[0]['error']
  97. else:
  98. _logger.info(message)
  99. return test_results
  100. return wrapper
  101. return decorator
  102. def track_function(max_query_count=None, max_query_time=None, max_time=None, return_tracking=False):
  103. def decorator(func):
  104. @functools.wraps(func)
  105. def wrapper(*args, **kwargs):
  106. tracking_parameters = [func.__name__]
  107. threading.current_thread().query_time = 0
  108. threading.current_thread().query_count = 0
  109. threading.current_thread().perf_t0 = time.time()
  110. result = func(*args, **kwargs)
  111. message = "%s" % func.__name__
  112. if args and hasattr(args[0], "uid"):
  113. message = " (%s)" % args[0].uid
  114. if hasattr(threading.current_thread(), "query_count"):
  115. query_count = threading.current_thread().query_count
  116. query_time = threading.current_thread().query_time
  117. perf_t0 = threading.current_thread().perf_t0
  118. remaining_time = time.time() - perf_t0 - query_time
  119. time_taken = query_time + remaining_time
  120. message += " - %s Q %.3fs QT %.3fs OT %.3fs TT" % (
  121. query_count, query_time, remaining_time, time_taken
  122. )
  123. tracking_parameters += [
  124. query_count, query_time, remaining_time, time_taken
  125. ]
  126. if max_query_count and query_count > max_query_count:
  127. raise AssertionError("More than %s queries" % max_query_count)
  128. if max_query_time and query_time > max_query_time:
  129. raise AssertionError("Queries took longer than %.3fs" % max_query_time)
  130. if max_time and time_taken > max_time:
  131. raise AssertionError("Function took longer than %.3fs" % max_time)
  132. _logger.info(message)
  133. if return_tracking:
  134. return result, tracking_parameters
  135. return result
  136. return wrapper
  137. return decorator
  138. #----------------------------------------------------------
  139. # Test Cases
  140. #----------------------------------------------------------
  141. class HttpCase(common.HttpCase):
  142. def csrf_token(self, time_limit=3600):
  143. token = self.session.sid
  144. max_ts = '' if not time_limit else int(time.time() + time_limit)
  145. msg = '%s%s' % (token, max_ts)
  146. secret = self.env['ir.config_parameter'].sudo().get_param('database.secret')
  147. assert secret, "CSRF protection requires a configured database secret"
  148. hm = hmac.new(secret.encode('ascii'), msg.encode('utf-8'), hashlib.sha1).hexdigest()
  149. return '%so%s' % (hm, max_ts)
  150. def url_open(self, url, data=None, timeout=10, csrf=False):
  151. if url.startswith('/'):
  152. url = "http://%s:%s%s" % (HOST, PORT, url)
  153. if data:
  154. if csrf:
  155. data.update({'csrf_token': self.csrf_token()})
  156. return self.opener.post(url, data=data, timeout=timeout)
  157. return self.opener.get(url, timeout=timeout)