You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
419 lines
17 KiB
419 lines
17 KiB
# -*- coding: utf-8 -*-
|
|
# Copyright 2016 SYLEAM
|
|
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
|
|
|
|
import hashlib
|
|
import json
|
|
import mock
|
|
import logging
|
|
from datetime import datetime
|
|
from openerp import fields
|
|
|
|
_logger = logging.getLogger(__name__)
|
|
|
|
|
|
class TestOAuthProviderAurhorizeController(object):
|
|
def test_authorize_error_missing_arguments(self):
|
|
""" Call /oauth2/authorize without any argument
|
|
|
|
Must return an unknown client identifier error
|
|
"""
|
|
self.login('demo', 'demo')
|
|
response = self.get_request('/oauth2/authorize')
|
|
self.assertEqual(response.status_code, 200)
|
|
self.assertTrue('Unknown Client Identifier!' in response.data)
|
|
self.assertTrue('This client identifier is invalid.' in response.data)
|
|
|
|
def test_authorize_error_invalid_request(self):
|
|
""" Call /oauth2/authorize with only the client_id argument
|
|
|
|
Must return an invalid_request error
|
|
"""
|
|
self.login('demo', 'demo')
|
|
response = self.get_request('/oauth2/authorize', data={
|
|
'client_id': self.client.identifier,
|
|
})
|
|
self.assertEqual(response.status_code, 200)
|
|
self.assertTrue('Error: invalid_request' in response.data)
|
|
self.assertTrue('An unknown error occured! Please contact your '
|
|
'administrator' in response.data)
|
|
|
|
def test_authorize_error_unsupported_response_type(self):
|
|
""" Call /oauth2/authorize with an unsupported response type
|
|
|
|
Must return an unsupported_response_type error
|
|
"""
|
|
self.login('demo', 'demo')
|
|
response = self.get_request('/oauth2/authorize', data={
|
|
'client_id': self.client.identifier,
|
|
'response_type': 'wrong response type',
|
|
})
|
|
self.assertEqual(response.status_code, 200)
|
|
self.assertTrue('Error: unsupported_response_type' in response.data)
|
|
self.assertTrue('An unknown error occured! Please contact your '
|
|
'administrator' in response.data)
|
|
|
|
def test_authorize_error_wrong_scopes(self):
|
|
""" Call /oauth2/authorize with wrong scopes
|
|
|
|
Must return an invalid_scope error
|
|
"""
|
|
self.login('demo', 'demo')
|
|
response = self.get_request('/oauth2/authorize', data={
|
|
'client_id': self.client.identifier,
|
|
'response_type': self.client.response_type,
|
|
'scope': 'wrong scope',
|
|
})
|
|
self.assertEqual(response.status_code, 200)
|
|
self.assertTrue('Error: invalid_scope' in response.data)
|
|
self.assertTrue('An unknown error occured! Please contact your '
|
|
'administrator' in response.data)
|
|
|
|
def test_authorize_error_wrong_uri(self):
|
|
""" Call the authorize method with a wrong redirect_uri
|
|
|
|
Must return an invalid_request error
|
|
"""
|
|
self.login('demo', 'demo')
|
|
response = self.get_request('/oauth2/authorize', data={
|
|
'client_id': self.client.identifier,
|
|
'response_type': self.client.response_type,
|
|
'redirect_uri': 'http://wrong.redirect.uri',
|
|
})
|
|
self.assertEqual(response.status_code, 200)
|
|
self.assertTrue('Error: invalid_request' in response.data)
|
|
self.assertTrue('Mismatching redirect URI' in response.data)
|
|
|
|
def test_authorize_error_missing_uri(self):
|
|
""" Call /oauth2/authorize without any configured redirect URI
|
|
|
|
Must return an invalid_request error
|
|
"""
|
|
self.client.redirect_uri_ids.unlink()
|
|
self.login('demo', 'demo')
|
|
response = self.get_request('/oauth2/authorize', data={
|
|
'client_id': self.client.identifier,
|
|
'response_type': self.client.response_type,
|
|
'scope': self.client.scope_ids[0].code,
|
|
})
|
|
self.assertEqual(response.status_code, 200)
|
|
self.assertTrue('Error: invalid_request' in response.data)
|
|
self.assertTrue('Missing redirect URI.' in response.data)
|
|
|
|
def test_authorize_post_errors(self):
|
|
""" Call /oauth2/authorize in POST without any session
|
|
|
|
Must return an unknown client identifier error
|
|
"""
|
|
self.login('demo', 'demo')
|
|
response = self.post_request('/oauth2/authorize')
|
|
self.assertEqual(response.status_code, 200)
|
|
self.assertTrue('Unknown Client Identifier!' in response.data)
|
|
self.assertTrue('This client identifier is invalid.' in response.data)
|
|
|
|
@mock.patch('openerp.http.WebRequest.env', new_callable=mock.PropertyMock)
|
|
def test_authorize_unsafe_chars(self, request_env):
|
|
""" Call /oauth2/authorize with unsafe chars in the query string """
|
|
# Mock the http request's environ to allow it to see test records
|
|
request_env.return_value = self.env(user=self.user)
|
|
|
|
query_string = 'client_id=%s&response_type=%s&state={}' % (
|
|
self.client.identifier,
|
|
self.client.response_type,
|
|
)
|
|
self.login('demo', 'demo')
|
|
response = self.test_client.get(
|
|
'/oauth2/authorize', query_string=query_string,
|
|
environ_base=self.werkzeug_environ)
|
|
self.assertEqual(response.status_code, 200)
|
|
self.assertTrue(self.client.name in response.data)
|
|
|
|
|
|
class TestOAuthProviderRefreshTokenController(object):
|
|
def test_refresh_token_error_too_much_scopes(self):
|
|
""" Call /oauth2/token using a refresh token, with too much scopes """
|
|
token = self.new_token()
|
|
response = self.post_request('/oauth2/token', data={
|
|
'client_id': self.client.identifier,
|
|
'scope': self.client.scope_ids.mapped('code'),
|
|
'grant_type': 'refresh_token',
|
|
'refresh_token': token.refresh_token,
|
|
})
|
|
self.assertEqual(response.status_code, 401)
|
|
self.assertEqual(json.loads(response.data), {'error': 'invalid_scope'})
|
|
|
|
def test_refresh_token(self):
|
|
""" Get a new token using the refresh token """
|
|
token = self.new_token()
|
|
token.scope_ids = self.client.scope_ids[0]
|
|
response = self.post_request('/oauth2/token', data={
|
|
'client_id': self.client.identifier,
|
|
'scope': ' '.join(token.scope_ids.mapped('code')),
|
|
'grant_type': 'refresh_token',
|
|
'refresh_token': token.refresh_token,
|
|
})
|
|
response_data = json.loads(response.data)
|
|
# A new token should have been generated
|
|
# We can safely pick the latest generated token here, because no other
|
|
# token could have been generated during the test
|
|
new_token = self.env['oauth.provider.token'].search([
|
|
('client_id', '=', self.client.id)
|
|
], order='id DESC', limit=1)
|
|
self.assertEqual(response.status_code, 200)
|
|
self.assertEqual(new_token.token, response_data['access_token'])
|
|
self.assertEqual(new_token.token_type, response_data['token_type'])
|
|
self.assertEqual(
|
|
new_token.refresh_token, response_data['refresh_token'])
|
|
self.assertEqual(new_token.scope_ids, token.scope_ids)
|
|
self.assertEqual(new_token.user_id, self.user)
|
|
|
|
|
|
class TestOAuthProviderTokeninfoController(object):
|
|
def test_tokeninfo_error_missing_arguments(self):
|
|
""" Call /oauth2/tokeninfo without any argument
|
|
|
|
Must retun an invalid_or_expired_token error
|
|
"""
|
|
response = self.get_request('/oauth2/tokeninfo')
|
|
self.assertEqual(response.status_code, 401)
|
|
self.assertEqual(
|
|
json.loads(response.data), {'error': 'invalid_or_expired_token'})
|
|
|
|
def test_tokeninfo(self):
|
|
""" Retrieve token information """
|
|
token = self.new_token()
|
|
token.scope_ids = self.client.scope_ids[0]
|
|
response = self.get_request('/oauth2/tokeninfo', data={
|
|
'access_token': token.token,
|
|
})
|
|
token_lifetime = (fields.Datetime.from_string(token.expires_at) -
|
|
datetime.now()).seconds
|
|
response_data = json.loads(response.data)
|
|
self.assertEqual(response.status_code, 200)
|
|
self.assertEqual(
|
|
response_data['audience'], token.client_id.identifier)
|
|
self.assertEqual(
|
|
response_data['scopes'], ' '.join(token.scope_ids.mapped('code')))
|
|
# Test a range because the test might not be accurate, depending on the
|
|
# test system load
|
|
self.assertTrue(
|
|
token_lifetime - 5 < response_data['expires_in'] <
|
|
token_lifetime + 5)
|
|
self.assertEqual(
|
|
response_data['user_id'],
|
|
hashlib.sha256(token.client_id.identifier +
|
|
token.user_id.oauth_identifier).hexdigest())
|
|
|
|
def test_tokeninfo_without_scopes(self):
|
|
""" Call /oauth2/tokeninfo without any scope
|
|
|
|
Retrieve token information without any scopes on the token
|
|
The user_id field should not be included
|
|
"""
|
|
token = self.new_token()
|
|
token.scope_ids = self.env['oauth.provider.scope']
|
|
response = self.get_request('/oauth2/tokeninfo', data={
|
|
'access_token': token.token,
|
|
})
|
|
token_lifetime = (fields.Datetime.from_string(token.expires_at) -
|
|
datetime.now()).seconds
|
|
response_data = json.loads(response.data)
|
|
self.assertEqual(response.status_code, 200)
|
|
self.assertEqual(
|
|
response_data['audience'], token.client_id.identifier)
|
|
self.assertEqual(
|
|
response_data['scopes'], ' '.join(token.scope_ids.mapped('code')))
|
|
# Test a range because the test might not be accurate, depending on the
|
|
# test system load
|
|
self.assertTrue(
|
|
token_lifetime - 5 < response_data['expires_in'] <
|
|
token_lifetime + 5)
|
|
|
|
|
|
class TestOAuthProviderUserinfoController(object):
|
|
def test_userinfo_error_missing_arguments(self):
|
|
""" Call /oauth2/userinfo without any argument
|
|
|
|
Must return an invalid_or_expired_token error
|
|
"""
|
|
response = self.get_request('/oauth2/userinfo')
|
|
self.assertEqual(response.status_code, 401)
|
|
self.assertEqual(
|
|
json.loads(response.data), {'error': 'invalid_or_expired_token'})
|
|
|
|
def test_userinfo_single_scope(self):
|
|
""" Retrieve user information with only a single scope """
|
|
token = self.new_token()
|
|
token.scope_ids = self.client.scope_ids[0]
|
|
|
|
# Retrieve user information
|
|
response = self.get_request('/oauth2/userinfo', data={
|
|
'access_token': token.token,
|
|
})
|
|
self.assertEqual(response.status_code, 200)
|
|
self.assertEqual(json.loads(response.data), {
|
|
'id': self.user.id,
|
|
'email': self.user.email,
|
|
})
|
|
|
|
def test_userinfo_multiple_scope(self):
|
|
""" Retrieve user information with only a all test scopes """
|
|
token = self.new_token()
|
|
token.scope_ids = self.client.scope_ids
|
|
|
|
# Retrieve user information
|
|
response = self.get_request('/oauth2/userinfo', data={
|
|
'access_token': token.token,
|
|
})
|
|
# The Email scope allows to read the email
|
|
# The Profile scope allows to read the name and city
|
|
# The id of the recod is always added (standard Odoo behaviour)
|
|
self.assertEqual(response.status_code, 200)
|
|
self.assertEqual(json.loads(response.data), {
|
|
'id': self.user.id,
|
|
'name': self.user.name,
|
|
'email': self.user.email,
|
|
'city': self.user.city,
|
|
})
|
|
|
|
|
|
class TestOAuthProviderOtherinfoController(object):
|
|
def test_otherinfo_error_missing_arguments(self):
|
|
""" Call /oauth2/otherinfo method without any argument
|
|
|
|
Must return an invalid_or_expired_token error
|
|
"""
|
|
response = self.get_request('/oauth2/otherinfo')
|
|
self.assertEqual(response.status_code, 401)
|
|
self.assertEqual(
|
|
json.loads(response.data), {'error': 'invalid_or_expired_token'})
|
|
|
|
def test_otherinfo_error_missing_model(self):
|
|
""" Call /oauth2/otherinfo method without the model argument
|
|
|
|
Must return an invalid_model error
|
|
"""
|
|
token = self.new_token()
|
|
response = self.get_request(
|
|
'/oauth2/otherinfo', data={'access_token': token.token})
|
|
self.assertEqual(response.status_code, 400)
|
|
self.assertEqual(json.loads(response.data), {'error': 'invalid_model'})
|
|
|
|
def test_otherinfo_error_invalid_model(self):
|
|
""" Call /oauth2/otherinfo method withan invalid model
|
|
|
|
Must return an invalid_model error
|
|
"""
|
|
token = self.new_token()
|
|
response = self.get_request(
|
|
'/oauth2/otherinfo',
|
|
data={'access_token': token.token, 'model': 'invalid.model'})
|
|
self.assertEqual(response.status_code, 400)
|
|
self.assertEqual(json.loads(response.data), {'error': 'invalid_model'})
|
|
|
|
def test_otherinfo_user_information(self):
|
|
""" Call /oauth2/otherinfo to retrieve information using the token """
|
|
token = self.new_token()
|
|
token.scope_ids = self.client.scope_ids
|
|
|
|
# Add a new scope to test informations retrieval
|
|
token.scope_ids += self.env['oauth.provider.scope'].create({
|
|
'name': 'Groups',
|
|
'code': 'groups',
|
|
'description': 'List of accessible groups',
|
|
'model_id': self.env.ref('base.model_res_groups').id,
|
|
'filter_id': False,
|
|
'field_ids': [
|
|
(6, 0, [self.env.ref('base.field_res_groups_name').id]),
|
|
],
|
|
})
|
|
# Retrieve user information
|
|
response = self.get_request('/oauth2/otherinfo', data={
|
|
'access_token': token.token,
|
|
'model': 'res.users',
|
|
})
|
|
# The Email scope allows to read the email
|
|
# The Profile scope allows to read the name and city
|
|
# The id of the recod is always added (standard Odoo behaviour)
|
|
self.assertEqual(response.status_code, 200)
|
|
self.assertEqual(json.loads(response.data), {str(self.user.id): {
|
|
'id': self.user.id,
|
|
'name': self.user.name,
|
|
'email': self.user.email,
|
|
'city': self.user.city,
|
|
}})
|
|
|
|
def test_otherinfo_group_information(self):
|
|
""" Call /oauth2/otherinfo to retrieve information using the token """
|
|
token = self.new_token()
|
|
token.scope_ids = self.client.scope_ids
|
|
|
|
# Add a new scope to test informations retrieval
|
|
token.scope_ids += self.env['oauth.provider.scope'].create({
|
|
'name': 'Groups',
|
|
'code': 'groups',
|
|
'description': 'List of accessible groups',
|
|
'model_id': self.env.ref('base.model_res_groups').id,
|
|
'filter_id': False,
|
|
'field_ids': [
|
|
(6, 0, [self.env.ref('base.field_res_groups_name').id]),
|
|
],
|
|
})
|
|
|
|
# Retrieve groups information
|
|
all_groups = self.env['res.groups'].search([])
|
|
response = self.get_request('/oauth2/otherinfo', data={
|
|
'access_token': token.token,
|
|
'model': 'res.groups',
|
|
})
|
|
self.assertEqual(response.status_code, 200)
|
|
self.assertEqual(
|
|
sorted(json.loads(response.data).keys()),
|
|
sorted(map(str, all_groups.ids)))
|
|
|
|
|
|
class TestOAuthProviderRevokeTokenController(object):
|
|
def test_revoke_token_error_missing_arguments(self):
|
|
""" Call /oauth2/revoke_token method without any argument """
|
|
response = self.post_request('/oauth2/revoke_token')
|
|
self.assertEqual(response.status_code, 401)
|
|
self.assertEqual(
|
|
json.loads(response.data), {'error': 'invalid_or_expired_token'})
|
|
|
|
def test_revoke_token_error_missing_client_id(self):
|
|
""" Call /oauth2/revoke_token method without client identifier """
|
|
token = self.new_token()
|
|
response = self.post_request('/oauth2/revoke_token', data={
|
|
'token': token.token,
|
|
})
|
|
self.assertEqual(response.status_code, 401)
|
|
self.assertEqual(
|
|
json.loads(response.data), {'error': 'invalid_client'})
|
|
|
|
def test_revoke_token_error_missing_token(self):
|
|
""" Call /oauth2/revoke_token method without token """
|
|
response = self.post_request('/oauth2/revoke_token', data={
|
|
'client_id': self.client.identifier,
|
|
})
|
|
self.assertEqual(response.status_code, 401)
|
|
self.assertEqual(
|
|
json.loads(response.data), {'error': 'invalid_or_expired_token'})
|
|
|
|
def test_revoke_access_token(self):
|
|
""" Revoke an access token """
|
|
token = self.new_token()
|
|
self.post_request('/oauth2/revoke_token', data={
|
|
'client_id': self.client.identifier,
|
|
'token': token.token,
|
|
})
|
|
self.assertFalse(token.exists())
|
|
|
|
def test_revoke_refresh_token(self):
|
|
""" Revoke a refresh token """
|
|
token = self.new_token()
|
|
self.post_request('/oauth2/revoke_token', data={
|
|
'client_id': self.client.identifier,
|
|
'token': token.refresh_token,
|
|
})
|
|
self.assertTrue(token.exists())
|
|
self.assertFalse(token.refresh_token)
|