You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

419 lines
17 KiB

  1. # -*- coding: utf-8 -*-
  2. # Copyright 2016 SYLEAM
  3. # License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
  4. import hashlib
  5. import json
  6. import mock
  7. import logging
  8. from datetime import datetime
  9. from openerp import fields
  10. _logger = logging.getLogger(__name__)
  11. class TestOAuthProviderAurhorizeController(object):
  12. def test_authorize_error_missing_arguments(self):
  13. """ Call /oauth2/authorize without any argument
  14. Must return an unknown client identifier error
  15. """
  16. self.login('demo', 'demo')
  17. response = self.get_request('/oauth2/authorize')
  18. self.assertEqual(response.status_code, 200)
  19. self.assertTrue('Unknown Client Identifier!' in response.data)
  20. self.assertTrue('This client identifier is invalid.' in response.data)
  21. def test_authorize_error_invalid_request(self):
  22. """ Call /oauth2/authorize with only the client_id argument
  23. Must return an invalid_request error
  24. """
  25. self.login('demo', 'demo')
  26. response = self.get_request('/oauth2/authorize', data={
  27. 'client_id': self.client.identifier,
  28. })
  29. self.assertEqual(response.status_code, 200)
  30. self.assertTrue('Error: invalid_request' in response.data)
  31. self.assertTrue('An unknown error occured! Please contact your '
  32. 'administrator' in response.data)
  33. def test_authorize_error_unsupported_response_type(self):
  34. """ Call /oauth2/authorize with an unsupported response type
  35. Must return an unsupported_response_type error
  36. """
  37. self.login('demo', 'demo')
  38. response = self.get_request('/oauth2/authorize', data={
  39. 'client_id': self.client.identifier,
  40. 'response_type': 'wrong response type',
  41. })
  42. self.assertEqual(response.status_code, 200)
  43. self.assertTrue('Error: unsupported_response_type' in response.data)
  44. self.assertTrue('An unknown error occured! Please contact your '
  45. 'administrator' in response.data)
  46. def test_authorize_error_wrong_scopes(self):
  47. """ Call /oauth2/authorize with wrong scopes
  48. Must return an invalid_scope error
  49. """
  50. self.login('demo', 'demo')
  51. response = self.get_request('/oauth2/authorize', data={
  52. 'client_id': self.client.identifier,
  53. 'response_type': self.client.response_type,
  54. 'scope': 'wrong scope',
  55. })
  56. self.assertEqual(response.status_code, 200)
  57. self.assertTrue('Error: invalid_scope' in response.data)
  58. self.assertTrue('An unknown error occured! Please contact your '
  59. 'administrator' in response.data)
  60. def test_authorize_error_wrong_uri(self):
  61. """ Call the authorize method with a wrong redirect_uri
  62. Must return an invalid_request error
  63. """
  64. self.login('demo', 'demo')
  65. response = self.get_request('/oauth2/authorize', data={
  66. 'client_id': self.client.identifier,
  67. 'response_type': self.client.response_type,
  68. 'redirect_uri': 'http://wrong.redirect.uri',
  69. })
  70. self.assertEqual(response.status_code, 200)
  71. self.assertTrue('Error: invalid_request' in response.data)
  72. self.assertTrue('Mismatching redirect URI' in response.data)
  73. def test_authorize_error_missing_uri(self):
  74. """ Call /oauth2/authorize without any configured redirect URI
  75. Must return an invalid_request error
  76. """
  77. self.client.redirect_uri_ids.unlink()
  78. self.login('demo', 'demo')
  79. response = self.get_request('/oauth2/authorize', data={
  80. 'client_id': self.client.identifier,
  81. 'response_type': self.client.response_type,
  82. 'scope': self.client.scope_ids[0].code,
  83. })
  84. self.assertEqual(response.status_code, 200)
  85. self.assertTrue('Error: invalid_request' in response.data)
  86. self.assertTrue('Missing redirect URI.' in response.data)
  87. def test_authorize_post_errors(self):
  88. """ Call /oauth2/authorize in POST without any session
  89. Must return an unknown client identifier error
  90. """
  91. self.login('demo', 'demo')
  92. response = self.post_request('/oauth2/authorize')
  93. self.assertEqual(response.status_code, 200)
  94. self.assertTrue('Unknown Client Identifier!' in response.data)
  95. self.assertTrue('This client identifier is invalid.' in response.data)
  96. @mock.patch('openerp.http.WebRequest.env', new_callable=mock.PropertyMock)
  97. def test_authorize_unsafe_chars(self, request_env):
  98. """ Call /oauth2/authorize with unsafe chars in the query string """
  99. # Mock the http request's environ to allow it to see test records
  100. request_env.return_value = self.env(user=self.user)
  101. query_string = 'client_id=%s&response_type=%s&state={}' % (
  102. self.client.identifier,
  103. self.client.response_type,
  104. )
  105. self.login('demo', 'demo')
  106. response = self.test_client.get(
  107. '/oauth2/authorize', query_string=query_string,
  108. environ_base=self.werkzeug_environ)
  109. self.assertEqual(response.status_code, 200)
  110. self.assertTrue(self.client.name in response.data)
  111. class TestOAuthProviderRefreshTokenController(object):
  112. def test_refresh_token_error_too_much_scopes(self):
  113. """ Call /oauth2/token using a refresh token, with too much scopes """
  114. token = self.new_token()
  115. response = self.post_request('/oauth2/token', data={
  116. 'client_id': self.client.identifier,
  117. 'scope': self.client.scope_ids.mapped('code'),
  118. 'grant_type': 'refresh_token',
  119. 'refresh_token': token.refresh_token,
  120. })
  121. self.assertEqual(response.status_code, 401)
  122. self.assertEqual(json.loads(response.data), {'error': 'invalid_scope'})
  123. def test_refresh_token(self):
  124. """ Get a new token using the refresh token """
  125. token = self.new_token()
  126. token.scope_ids = self.client.scope_ids[0]
  127. response = self.post_request('/oauth2/token', data={
  128. 'client_id': self.client.identifier,
  129. 'scope': ' '.join(token.scope_ids.mapped('code')),
  130. 'grant_type': 'refresh_token',
  131. 'refresh_token': token.refresh_token,
  132. })
  133. response_data = json.loads(response.data)
  134. # A new token should have been generated
  135. # We can safely pick the latest generated token here, because no other
  136. # token could have been generated during the test
  137. new_token = self.env['oauth.provider.token'].search([
  138. ('client_id', '=', self.client.id)
  139. ], order='id DESC', limit=1)
  140. self.assertEqual(response.status_code, 200)
  141. self.assertEqual(new_token.token, response_data['access_token'])
  142. self.assertEqual(new_token.token_type, response_data['token_type'])
  143. self.assertEqual(
  144. new_token.refresh_token, response_data['refresh_token'])
  145. self.assertEqual(new_token.scope_ids, token.scope_ids)
  146. self.assertEqual(new_token.user_id, self.user)
  147. class TestOAuthProviderTokeninfoController(object):
  148. def test_tokeninfo_error_missing_arguments(self):
  149. """ Call /oauth2/tokeninfo without any argument
  150. Must retun an invalid_or_expired_token error
  151. """
  152. response = self.get_request('/oauth2/tokeninfo')
  153. self.assertEqual(response.status_code, 401)
  154. self.assertEqual(
  155. json.loads(response.data), {'error': 'invalid_or_expired_token'})
  156. def test_tokeninfo(self):
  157. """ Retrieve token information """
  158. token = self.new_token()
  159. token.scope_ids = self.client.scope_ids[0]
  160. response = self.get_request('/oauth2/tokeninfo', data={
  161. 'access_token': token.token,
  162. })
  163. token_lifetime = (fields.Datetime.from_string(token.expires_at) -
  164. datetime.now()).seconds
  165. response_data = json.loads(response.data)
  166. self.assertEqual(response.status_code, 200)
  167. self.assertEqual(
  168. response_data['audience'], token.client_id.identifier)
  169. self.assertEqual(
  170. response_data['scopes'], ' '.join(token.scope_ids.mapped('code')))
  171. # Test a range because the test might not be accurate, depending on the
  172. # test system load
  173. self.assertTrue(
  174. token_lifetime - 5 < response_data['expires_in'] <
  175. token_lifetime + 5)
  176. self.assertEqual(
  177. response_data['user_id'],
  178. hashlib.sha256(token.client_id.identifier +
  179. token.user_id.oauth_identifier).hexdigest())
  180. def test_tokeninfo_without_scopes(self):
  181. """ Call /oauth2/tokeninfo without any scope
  182. Retrieve token information without any scopes on the token
  183. The user_id field should not be included
  184. """
  185. token = self.new_token()
  186. token.scope_ids = self.env['oauth.provider.scope']
  187. response = self.get_request('/oauth2/tokeninfo', data={
  188. 'access_token': token.token,
  189. })
  190. token_lifetime = (fields.Datetime.from_string(token.expires_at) -
  191. datetime.now()).seconds
  192. response_data = json.loads(response.data)
  193. self.assertEqual(response.status_code, 200)
  194. self.assertEqual(
  195. response_data['audience'], token.client_id.identifier)
  196. self.assertEqual(
  197. response_data['scopes'], ' '.join(token.scope_ids.mapped('code')))
  198. # Test a range because the test might not be accurate, depending on the
  199. # test system load
  200. self.assertTrue(
  201. token_lifetime - 5 < response_data['expires_in'] <
  202. token_lifetime + 5)
  203. class TestOAuthProviderUserinfoController(object):
  204. def test_userinfo_error_missing_arguments(self):
  205. """ Call /oauth2/userinfo without any argument
  206. Must return an invalid_or_expired_token error
  207. """
  208. response = self.get_request('/oauth2/userinfo')
  209. self.assertEqual(response.status_code, 401)
  210. self.assertEqual(
  211. json.loads(response.data), {'error': 'invalid_or_expired_token'})
  212. def test_userinfo_single_scope(self):
  213. """ Retrieve user information with only a single scope """
  214. token = self.new_token()
  215. token.scope_ids = self.client.scope_ids[0]
  216. # Retrieve user information
  217. response = self.get_request('/oauth2/userinfo', data={
  218. 'access_token': token.token,
  219. })
  220. self.assertEqual(response.status_code, 200)
  221. self.assertEqual(json.loads(response.data), {
  222. 'id': self.user.id,
  223. 'email': self.user.email,
  224. })
  225. def test_userinfo_multiple_scope(self):
  226. """ Retrieve user information with only a all test scopes """
  227. token = self.new_token()
  228. token.scope_ids = self.client.scope_ids
  229. # Retrieve user information
  230. response = self.get_request('/oauth2/userinfo', data={
  231. 'access_token': token.token,
  232. })
  233. # The Email scope allows to read the email
  234. # The Profile scope allows to read the name and city
  235. # The id of the recod is always added (standard Odoo behaviour)
  236. self.assertEqual(response.status_code, 200)
  237. self.assertEqual(json.loads(response.data), {
  238. 'id': self.user.id,
  239. 'name': self.user.name,
  240. 'email': self.user.email,
  241. 'city': self.user.city,
  242. })
  243. class TestOAuthProviderOtherinfoController(object):
  244. def test_otherinfo_error_missing_arguments(self):
  245. """ Call /oauth2/otherinfo method without any argument
  246. Must return an invalid_or_expired_token error
  247. """
  248. response = self.get_request('/oauth2/otherinfo')
  249. self.assertEqual(response.status_code, 401)
  250. self.assertEqual(
  251. json.loads(response.data), {'error': 'invalid_or_expired_token'})
  252. def test_otherinfo_error_missing_model(self):
  253. """ Call /oauth2/otherinfo method without the model argument
  254. Must return an invalid_model error
  255. """
  256. token = self.new_token()
  257. response = self.get_request(
  258. '/oauth2/otherinfo', data={'access_token': token.token})
  259. self.assertEqual(response.status_code, 400)
  260. self.assertEqual(json.loads(response.data), {'error': 'invalid_model'})
  261. def test_otherinfo_error_invalid_model(self):
  262. """ Call /oauth2/otherinfo method withan invalid model
  263. Must return an invalid_model error
  264. """
  265. token = self.new_token()
  266. response = self.get_request(
  267. '/oauth2/otherinfo',
  268. data={'access_token': token.token, 'model': 'invalid.model'})
  269. self.assertEqual(response.status_code, 400)
  270. self.assertEqual(json.loads(response.data), {'error': 'invalid_model'})
  271. def test_otherinfo_user_information(self):
  272. """ Call /oauth2/otherinfo to retrieve information using the token """
  273. token = self.new_token()
  274. token.scope_ids = self.client.scope_ids
  275. # Add a new scope to test informations retrieval
  276. token.scope_ids += self.env['oauth.provider.scope'].create({
  277. 'name': 'Groups',
  278. 'code': 'groups',
  279. 'description': 'List of accessible groups',
  280. 'model_id': self.env.ref('base.model_res_groups').id,
  281. 'filter_id': False,
  282. 'field_ids': [
  283. (6, 0, [self.env.ref('base.field_res_groups_name').id]),
  284. ],
  285. })
  286. # Retrieve user information
  287. response = self.get_request('/oauth2/otherinfo', data={
  288. 'access_token': token.token,
  289. 'model': 'res.users',
  290. })
  291. # The Email scope allows to read the email
  292. # The Profile scope allows to read the name and city
  293. # The id of the recod is always added (standard Odoo behaviour)
  294. self.assertEqual(response.status_code, 200)
  295. self.assertEqual(json.loads(response.data), {str(self.user.id): {
  296. 'id': self.user.id,
  297. 'name': self.user.name,
  298. 'email': self.user.email,
  299. 'city': self.user.city,
  300. }})
  301. def test_otherinfo_group_information(self):
  302. """ Call /oauth2/otherinfo to retrieve information using the token """
  303. token = self.new_token()
  304. token.scope_ids = self.client.scope_ids
  305. # Add a new scope to test informations retrieval
  306. token.scope_ids += self.env['oauth.provider.scope'].create({
  307. 'name': 'Groups',
  308. 'code': 'groups',
  309. 'description': 'List of accessible groups',
  310. 'model_id': self.env.ref('base.model_res_groups').id,
  311. 'filter_id': False,
  312. 'field_ids': [
  313. (6, 0, [self.env.ref('base.field_res_groups_name').id]),
  314. ],
  315. })
  316. # Retrieve groups information
  317. all_groups = self.env['res.groups'].search([])
  318. response = self.get_request('/oauth2/otherinfo', data={
  319. 'access_token': token.token,
  320. 'model': 'res.groups',
  321. })
  322. self.assertEqual(response.status_code, 200)
  323. self.assertEqual(
  324. sorted(json.loads(response.data).keys()),
  325. sorted(map(str, all_groups.ids)))
  326. class TestOAuthProviderRevokeTokenController(object):
  327. def test_revoke_token_error_missing_arguments(self):
  328. """ Call /oauth2/revoke_token method without any argument """
  329. response = self.post_request('/oauth2/revoke_token')
  330. self.assertEqual(response.status_code, 401)
  331. self.assertEqual(
  332. json.loads(response.data), {'error': 'invalid_or_expired_token'})
  333. def test_revoke_token_error_missing_client_id(self):
  334. """ Call /oauth2/revoke_token method without client identifier """
  335. token = self.new_token()
  336. response = self.post_request('/oauth2/revoke_token', data={
  337. 'token': token.token,
  338. })
  339. self.assertEqual(response.status_code, 401)
  340. self.assertEqual(
  341. json.loads(response.data), {'error': 'invalid_client'})
  342. def test_revoke_token_error_missing_token(self):
  343. """ Call /oauth2/revoke_token method without token """
  344. response = self.post_request('/oauth2/revoke_token', data={
  345. 'client_id': self.client.identifier,
  346. })
  347. self.assertEqual(response.status_code, 401)
  348. self.assertEqual(
  349. json.loads(response.data), {'error': 'invalid_or_expired_token'})
  350. def test_revoke_access_token(self):
  351. """ Revoke an access token """
  352. token = self.new_token()
  353. self.post_request('/oauth2/revoke_token', data={
  354. 'client_id': self.client.identifier,
  355. 'token': token.token,
  356. })
  357. self.assertFalse(token.exists())
  358. def test_revoke_refresh_token(self):
  359. """ Revoke a refresh token """
  360. token = self.new_token()
  361. self.post_request('/oauth2/revoke_token', data={
  362. 'client_id': self.client.identifier,
  363. 'token': token.refresh_token,
  364. })
  365. self.assertTrue(token.exists())
  366. self.assertFalse(token.refresh_token)