374 lines
15 KiB

  1. # -*- coding: utf-8 -*-
  2. # Copyright 2016 SYLEAM
  3. # License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
  4. import json
  5. import logging
  6. from .common_test_controller import OAuthProviderControllerTransactionCase
  7. from .common_test_oauth_provider_controller import \
  8. TestOAuthProviderRefreshTokenController, \
  9. TestOAuthProviderAurhorizeController, \
  10. TestOAuthProviderTokeninfoController, \
  11. TestOAuthProviderUserinfoController, \
  12. TestOAuthProviderOtherinfoController, \
  13. TestOAuthProviderRevokeTokenController
  14. _logger = logging.getLogger(__name__)
  15. try:
  16. import oauthlib
  17. except ImportError:
  18. _logger.debug('Cannot `import oauthlib`.')
  19. class TestOAuthProviderController(
  20. OAuthProviderControllerTransactionCase,
  21. TestOAuthProviderRefreshTokenController,
  22. TestOAuthProviderAurhorizeController,
  23. TestOAuthProviderTokeninfoController,
  24. TestOAuthProviderUserinfoController,
  25. TestOAuthProviderOtherinfoController,
  26. TestOAuthProviderRevokeTokenController):
  def setUp(self):
      """ Run the parent setUp for the 'web application' case

      The string is forwarded untouched to the parent setUp (presumably the
      application type of the client under test; confirm in the base class)
      """
      application_type = 'web application'
      super(TestOAuthProviderController, self).setUp(application_type)
  def new_code(self):
      """ Request a fresh authorization code through /oauth2/authorize

      The client is flagged to skip the authorization page, the demo user
      logs in, and the authorize route is called with valid values.
      The redirection is checked, the user logs out, and the most recent
      authorization code record of the client is returned
      """
      custom_state = 'Some custom state'

      # No authorization page: the code is generated by the GET request
      self.client.skip_authorization = True
      self.login('demo', 'demo')
      authorize_params = {
          'client_id': self.client.identifier,
          'response_type': self.client.response_type,
          'redirect_uri': self.redirect_uri_base,
          'scope': self.client.scope_ids[0].code,
          'state': custom_state,
      }
      response = self.get_request('/oauth2/authorize', data=authorize_params)

      # Nothing else generates codes during the test, so the newest record
      # of this client is the one created by the request above
      code_model = self.env['oauth.provider.authorization.code']
      client_domain = [('client_id', '=', self.client.id)]
      authorization_code = code_model.search(
          client_domain, order='id DESC', limit=1)

      # The answer must redirect to the redirect URI, carrying the state and
      # the authorization code as GET parameters
      self.assertEqual(response.status_code, 302)
      expected_query = oauthlib.common.urlencode(
          {'state': custom_state, 'code': authorization_code.code}.items())
      expected_location = '{uri_base}?{query_string}'.format(
          uri_base=self.redirect_uri_base, query_string=expected_query)
      self.assertEqual(response.headers['Location'], expected_location)
      self.assertEqual(authorization_code.user_id, self.user)

      self.logout()
      return authorization_code
  def test_token_error_missing_session(self):
      """ Call /oauth2/token directly, without any session set

      No authorization code is generated first and no data is posted.
      A 401 response with an invalid_client_id error is expected
      """
      token_response = self.post_request('/oauth2/token')
      self.assertEqual(token_response.status_code, 401)
      expected_error = {'error': 'invalid_client_id'}
      self.assertEqual(json.loads(token_response.data), expected_error)
  def test_token_error_missing_arguments(self):
      """ Call /oauth2/token with no argument at all

      A 401 response with an invalid_client_id error is expected
      """
      # The generated code itself is not used: it only sets the session
      self.new_code()

      token_response = self.post_request('/oauth2/token')
      self.assertEqual(token_response.status_code, 401)
      expected_error = {'error': 'invalid_client_id'}
      self.assertEqual(json.loads(token_response.data), expected_error)
  def test_token_error_wrong_grant_type(self):
      """ Call /oauth2/token with an unknown grant type only

      A 401 response with an invalid_client_id error is expected
      """
      # The generated code itself is not used: it only sets the session
      self.new_code()

      token_params = {'grant_type': 'Wrong grant type'}
      token_response = self.post_request('/oauth2/token', data=token_params)
      self.assertEqual(token_response.status_code, 401)
      expected_error = {'error': 'invalid_client_id'}
      self.assertEqual(json.loads(token_response.data), expected_error)
  def test_token_error_missing_code(self):
      """ Call /oauth2/token with the grant type but no code

      A 401 response with an invalid_client_id error is expected
      """
      # The generated code itself is not used: it only sets the session
      self.new_code()

      token_params = {'grant_type': self.client.grant_type}
      token_response = self.post_request('/oauth2/token', data=token_params)
      self.assertEqual(token_response.status_code, 401)
      expected_error = {'error': 'invalid_client_id'}
      self.assertEqual(json.loads(token_response.data), expected_error)
  def test_token_error_missing_client_id(self):
      """ Call /oauth2/token with a grant type and a code, but no client_id

      A 401 response with an invalid_client_id error is expected
      """
      # The generated code itself is not used: it only sets the session
      self.new_code()

      token_params = {
          'grant_type': self.client.grant_type,
          'code': 'Wrong code',
      }
      token_response = self.post_request('/oauth2/token', data=token_params)
      self.assertEqual(token_response.status_code, 401)
      expected_error = {'error': 'invalid_client_id'}
      self.assertEqual(json.loads(token_response.data), expected_error)
  def test_token_error_wrong_client_identifier(self):
      """ Call /oauth2/token with a client identifier that does not match

      The posted code is wrong as well.
      A 401 response with an invalid_client_id error is expected
      """
      # The generated code itself is not used: it only sets the session
      self.new_code()

      token_params = {
          'grant_type': self.client.grant_type,
          'client_id': 'Wrong client identifier',
          'code': 'Wrong code',
      }
      token_response = self.post_request('/oauth2/token', data=token_params)
      self.assertEqual(token_response.status_code, 401)
      expected_error = {'error': 'invalid_client_id'}
      self.assertEqual(json.loads(token_response.data), expected_error)
  def test_token_error_wrong_code(self):
      """ Call /oauth2/token with the right client but a wrong code

      A 401 response with an invalid_grant error is expected
      """
      # The generated code itself is not used: it only sets the session
      self.new_code()

      token_params = {
          'grant_type': self.client.grant_type,
          'client_id': self.client.identifier,
          'code': 'Wrong code',
      }
      token_response = self.post_request('/oauth2/token', data=token_params)
      self.assertEqual(token_response.status_code, 401)
      expected_error = {'error': 'invalid_grant'}
      self.assertEqual(json.loads(token_response.data), expected_error)
  def test_token_error_missing_redirect_uri(self):
      """ Call /oauth2/token with a valid code but no redirect_uri

      Depending on the oauthlib version, the answer is either a 400
      invalid_request error (mismatching redirect URI) or a 401
      access_denied error
      """
      authorization_code = self.new_code()

      token_params = {
          'grant_type': self.client.grant_type,
          'client_id': self.client.identifier,
          'code': authorization_code.code,
      }
      token_response = self.post_request('/oauth2/token', data=token_params)

      # Both status codes are accepted; the expected payload follows from
      # the one actually returned
      self.assertIn(token_response.status_code, (400, 401))
      if token_response.status_code == 400:
          expected_error = {
              'error': 'invalid_request',
              'error_description': 'Mismatching redirect URI.',
          }
      else:
          expected_error = {'error': 'access_denied'}
      self.assertEqual(json.loads(token_response.data), expected_error)
  def test_token_error_wrong_redirect_uri(self):
      """ Call /oauth2/token with a valid code but a wrong redirect_uri

      Depending on the oauthlib version, the answer is either a 400
      invalid_request error (mismatching redirect URI) or a 401
      access_denied error
      """
      authorization_code = self.new_code()

      token_params = {
          'grant_type': self.client.grant_type,
          'client_id': self.client.identifier,
          'code': authorization_code.code,
          'redirect_uri': 'Wrong redirect URI',
      }
      token_response = self.post_request('/oauth2/token', data=token_params)

      # Both status codes are accepted; the expected payload follows from
      # the one actually returned
      self.assertIn(token_response.status_code, (400, 401))
      if token_response.status_code == 400:
          expected_error = {
              'error': 'invalid_request',
              'error_description': 'Mismatching redirect URI.',
          }
      else:
          expected_error = {'error': 'access_denied'}
      self.assertEqual(json.loads(token_response.data), expected_error)
  def test_token_error_wrong_client_id(self):
      """ Call /oauth2/token with a valid code but a wrong client id

      Every other posted value is correct.
      A 401 response with an invalid_client_id error is expected
      """
      authorization_code = self.new_code()

      token_params = {
          'grant_type': self.client.grant_type,
          'client_id': 'Wrong client id',
          'code': authorization_code.code,
          'redirect_uri': self.redirect_uri_base,
          'scope': self.client.scope_ids[0].code,
      }
      token_response = self.post_request('/oauth2/token', data=token_params)
      self.assertEqual(token_response.status_code, 401)
      expected_error = {'error': 'invalid_client_id'}
      self.assertEqual(json.loads(token_response.data), expected_error)
  def test_token_error_missing_refresh_token(self):
      """ Call /oauth2/token in refresh token mode, without refresh token

      A 400 response with an invalid_request error is expected
      """
      # The generated code itself is not used: it only sets the session
      self.new_code()

      token_params = {
          'grant_type': 'refresh_token',
          'client_id': self.client.identifier,
          'redirect_uri': self.redirect_uri_base,
          'scope': self.client.scope_ids[0].code,
      }
      token_response = self.post_request('/oauth2/token', data=token_params)
      self.assertEqual(token_response.status_code, 400)
      expected_error = {
          'error': 'invalid_request',
          'error_description': 'Missing refresh token parameter.',
      }
      self.assertEqual(json.loads(token_response.data), expected_error)
  def test_token_error_invalid_refresh_token(self):
      """ Call /oauth2/token in refresh token mode, with a bad refresh token

      A 401 response with an invalid_grant error is expected
      """
      # The generated code itself is not used: it only sets the session
      self.new_code()

      token_params = {
          'grant_type': 'refresh_token',
          'client_id': self.client.identifier,
          'redirect_uri': self.redirect_uri_base,
          'scope': self.client.scope_ids[0].code,
          'refresh_token': 'Wrong refresh token',
      }
      token_response = self.post_request('/oauth2/token', data=token_params)
      self.assertEqual(token_response.status_code, 401)
      expected_error = {'error': 'invalid_grant'}
      self.assertEqual(json.loads(token_response.data), expected_error)
  def test_authorize_skip_authorization(self):
      """ Check /oauth2/authorize when the authorization page is skipped

      The GET request alone must generate an authorization code for the
      logged in user and redirect to the redirect URI
      """
      custom_state = 'Some custom state'

      # No authorization page: the code is generated by the GET request
      self.client.skip_authorization = True
      self.login('demo', 'demo')
      authorize_params = {
          'client_id': self.client.identifier,
          'response_type': self.client.response_type,
          'redirect_uri': self.redirect_uri_base,
          'scope': self.client.scope_ids[0].code,
          'state': custom_state,
      }
      response = self.get_request('/oauth2/authorize', data=authorize_params)

      # Nothing else generates codes during the test, so the newest record
      # of this client is the one created by the request above
      code_model = self.env['oauth.provider.authorization.code']
      client_domain = [('client_id', '=', self.client.id)]
      authorization_code = code_model.search(
          client_domain, order='id DESC', limit=1)

      # The answer must redirect to the redirect URI, carrying the state and
      # the authorization code as GET parameters
      self.assertEqual(response.status_code, 302)
      expected_query = oauthlib.common.urlencode({
          'state': custom_state,
          'code': authorization_code.code,
      }.items())
      expected_location = '{uri_base}?{query_string}'.format(
          uri_base=self.redirect_uri_base, query_string=expected_query)
      self.assertEqual(response.headers['Location'], expected_location)
      self.assertEqual(authorization_code.user_id, self.user)
  def test_successful_token_retrieval(self):
      """ Check the full process for a WebApplication

      GET /oauth2/authorize, then POST /oauth2/authorize to validate the
      authorization, then POST /oauth2/token with the returned code.
      The generated code and token records are compared with the responses
      """
      # Call the authorize method with good values to fill the session scopes
      # and credentials variables
      state = 'Some custom state'
      self.login('demo', 'demo')
      response = self.get_request('/oauth2/authorize', data={
          'client_id': self.client.identifier,
          'response_type': self.client.response_type,
          'redirect_uri': self.redirect_uri_base,
          'scope': self.client.scope_ids[0].code,
          'state': state,
      })
      self.assertEqual(response.status_code, 200)
      # assertIn reports both operands on failure, unlike assertTrue(a in b)
      self.assertIn(self.client.name, response.data)
      self.assertIn(self.client.scope_ids[0].name, response.data)
      self.assertIn(self.client.scope_ids[0].description, response.data)

      # Then, call the POST route to validate the authorization
      response = self.post_request('/oauth2/authorize')

      # A new authorization code should have been generated
      # We can safely pick the latest generated code here, because no other
      # code could have been generated during the test
      code = self.env['oauth.provider.authorization.code'].search([
          ('client_id', '=', self.client.id),
      ], order='id DESC', limit=1)

      # The response should be a redirect to the redirect URI, with the
      # authorization_code added as GET parameter
      self.assertEqual(response.status_code, 302)
      query_string = oauthlib.common.urlencode({
          'state': state,
          'code': code.code,
      }.items())
      self.assertEqual(
          response.headers['Location'], '{uri_base}?{query_string}'.format(
              uri_base=self.redirect_uri_base, query_string=query_string))
      self.assertEqual(code.user_id, self.user)
      self.logout()

      # Now that the user validated the authorization, we can ask for a
      # token, using the returned code
      response = self.post_request('/oauth2/token', data={
          'client_id': self.client.identifier,
          'redirect_uri': self.redirect_uri_base,
          'scope': self.client.scope_ids[0].code,
          'code': code.code,
          'grant_type': self.client.grant_type,
      })
      response_data = json.loads(response.data)

      # A new token should have been generated
      # We can safely pick the latest generated token here, because no other
      # token could have been generated during the test
      token = self.env['oauth.provider.token'].search([
          ('client_id', '=', self.client.id),
      ], order='id DESC', limit=1)
      self.assertEqual(response.status_code, 200)
      self.assertEqual(token.token, response_data['access_token'])
      self.assertEqual(token.token_type, response_data['token_type'])
      self.assertEqual(token.refresh_token, response_data['refresh_token'])
      self.assertEqual(token.scope_ids, code.scope_ids)
      self.assertEqual(token.user_id, self.user)