diff --git a/oauth_provider/README.rst b/oauth_provider/README.rst new file mode 100644 index 000000000..1eae92949 --- /dev/null +++ b/oauth_provider/README.rst @@ -0,0 +1,142 @@ +.. image:: https://img.shields.io/badge/licence-AGPL--3-blue.svg + :target: http://www.gnu.org/licenses/agpl-3.0-standalone.html + :alt: License: AGPL-3 + +============== +OAuth Provider +============== + +This module allows you to turn Odoo into an OAuth 2 provider. + +It's meant to provide the basic authentication feature, and some data access routes. +But you are encouraged to create custom routes, in other modules, to give structured data for any specific need. + +Installation +============ + +To install this module, you need to: + +#. Install the oauthlib python module +#. Install the module like any other in Odoo +#. For the token retrieval to work on a multi-database instance, you should add this module in the server_wide_modules list + +Configuration +============= + +This module requires you to configure two things : + +#. The scopes are used to define restricted data access +#. The clients are used to declare applications that will be allowed to request tokens and data + +To configure scopes, you need to: + +#. Go to Settings > Users > OAuth Provider Scopes +#. Create some scopes: + + - The scope name and description will be displayed to the user on the authorization page. + - The code is the value provided by the OAuth clients to request access to the scope. + - The model defines which model the scope is linked to (access to user data, partners, sales orders, etc.). + - The filter allows you to determine which records will be accessible through this scope. No filter means all records of the model are accessible. + - The field names allows you to define which fields will be provided to the clients. An empty list only returns the id of accessible records. + +To configure clients, you need to: + +#. Go to Settings > Users > OAuth Provider Clients +#. Create at least one client: + + - The name will be displayed to the user on the authorization page. + - The client identifier is the value provided by the OAuth clients to request authorizations/tokens. + - The application type adapts the process to four pre-defined profiles: + + - Web Application : Authorization Code Grant + - Mobile Application : Implicit Grant + - Legacy Application : Resource Owner Password Credentials Grant + - Backend Application : User Credentials Grant (not implemented yet) + + - The skip authorization checkbox allows the client to skip the authorization page, and directly deliver a token without prompting the user (useful when the application is trusted). + - The allowed scopes list defines which data will be accessible by this client applicaton. + - The allowed redirect URIs must match the URI sent by the client, to avoid redirecting users to an unauthorized service. The first value in the list is the default redirect URI. + +For example, to configure an Odoo's *auth_oauth* module compatible client, you will enter these values : + +- Name : Anything you want +- Client identifier : The identifier you want to give to this client +- Application Type : Mobile Application (Odoo uses the implicit grant mode, which corresponds to the mobile application profile) +- Allowed Scopes : Nothing required, but allowing access to current user's email and name is used by Odoo to fill user's information on signup +- Allowed Redirect URIs : http://odoo.example.com/auth_oauth/signin + +Usage +===== + +This module will allow OAuth clients to use your Odoo instance as an OAuth provider. + +Once configured, you must give these information to your client application : + +#. Client identifier : Identifies the application (to be able to check allowed scopes and redirect URIs) +#. Allowed scopes : The codes of scopes allowed for this client +#. URLs for the requests : + + - Authorization request : http://odoo.example.com/oauth2/authorize + - Token request : http://odoo.example.com/oauth2/token + - Token information request : http://odoo.example.com/oauth2/tokeninfo + Parameters : access_token + - User information request : http://odoo.example.com/oauth2/userinfo + Parameters : access_token + - Any other model information request (depending on the scopes) : http://odoo.example.com/oauth2/otherinfo + Parameters : access_token and model + +For example, to configure the *auth_oauth* Odoo module as a client, you will enter these values : + +- Provider name : Anything you want +- Client ID : The identifier of the client configured in your Odoo Provider instance +- Body : Text displayed on Odoo's login page link +- Authentication URL : http://odoo.example.com/oauth2/authorize +- Scope : A space separated list of scope codes allowed to the client in your Odoo Provider instance +- Validation URL : http://odoo.example.com/oauth2/tokeninfo +- Data URL : http://odoo.example.com/oauth2/userinfo + +.. image:: https://odoo-community.org/website/image/ir.attachment/5784_f2813bd/datas + :alt: Try me on Runbot + :target: https://runbot.odoo-community.org/runbot/149/9.0 + +Known issues / Roadmap +====================== + +* Implement the backend application profile (client credentials grant type) +* Add checkboxes on the authorization page to allow the user to disable some scopes for a token ? (I don't know if this is allowed in the OAuth protocol) + +Bug Tracker +=========== + +Bugs are tracked on `GitHub Issues +`_. In case of trouble, please +check there if your issue has already been reported. If you spotted it first, +help us smashing it by providing a detailed and welcomed feedback. + +Credits +======= + +Images +------ + +* Odoo Community Association: `Icon `_. + +Contributors +------------ + +* Sylvain Garancher + +Maintainer +---------- + +.. image:: https://odoo-community.org/logo.png + :alt: Odoo Community Association + :target: https://odoo-community.org + +This module is maintained by the OCA. + +OCA, or the Odoo Community Association, is a nonprofit organization whose +mission is to support the collaborative development of Odoo features and +promote its widespread use. + +To contribute to this module, please visit https://odoo-community.org. diff --git a/oauth_provider/__init__.py b/oauth_provider/__init__.py new file mode 100644 index 000000000..d792dec17 --- /dev/null +++ b/oauth_provider/__init__.py @@ -0,0 +1,24 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 SYLEAM +# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl). + +from . import controllers +from . import models + +import uuid + + +def pre_init_hook(cr): + """ Initialize oauth_identifier on res.users + + The standard initialization puts the same value for every existing record, + which is invalid for this field. + This is done in the pre_init_hook to be able to add the unique constrait + on the first run, when installing the module. + """ + cr.execute('ALTER TABLE res_users ADD COLUMN oauth_identifier varchar') + cr.execute('SELECT id FROM res_users') + for user_id in cr.fetchall(): + cr.execute( + 'UPDATE res_users SET oauth_identifier = %s WHERE id = %s', + (str(uuid.uuid4()), user_id)) diff --git a/oauth_provider/__openerp__.py b/oauth_provider/__openerp__.py new file mode 100644 index 000000000..adfc56875 --- /dev/null +++ b/oauth_provider/__openerp__.py @@ -0,0 +1,28 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 SYLEAM +# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl). + +{ + 'name': 'OAuth Provider', + 'summary': 'Allows to use Odoo as an OAuth2 provider', + 'version': '9.0.1.0.0', + 'category': 'Authentication', + 'website': 'http://www.syleam.fr/', + 'author': 'SYLEAM, Odoo Community Association (OCA)', + 'license': 'AGPL-3', + 'installable': True, + 'external_dependancies': { + 'python': ['oauthlib'], + }, + 'depends': [ + 'base', + ], + 'data': [ + 'security/oauth_provider_security.xml', + 'security/ir.model.access.csv', + 'views/oauth_provider_client.xml', + 'views/oauth_provider_scope.xml', + 'templates/authorization.xml', + ], + 'pre_init_hook': 'pre_init_hook', +} diff --git a/oauth_provider/controllers/__init__.py b/oauth_provider/controllers/__init__.py new file mode 100644 index 000000000..810488da0 --- /dev/null +++ b/oauth_provider/controllers/__init__.py @@ -0,0 +1,5 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 SYLEAM +# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl). + +from . import main diff --git a/oauth_provider/controllers/main.py b/oauth_provider/controllers/main.py new file mode 100644 index 000000000..96ffab39d --- /dev/null +++ b/oauth_provider/controllers/main.py @@ -0,0 +1,290 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 SYLEAM +# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl). + +import json +import logging +import werkzeug.utils +import werkzeug.wrappers +from datetime import datetime +from openerp import http, fields +from openerp.addons.web.controllers.main import ensure_db + +_logger = logging.getLogger(__name__) + +try: + import oauthlib + from oauthlib import oauth2 +except ImportError: + _logger.debug('Cannot `import oauthlib`.') + + +class OAuth2ProviderController(http.Controller): + def __init__(self): + super(OAuth2ProviderController, self).__init__() + + def _get_request_information(self): + """ Retrieve needed arguments for oauthlib methods """ + uri = http.request.httprequest.base_url + http_method = http.request.httprequest.method + body = oauthlib.common.urlencode( + http.request.httprequest.values.items()) + headers = http.request.httprequest.headers + + return uri, http_method, body, headers + + def _check_access_token(self, access_token): + """ Check if the provided access token is valid """ + token = http.request.env['oauth.provider.token'].search([ + ('token', '=', access_token), + ]) + if not token: + return False + + oauth2_server = token.client_id.get_oauth2_server() + # Retrieve needed arguments for oauthlib methods + uri, http_method, body, headers = self._get_request_information() + + # Validate request information + valid, oauthlib_request = oauth2_server.verify_request( + uri, http_method=http_method, body=body, headers=headers) + + if valid: + return token + + return False + + def _json_response(self, data=None, status=200, headers=None): + """ Returns a json response to the client """ + if headers is None: + headers = {'Content-Type': 'application/json'} + + return werkzeug.wrappers.BaseResponse( + json.dumps(data), status=status, headers=headers) + + @http.route('/oauth2/authorize', type='http', auth='user', methods=['GET']) + def authorize(self, client_id=None, response_type=None, redirect_uri=None, + scope=None, state=None, *args, **kwargs): + """ Check client's request, and display an authorization page to the user, + + The authorization page lists allowed scopes + If the client is configured to skip the authorization page, directly + redirects to the requested URI + """ + client = http.request.env['oauth.provider.client'].search([ + ('identifier', '=', client_id), + ]) + if not client: + return http.request.render( + 'oauth_provider.authorization_error', { + 'title': 'Unknown Client Identifier!', + 'message': 'This client identifier is invalid.', + }) + oauth2_server = client.get_oauth2_server() + + # Retrieve needed arguments for oauthlib methods + uri, http_method, body, headers = self._get_request_information() + try: + scopes, credentials = oauth2_server.validate_authorization_request( + uri, http_method=http_method, body=body, headers=headers) + # Store only some values, because the pickling of the full request + # object is not possible + http.request.httpsession['oauth_scopes'] = scopes + http.request.httpsession['oauth_credentials'] = { + 'client_id': credentials['client_id'], + 'redirect_uri': credentials['redirect_uri'], + 'response_type': credentials['response_type'], + 'state': credentials['state'], + } + if client.skip_authorization: + # Skip the authorization page + # Useful when the application is trusted + return self.authorize_post() + except oauth2.FatalClientError as e: + return http.request.render( + 'oauth_provider.authorization_error', { + 'title': 'Error: {error}'.format(error=e.error), + 'message': e.description, + }) + except oauth2.OAuth2Error as e: + return http.request.render( + 'oauth_provider.authorization_error', { + 'title': 'Error: {error}'.format(error=e.error), + 'message': 'An unknown error occured! Please contact your ' + 'administrator', + }) + + oauth_scopes = client.scope_ids.filtered( + lambda record: record.code in scopes) + return http.request.render( + 'oauth_provider.authorization', { + 'oauth_client': client.name, + 'oauth_scopes': oauth_scopes, + }) + + @http.route( + '/oauth2/authorize', type='http', auth='user', methods=['POST']) + def authorize_post(self, *args, **kwargs): + """ Redirect to the requested URI during the authorization """ + client = http.request.env['oauth.provider.client'].search([ + ('identifier', '=', http.request.httpsession.get( + 'oauth_credentials', {}).get('client_id'))]) + if not client: + return http.request.render( + 'oauth_provider.authorization_error', { + 'title': 'Unknown Client Identifier!', + 'message': 'This client identifier is invalid.', + }) + oauth2_server = client.get_oauth2_server() + + # Retrieve needed arguments for oauthlib methods + uri, http_method, body, headers = self._get_request_information() + scopes = http.request.httpsession['oauth_scopes'] + credentials = http.request.httpsession['oauth_credentials'] + headers, body, status = oauth2_server.create_authorization_response( + uri, http_method=http_method, body=body, headers=headers, + scopes=scopes, credentials=credentials) + + return werkzeug.utils.redirect(headers['Location'], code=status) + + @http.route('/oauth2/token', type='http', auth='none', methods=['POST'], + csrf=False) + def token(self, client_id=None, client_secret=None, redirect_uri=None, + scope=None, code=None, grant_type=None, username=None, + password=None, refresh_token=None, *args, **kwargs): + """ Return a token corresponding to the supplied information + + Not all parameters are required, depending on the application type + """ + ensure_db() + + # If no client_id is specified, get it from session + if client_id is None: + client_id = http.request.httpsession.get( + 'oauth_credentials', {}).get('client_id') + + client = http.request.env['oauth.provider.client'].sudo().search([ + ('identifier', '=', client_id), + ]) + + if not client: + return self._json_response( + data={'error': 'invalid_client_id'}, status=401) + oauth2_server = client.get_oauth2_server() + + # Retrieve needed arguments for oauthlib methods + uri, http_method, body, headers = self._get_request_information() + credentials = {'scope': scope} + + # Retrieve the authorization code, if any, to get Odoo's user id + existing_code = http.request.env[ + 'oauth.provider.authorization.code'].search([ + ('client_id.identifier', '=', client_id), + ('code', '=', code), + ]) + if existing_code: + credentials['odoo_user_id'] = existing_code.user_id.id + # Retrieve the existing token, if any, to get Odoo's user id + existing_token = http.request.env['oauth.provider.token'].search([ + ('client_id.identifier', '=', client_id), + ('refresh_token', '=', refresh_token), + ]) + if existing_token: + credentials['odoo_user_id'] = existing_token.user_id.id + + headers, body, status = oauth2_server.create_token_response( + uri, http_method=http_method, body=body, headers=headers, + credentials=credentials) + + return werkzeug.wrappers.BaseResponse( + body, status=status, headers=headers) + + @http.route('/oauth2/tokeninfo', type='http', auth='none', methods=['GET']) + def tokeninfo(self, access_token=None, *args, **kwargs): + """ Return some information about the supplied token + + Similar to Google's "tokeninfo" request + """ + ensure_db() + token = self._check_access_token(access_token) + if not token: + return self._json_response( + data={'error': 'invalid_or_expired_token'}, status=401) + + token_lifetime = (fields.Datetime.from_string(token.expires_at) - + datetime.now()).seconds + # Base data to return + data = { + 'audience': token.client_id.identifier, + 'scopes': ' '.join(token.scope_ids.mapped('code')), + 'expires_in': token_lifetime, + } + + # Add the oauth user identifier, if user's information access is + # allowed by the token's scopes + user_data = token.get_data_for_model( + 'res.users', res_id=token.user_id.id) + if 'id' in user_data: + data.update(user_id=token.generate_user_id()) + return self._json_response(data=data) + + @http.route('/oauth2/userinfo', type='http', auth='none', methods=['GET']) + def userinfo(self, access_token=None, *args, **kwargs): + """ Return some information about the user linked to the supplied token + + Similar to Google's "userinfo" request + """ + ensure_db() + token = self._check_access_token(access_token) + if not token: + return self._json_response( + data={'error': 'invalid_or_expired_token'}, status=401) + + data = token.get_data_for_model('res.users', res_id=token.user_id.id) + return self._json_response(data=data) + + @http.route('/oauth2/otherinfo', type='http', auth='none', methods=['GET']) + def otherinfo(self, access_token=None, model=None, *args, **kwargs): + """ Return allowed information about the requested model """ + ensure_db() + token = self._check_access_token(access_token) + if not token: + return self._json_response( + data={'error': 'invalid_or_expired_token'}, status=401) + + model_obj = http.request.env['ir.model'].search([ + ('model', '=', model), + ]) + if not model_obj: + return self._json_response( + data={'error': 'invalid_model'}, status=400) + + data = token.get_data_for_model(model) + return self._json_response(data=data) + + @http.route( + '/oauth2/revoke_token', type='http', auth='none', methods=['POST']) + def revoke_token(self, token=None, *args, **kwargs): + """ Revoke the supplied token """ + ensure_db() + body = oauthlib.common.urlencode( + http.request.httprequest.values.items()) + db_token = http.request.env['oauth.provider.token'].search([ + ('token', '=', token), + ]) + if not db_token: + db_token = http.request.env['oauth.provider.token'].search([ + ('refresh_token', '=', token), + ]) + if not db_token: + return self._json_response( + data={'error': 'invalid_or_expired_token'}, status=401) + oauth2_server = db_token.client_id.get_oauth2_server() + + # Retrieve needed arguments for oauthlib methods + uri, http_method, body, headers = self._get_request_information() + + headers, body, status = oauth2_server.create_revocation_response( + uri, http_method=http_method, body=body, headers=headers) + return werkzeug.wrappers.BaseResponse( + body, status=status, headers=headers) diff --git a/oauth_provider/models/__init__.py b/oauth_provider/models/__init__.py new file mode 100644 index 000000000..1d9e69ce6 --- /dev/null +++ b/oauth_provider/models/__init__.py @@ -0,0 +1,10 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 SYLEAM +# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl). + +from . import oauth_provider_authorization_code +from . import oauth_provider_redirect_uri +from . import oauth_provider_scope +from . import oauth_provider_token +from . import oauth_provider_client +from . import res_users diff --git a/oauth_provider/models/oauth_provider_authorization_code.py b/oauth_provider/models/oauth_provider_authorization_code.py new file mode 100644 index 000000000..b0f15bf65 --- /dev/null +++ b/oauth_provider/models/oauth_provider_authorization_code.py @@ -0,0 +1,33 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 SYLEAM +# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl). + +from openerp import models, fields + + +class OAuthProviderAuthorizationCode(models.Model): + _name = 'oauth.provider.authorization.code' + _description = 'OAuth Provider Authorization Code' + _rec_name = 'code' + + code = fields.Char(required=True, help='Name of the authorization code.') + client_id = fields.Many2one( + comodel_name='oauth.provider.client', string='Client', required=True, + help='Client associated to this authorization code.') + user_id = fields.Many2one( + comodel_name='res.users', string='User', required=True, + help='User associated to this authorization code.') + redirect_uri_id = fields.Many2one( + comodel_name='oauth.provider.redirect.uri', string='Redirect URI', + required=True, + help='Redirect URI associated to this authorization code.') + scope_ids = fields.Many2many( + comodel_name='oauth.provider.scope', string='Scopes', + help='Scopes allowed by this authorization code.') + active = fields.Boolean( + default=True, help='When unchecked, the code is invalidated.') + + _sql_constraints = [ + ('code_client_id_unique', 'UNIQUE (code, client_id)', + 'The authorization code must be unique per client !'), + ] diff --git a/oauth_provider/models/oauth_provider_client.py b/oauth_provider/models/oauth_provider_client.py new file mode 100644 index 000000000..cff848497 --- /dev/null +++ b/oauth_provider/models/oauth_provider_client.py @@ -0,0 +1,130 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 SYLEAM +# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl). + +import hashlib +import uuid +import logging +from openerp import models, api, fields +from ..oauth2.validator import OdooValidator + +_logger = logging.getLogger(__name__) + +try: + from oauthlib import oauth2 +except ImportError: + _logger.debug('Cannot `import oauthlib`.') + + +class OAuthProviderClient(models.Model): + _name = 'oauth.provider.client' + _description = 'OAuth Provider Client' + + name = fields.Char(required=True, help='Name of this client.') + identifier = fields.Char( + string='Client Identifier', required=True, readonly=True, + default=lambda self: str(uuid.uuid4()), copy=False, + help='Unique identifier of the client.') + secret = fields.Char( + help='Optional secret used to authenticate the client.') + skip_authorization = fields.Boolean( + help='Check this box if the user shouldn\'t be prompted to authorize ' + 'or not the requested scopes.') + application_type = fields.Selection( + selection=[ + ('web application', 'Web Application'), + ('mobile application', 'Mobile Application'), + ('legacy application', 'Legacy Application'), + ('backend application', 'Backend Application (not implemented)'), + ], required=True, default='web application', + help='Application type to be used with this client.') + grant_type = fields.Selection( + selection=[ + ('authorization_code', 'Authorization Code'), + ('implicit', 'Implicit'), + ('password', 'Password'), + ('client_credentials', 'Client Credentials'), + ], string='OAuth Grant Type', + compute='_compute_grant_response_type', store=True, + help='Grant type used by the client for OAuth.') + response_type = fields.Selection( + selection=[ + ('code', 'Authorization Code'), + ('token', 'Token'), + ('none', 'None'), + ], string='OAuth Response Type', + compute='_compute_grant_response_type', store=True, + help='Response type used by the client for OAuth.') + token_type = fields.Selection( + selection=[('random', 'Randomly generated')], + required=True, default='random', + help='Type of token to return. The base module only provides randomly ' + 'generated tokens.') + scope_ids = fields.Many2many( + comodel_name='oauth.provider.scope', string='Allowed Scopes', + help='List of scopes the client is allowed to access.') + redirect_uri_ids = fields.One2many( + comodel_name='oauth.provider.redirect.uri', inverse_name='client_id', + string='OAuth Redirect URIs', + help='Allowed redirect URIs for the client.') + + _sql_constraints = [ + ('identifier_unique', 'UNIQUE (identifier)', + 'The identifier of the client must be unique !'), + ] + + @api.model + def application_type_mapping(self): + return { + 'web application': ('authorization_code', 'code'), + 'mobile application': ('implicit', 'token'), + 'legacy application': ('password', 'none'), + 'backend application': ('client_credentials', 'none'), + } + + @api.multi + @api.depends('application_type') + def _compute_grant_response_type(self): + applications = self.application_type_mapping() + for client in self: + client.grant_type, client.response_type = applications[ + client.application_type] + + @api.multi + def get_oauth2_server(self, validator=None, **kwargs): + """ Returns an OAuth2 server instance, depending on the client application type + + Generates an OdooValidator instance if no custom validator is defined + All other arguments are directly passed to the server constructor (for + example, a token generator function) + """ + self.ensure_one() + + if validator is None: + validator = OdooValidator() + + if self.application_type == 'web application': + return oauth2.WebApplicationServer(validator, **kwargs) + elif self.application_type == 'mobile application': + return oauth2.MobileApplicationServer(validator, **kwargs) + elif self.application_type == 'legacy application': + return oauth2.LegacyApplicationServer(validator, **kwargs) + elif self.application_type == 'backend application': + return oauth2.BackendApplicationServer(validator, **kwargs) + + @api.multi + def generate_user_id(self, user): + """ Generates a unique user identifier for this client + + Include the client and user identifiers in the final identifier to + generate a different identifier for the same user, depending on the + client accessing this user. By doing this, clients cannot find a list + of common users by comparing their identifiers list from tokeninfo. + """ + self.ensure_one() + + user_identifier = self.identifier \ + + user.sudo().oauth_identifier + + # Use a sha256 to avoid a too long final string + return hashlib.sha256(user_identifier).hexdigest() diff --git a/oauth_provider/models/oauth_provider_redirect_uri.py b/oauth_provider/models/oauth_provider_redirect_uri.py new file mode 100644 index 000000000..72ddca170 --- /dev/null +++ b/oauth_provider/models/oauth_provider_redirect_uri.py @@ -0,0 +1,17 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 SYLEAM +# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl). + +from openerp import models, fields + + +class OAuthProviderRedirectURI(models.Model): + _name = 'oauth.provider.redirect.uri' + _description = 'OAuth Provider Redirect URI' + + name = fields.Char(required=True, help='URI of the redirect.') + sequence = fields.Integer( + required=True, default=10, help='Order of the redirect URIs.') + client_id = fields.Many2one( + comodel_name='oauth.provider.client', string='Client', required=True, + help='Client allowed to redirect using this URI.') diff --git a/oauth_provider/models/oauth_provider_scope.py b/oauth_provider/models/oauth_provider_scope.py new file mode 100644 index 000000000..86562a894 --- /dev/null +++ b/oauth_provider/models/oauth_provider_scope.py @@ -0,0 +1,102 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 SYLEAM +# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl). + +import datetime +import dateutil +import time +from collections import defaultdict +from openerp import models, api, fields +from openerp.tools.safe_eval import safe_eval + + +class OAuthProviderScope(models.Model): + _name = 'oauth.provider.scope' + _description = 'OAuth Provider Scope' + + name = fields.Char( + required=True, translate=True, + help='Name of the scope, displayed to the user.') + code = fields.Char( + required=True, help='Code of the scope, used in OAuth requests.') + description = fields.Text( + required=True, translate=True, + help='Description of the scope, displayed to the user.') + model_id = fields.Many2one( + comodel_name='ir.model', string='Model', required=True, + help='Model allowed to be accessed by this scope.') + model = fields.Char( + related='model_id.model', string='Model Name', readonly=True, + help='Name of the model allowed to be accessed by this scope.') + filter_id = fields.Many2one( + comodel_name='ir.filters', string='Filter', + domain="[('model_id', '=', model)]", + help='Filter applied to retrieve records allowed by this scope.') + field_ids = fields.Many2many( + comodel_name='ir.model.fields', string='Fields', + domain="[('model_id', '=', model_id)]", + help='Fields allowed by this scope.') + + _sql_constraints = [ + ('code_unique', 'UNIQUE (code)', + 'The code of the scopes must be unique !'), + ] + + @api.model + def _get_ir_filter_eval_context(self): + """ Returns the base eval context for ir.filter domains evaluation """ + return { + 'datetime': datetime, + 'dateutil': dateutil, + 'time': time, + 'uid': self.env.uid, + 'user': self.env.user, + } + + @api.multi + def get_data_for_model(self, model, res_id=None, all_scopes_match=False): + """ Return the data matching the scopes from the requested model """ + data = defaultdict(dict) + eval_context = self._get_ir_filter_eval_context() + all_scopes_records = self.env[model] + for scope in self.filtered(lambda record: record.model == model): + # Retrieve the scope's domain + filter_domain = [(1, '=', 1)] + if scope.filter_id: + filter_domain = safe_eval( + scope.filter_id.sudo().domain, eval_context) + if res_id is not None: + filter_domain.append(('id', '=', res_id)) + + # Retrieve data of the matching records, depending on the scope's + # fields + records = self.env[model].search(filter_domain) + for record_data in records.read(scope.field_ids.mapped('name')): + for field, value in record_data.items(): + if isinstance(value, tuple): + # Return only the name for a many2one + data[record_data['id']][field] = value[1] + else: + data[record_data['id']][field] = value + + # Keep a list of records that match all scopes + if not all_scopes_records: + all_scopes_records = records + else: + all_scopes_records &= records + + # If all scopes are required to match, filter the results to keep only + # those mathing all scopes + if all_scopes_match: + data = dict(filter( + lambda record_data: record_data[0] in all_scopes_records.ids, + data.items())) + + # If a single record was requested, return only data coming from this + # record + # Return an empty dictionnary if this record didn't recieve data to + # return + if res_id is not None: + data = data.get(res_id, {}) + + return data diff --git a/oauth_provider/models/oauth_provider_token.py b/oauth_provider/models/oauth_provider_token.py new file mode 100644 index 000000000..102c95254 --- /dev/null +++ b/oauth_provider/models/oauth_provider_token.py @@ -0,0 +1,99 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 SYLEAM +# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl). + +from openerp import models, api, fields, exceptions, _ + + +class OAuthProviderToken(models.Model): + _name = 'oauth.provider.token' + _description = 'OAuth Provider Token' + _rec_name = 'token' + + token = fields.Char(required=True, help='The token itself.') + token_type = fields.Selection( + selection=[('Bearer', 'Bearer')], required=True, default='Bearer', + help='Type of token stored. Currently, only the bearer token type is ' + 'available.') + refresh_token = fields.Char( + help='The refresh token, if applicable.') + client_id = fields.Many2one( + comodel_name='oauth.provider.client', string='Client', required=True, + help='Client associated to this token.') + user_id = fields.Many2one( + comodel_name='res.users', string='User', required=True, + help='User associated to this token.') + scope_ids = fields.Many2many( + comodel_name='oauth.provider.scope', string='Scopes', + help='Scopes allowed by this token.') + expires_at = fields.Datetime( + required=True, help='Expiration time of the token.') + active = fields.Boolean( + compute='_compute_active', search='_search_active', + help='A token is active only if it has not yet expired.') + + _sql_constraints = [ + ('token_unique', 'UNIQUE (token, client_id)', + 'The token must be unique per client !'), + ('refresh_token_unique', 'UNIQUE (refresh_token, client_id)', + 'The refresh token must be unique per client !'), + ] + + @api.multi + def _compute_active(self): + for token in self: + token.active = fields.Datetime.now() < token.expires_at + + @api.model + def _search_active(self, operator, operand): + domain = [] + if operator == 'in': + if True in operand: + domain += self._search_active('=', True) + if False in operand: + domain += self._search_active('=', False) + if len(domain) > 1: + domain = [(1, '=', 1)] + elif operator == 'not in': + if True in operand: + domain += self._search_active('!=', True) + if False in operand: + domain += self._search_active('!=', False) + if len(domain) > 1: + domain = [(0, '=', 1)] + elif operator in ('=', '!='): + operators = { + ('=', True): '>', + ('=', False): '<=', + ('!=', False): '>', + ('!=', True): '<=', + } + domain = [('expires_at', operators[operator, operand], + fields.Datetime.now())] + else: + raise exceptions.UserError( + _('Invalid operator {operator} for field active!').format( + operator=operator)) + + return domain + + @api.multi + def generate_user_id(self): + """ Generates a unique user identifier for this token """ + self.ensure_one() + + return self.client_id.generate_user_id(self.user_id) + + @api.multi + def get_data_for_model(self, model, res_id=None, all_scopes_match=False): + """ Returns the data of the accessible records of the requested model, + + Data are returned depending on the allowed scopes for the token + If the all_scopes_match argument is set to True, return only records + allowed by all token's scopes + """ + self.ensure_one() + + # Retrieve records allowed from all scopes + return self.sudo(user=self.user_id).scope_ids.get_data_for_model( + model, res_id=res_id, all_scopes_match=all_scopes_match) diff --git a/oauth_provider/models/res_users.py b/oauth_provider/models/res_users.py new file mode 100644 index 000000000..85ece2e29 --- /dev/null +++ b/oauth_provider/models/res_users.py @@ -0,0 +1,20 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 SYLEAM +# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl). + +import uuid +from openerp import models, fields + + +class ResUsers(models.Model): + _inherit = 'res.users' + + oauth_identifier = fields.Char( + string='OAuth Identifier', required=True, readonly=True, + default=lambda self: str(uuid.uuid4()), copy=False, + help='String used to identify this user during an OAuth session.') + + _sql_constraints = [ + ('oauth_identifier_unique', 'UNIQUE (oauth_identifier)', + 'The OAuth identifier of the user must be unique !'), + ] diff --git a/oauth_provider/oauth2/__init__.py b/oauth_provider/oauth2/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/oauth_provider/oauth2/validator.py b/oauth_provider/oauth2/validator.py new file mode 100644 index 000000000..714418844 --- /dev/null +++ b/oauth_provider/oauth2/validator.py @@ -0,0 +1,256 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 SYLEAM +# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl). + +import base64 +import logging +from datetime import datetime, timedelta +from openerp import http +from openerp import fields + +_logger = logging.getLogger(__name__) + +try: + from oauthlib.oauth2 import RequestValidator +except ImportError: + _logger.debug('Cannot `import oauthlib`.') + + +class OdooValidator(RequestValidator): + """ OAuth2 validator to be used in Odoo + + This is an implementation of oauthlib's RequestValidator interface + https://github.com/idan/oauthlib/oauthlib/oauth2/rfc6749/request_validator.py + """ + def _load_client(self, request, client_id=None): + """ Returns a client instance for the request """ + client = request.client + if not client: + request.client = http.request.env['oauth.provider.client'].search([ + ('identifier', '=', client_id or request.client_id), + ]) + request.odoo_user = http.request.env.user + request.client.client_id = request.client.identifier + + def _extract_auth(self, request): + """ Extract auth string from request headers """ + auth = request.headers.get('Authorization', ' ') + auth_type, auth_string = auth.split(' ', 1) + if auth_type != 'Basic': + return '' + + return auth_string + + def authenticate_client(self, request, *args, **kwargs): + """ Authenticate the client """ + auth_string = self._extract_auth(request) + auth_string_decoded = base64.b64decode(auth_string) + + # If we don't have a proper auth string, get values in the request body + if ':' not in auth_string_decoded: + client_id = request.client_id + client_secret = request.client_secret + else: + client_id, client_secret = auth_string_decoded.split(':', 1) + + self._load_client(request) + return (request.client.identifier == client_id) and \ + (request.client.secret or '') == (client_secret or '') + + def authenticate_client_id(self, client_id, request, *args, **kwargs): + """ Ensure client_id belong to a non-confidential client """ + self._load_client(request, client_id=client_id) + return bool(request.client) and not request.client.secret + + def client_authentication_required(self, request, *args, **kwargs): + """ Determine if the client authentication is required for the request + """ + # If an auth string was specified, unconditionnally authenticate + if self._extract_auth(request): + return True + + self._load_client(request) + return request.client.grant_type in ( + 'password', + 'authorization_code', + 'refresh_token', + ) or request.client_secret or \ + not request.odoo_user.active + + def confirm_redirect_uri( + self, client_id, code, redirect_uri, client, *args, **kwargs): + """ Ensure that the authorization process' redirect URI + + The authorization process corresponding to the code must begin by using + this redirect_uri + """ + code = http.request.env['oauth.provider.authorization.code'].search([ + ('client_id.identifier', '=', client_id), + ('code', '=', code), + ]) + return redirect_uri == code.redirect_uri_id.name + + def get_default_redirect_uri(self, client_id, request, *args, **kwargs): + """ Returns the default redirect URI for the client """ + client = http.request.env['oauth.provider.client'].search([ + ('identifier', '=', client_id), + ]) + return client.redirect_uri_ids and client.redirect_uri_ids[0].name \ + or '' + + def get_default_scopes(self, client_id, request, *args, **kwargs): + """ Returns a list of default scoprs for the client """ + client = http.request.env['oauth.provider.client'].search([ + ('identifier', '=', client_id), + ]) + return ' '.join(client.scope_ids.mapped('code')) + + def get_original_scopes(self, refresh_token, request, *args, **kwargs): + """ Returns the list of scopes associated to the refresh token """ + token = http.request.env['oauth.provider.token'].search([ + ('client_id', '=', request.client.id), + ('refresh_token', '=', refresh_token), + ]) + return token.scope_ids.mapped('code') + + def invalidate_authorization_code( + self, client_id, code, request, *args, **kwargs): + """ Invalidates an authorization code """ + code = http.request.env['oauth.provider.authorization.code'].search([ + ('client_id.identifier', '=', client_id), + ('code', '=', code), + ]) + code.sudo().write({'active': False}) + + def is_within_original_scope( + self, request_scopes, refresh_token, request, *args, **kwargs): + """ Check if the requested scopes are within a scope of the token """ + token = http.request.env['oauth.provider.token'].search([ + ('client_id', '=', request.client.id), + ('refresh_token', '=', refresh_token), + ]) + return set(request_scopes).issubset( + set(token.scope_ids.mapped('code'))) + + def revoke_token(self, token, token_type_hint, request, *args, **kwargs): + """ Revoke an access of refresh token """ + db_token = http.request.env['oauth.provider.token'].search([ + ('token', '=', token), + ]) + # If we revoke a full token, simply unlink it + if db_token: + db_token.sudo().unlink() + # If we revoke a refresh token, empty it in the corresponding token + else: + db_token = http.request.env['oauth.provider.token'].search([ + ('refresh_token', '=', token), + ]) + db_token.sudo().refresh_token = False + + def rotate_refresh_token(self, request): + """ Determine if the refresh token has to be renewed + + Called after refreshing an access token + Always refresh the token by default, but child classes could override + this method to change this behaviour. + """ + return True + + def save_authorization_code( + self, client_id, code, request, *args, **kwargs): + """ Store the authorization code into the database """ + redirect_uri = http.request.env['oauth.provider.redirect.uri'].search([ + ('name', '=', request.redirect_uri), + ]) + http.request.env['oauth.provider.authorization.code'].sudo().create({ + 'code': code['code'], + 'client_id': request.client.id, + 'user_id': request.odoo_user.id, + 'redirect_uri_id': redirect_uri.id, + 'scope_ids': [(6, 0, request.client.scope_ids.filtered( + lambda record: record.code in request.scopes).ids)], + }) + + def save_bearer_token(self, token, request, *args, **kwargs): + """ Store the bearer token into the database """ + scopes = token.get('scope', '').split() + http.request.env['oauth.provider.token'].sudo().create({ + 'token': token['access_token'], + 'token_type': token['token_type'], + 'refresh_token': token.get('refresh_token'), + 'client_id': request.client.id, + 'user_id': token.get('odoo_user_id', request.odoo_user.id), + 'scope_ids': [(6, 0, request.client.scope_ids.filtered( + lambda record: record.code in scopes).ids)], + 'expires_at': fields.Datetime.to_string( + datetime.now() + timedelta(seconds=token['expires_in'])), + }) + return request.client.redirect_uri_ids[0].name + + def validate_bearer_token(self, token, scopes, request): + """ Ensure the supplied bearer token is valid, and allowed for the scopes + """ + token = http.request.env['oauth.provider.token'].search([ + ('token', '=', token), + ]) + if scopes is None: + scopes = '' + + return set(scopes.split()).issubset( + set(token.scope_ids.mapped('code'))) + + def validate_client_id(self, client_id, request, *args, **kwargs): + """ Ensure client_id belong to a valid and active client """ + self._load_client(request) + return bool(request.client) + + def validate_code(self, client_id, code, client, request, *args, **kwargs): + """ Check that the code is valid, and assigned to the given client """ + code = http.request.env['oauth.provider.authorization.code'].search([ + ('client_id.identifier', '=', client_id), + ('code', '=', code), + ]) + request.odoo_user = code.user_id + return bool(code) + + def validate_grant_type( + self, client_id, grant_type, client, request, *args, **kwargs): + """ Ensure the client is authorized to use the requested grant_type """ + return client.identifier == client_id and grant_type in ( + client.grant_type, 'refresh_token' + ) + + def validate_redirect_uri( + self, client_id, redirect_uri, request, *args, **kwargs): + """ Ensure the client is allowed to use the requested redurect_uri """ + return request.client.identifier == client_id and \ + redirect_uri in request.client.mapped('redirect_uri_ids.name') + + def validate_refresh_token( + self, refresh_token, client, request, *args, **kwargs): + """ Ensure the refresh token is valid and associated to the client """ + token = http.request.env['oauth.provider.token'].search([ + ('client_id', '=', client.id), + ('refresh_token', '=', refresh_token), + ]) + return bool(token) + + def validate_response_type( + self, client_id, response_type, client, request, *args, **kwargs): + """ Ensure the client is allowed to use the requested response_type """ + return request.client.identifier == client_id and \ + response_type == request.client.response_type + + def validate_scopes( + self, client_id, scopes, client, request, *args, **kwargs): + """ Ensure the client is allowed to access all requested scopes """ + return request.client.identifier == client_id and set(scopes).issubset( + set(request.client.mapped('scope_ids.code'))) + + def validate_user( + self, username, password, client, request, *args, **kwargs): + """ Ensure the usernamd and password are valid """ + uid = http.request.session.authenticate( + http.request.session.db, username, password) + request.odoo_user = http.request.env['res.users'].browse(uid) + return bool(uid) diff --git a/oauth_provider/security/ir.model.access.csv b/oauth_provider/security/ir.model.access.csv new file mode 100644 index 000000000..678538444 --- /dev/null +++ b/oauth_provider/security/ir.model.access.csv @@ -0,0 +1,9 @@ +"id","name","model_id:id","group_id:id","perm_read","perm_write","perm_create","perm_unlink" +"oauth_provider_client_all_users","OAuth Client All Users","model_oauth_provider_client",,1,0,0,0 +"oauth_provider_client_manager","OAuth Client Manager","model_oauth_provider_client",group_oauth_provider_manager,1,1,1,1 +"oauth_provider_scope_all_users","OAuth Scope All Users","model_oauth_provider_scope",,1,0,0,0 +"oauth_provider_scope_manager","OAuth Scope Manager","model_oauth_provider_scope",group_oauth_provider_manager,1,1,1,1 +"oauth_provider_redirect_uri_all_users","OAuth Redirect URI All Users","model_oauth_provider_redirect_uri",,1,0,0,0 +"oauth_provider_redirect_uri_manager","OAuth Redirect URI Manager","model_oauth_provider_redirect_uri",group_oauth_provider_manager,1,1,1,1 +"oauth_provider_authorization_code_all_users","OAuth Authorization Code All Users","model_oauth_provider_authorization_code",,1,0,0,0 +"oauth_provider_token_all_users","OAuth Token All Users","model_oauth_provider_token",,1,0,0,0 diff --git a/oauth_provider/security/oauth_provider_security.xml b/oauth_provider/security/oauth_provider_security.xml new file mode 100644 index 000000000..9401e2977 --- /dev/null +++ b/oauth_provider/security/oauth_provider_security.xml @@ -0,0 +1,32 @@ + + + + + OAuth Provider Manager + + + + Authorization Code access restricted to current user + [('user_id', '=', uid)] + + + + + + + + + + Token access restricted to current user + [('user_id', '=', uid)] + + + + + + + + diff --git a/oauth_provider/templates/authorization.xml b/oauth_provider/templates/authorization.xml new file mode 100644 index 000000000..c7db5c8b1 --- /dev/null +++ b/oauth_provider/templates/authorization.xml @@ -0,0 +1,39 @@ + + + + + + diff --git a/oauth_provider/tests/__init__.py b/oauth_provider/tests/__init__.py new file mode 100644 index 000000000..5e364d169 --- /dev/null +++ b/oauth_provider/tests/__init__.py @@ -0,0 +1,10 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 SYLEAM +# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl). + +from . import test_oauth_provider_client +from . import test_oauth_provider_token +from . import test_oauth_provider_scope +from . import test_oauth_provider_controller_web_application +from . import test_oauth_provider_controller_mobile_application +from . import test_oauth_provider_controller_legacy_application diff --git a/oauth_provider/tests/common_test_controller.py b/oauth_provider/tests/common_test_controller.py new file mode 100644 index 000000000..6fe90e824 --- /dev/null +++ b/oauth_provider/tests/common_test_controller.py @@ -0,0 +1,121 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 SYLEAM +# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl). + +import mock +import logging +from datetime import datetime, timedelta +from werkzeug.test import Client +from werkzeug.wrappers import BaseResponse +from openerp import fields +from openerp.service import wsgi_server +from openerp.tests.common import TransactionCase +from openerp.tools.misc import consteq + +_logger = logging.getLogger(__name__) + + +class OAuthProviderControllerTransactionCase(TransactionCase): + def setUp(self, application_type): + super(OAuthProviderControllerTransactionCase, self).setUp() + + # Initialize controller test stuff + self.werkzeug_environ = { + 'REMOTE_ADDR': '127.0.0.1', + } + self.user = self.env.ref('base.user_demo') + self.logged_user = False + self.initialize_test_client() + + # Initialize common stuff + self.redirect_uri_base = 'http://example.com' + self.filter = self.env['ir.filters'].create({ + 'name': 'Current user', + 'model_id': 'res.users', + 'domain': "[('id', '=', uid)]", + }) + self.client = self.env['oauth.provider.client'].create({ + 'name': 'Client', + 'identifier': 'client', + 'application_type': application_type, + 'redirect_uri_ids': [(0, 0, {'name': self.redirect_uri_base})], + 'scope_ids': [(0, 0, { + 'name': 'Email', + 'code': 'email', + 'description': 'Access to your email address.', + 'model_id': self.env.ref('base.model_res_users').id, + 'filter_id': self.filter.id, + 'field_ids': [ + (6, 0, [self.env.ref('base.field_res_users_email').id]), + ], + }), (0, 0, { + 'name': 'Profile', + 'code': 'profile', + 'description': 'Access to your profile details (name, etc.)', + 'model_id': self.env.ref('base.model_res_users').id, + 'filter_id': self.filter.id, + 'field_ids': [ + (6, 0, [ + self.env.ref('base.field_res_users_name').id, + self.env.ref('base.field_res_users_city').id, + ]), + ], + })], + }) + + def initialize_test_client(self): + # Instantiate a test client + self.test_client = Client(wsgi_server.application, BaseResponse) + # Select the database + self.get_request('/web', data={'db': self.env.cr.dbname}) + + def login(self, username, password): + # Login as demo user + self.post_request('/web/login', data={ + 'login': username, + 'password': password, + }) + self.logged_user = self.env['res.users'].search([ + ('login', '=', username)]) + + def logout(self): + # Login as demo user + self.get_request('/web/session/logout') + self.logged_user = False + + @mock.patch('openerp.http.WebRequest.env', new_callable=mock.PropertyMock) + def get_request(self, uri, request_env, data=None, headers=None): + """ Execute a GET request on the test client """ + # Mock the http request's environ to allow it to see test records + user = self.logged_user or self.env.ref('base.public_user') + request_env.return_value = self.env(user=user) + + return self.test_client.get( + uri, query_string=data, environ_base=self.werkzeug_environ, + headers=headers) + + @mock.patch('openerp.http.WebRequest.env', new_callable=mock.PropertyMock) + @mock.patch('openerp.http.WebRequest.validate_csrf') + def post_request( + self, uri, validate_csrf, request_env, data=None, headers=None): + """ Execute a POST request on the test client """ + # Mock the http request's environ to allow it to see test records + user = self.logged_user or self.env.ref('base.public_user') + request_env.return_value = self.env(user=user) + # Disable CSRF tokens check during tests + validate_csrf.return_value = consteq('', '') + + return self.test_client.post( + uri, data=data, environ_base=self.werkzeug_environ, + headers=headers) + + def new_token(self): + return self.env['oauth.provider.token'].create({ + 'token': 'token', + 'token_type': 'Bearer', + 'refresh_token': 'refresh token', + 'client_id': self.client.id, + 'user_id': self.user.id, + 'expires_at': fields.Datetime.to_string( + datetime.now() + timedelta(seconds=3600)), + }) diff --git a/oauth_provider/tests/common_test_oauth_provider_controller.py b/oauth_provider/tests/common_test_oauth_provider_controller.py new file mode 100644 index 000000000..fdf9b18f5 --- /dev/null +++ b/oauth_provider/tests/common_test_oauth_provider_controller.py @@ -0,0 +1,419 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 SYLEAM +# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl). + +import hashlib +import json +import mock +import logging +from datetime import datetime +from openerp import fields + +_logger = logging.getLogger(__name__) + + +class TestOAuthProviderAurhorizeController(object): + def test_authorize_error_missing_arguments(self): + """ Call /oauth2/authorize without any argument + + Must return an unknown client identifier error + """ + self.login('demo', 'demo') + response = self.get_request('/oauth2/authorize') + self.assertEqual(response.status_code, 200) + self.assertTrue('Unknown Client Identifier!' in response.data) + self.assertTrue('This client identifier is invalid.' in response.data) + + def test_authorize_error_invalid_request(self): + """ Call /oauth2/authorize with only the client_id argument + + Must return an invalid_request error + """ + self.login('demo', 'demo') + response = self.get_request('/oauth2/authorize', data={ + 'client_id': self.client.identifier, + }) + self.assertEqual(response.status_code, 200) + self.assertTrue('Error: invalid_request' in response.data) + self.assertTrue('An unknown error occured! Please contact your ' + 'administrator' in response.data) + + def test_authorize_error_unsupported_response_type(self): + """ Call /oauth2/authorize with an unsupported response type + + Must return an unsupported_response_type error + """ + self.login('demo', 'demo') + response = self.get_request('/oauth2/authorize', data={ + 'client_id': self.client.identifier, + 'response_type': 'wrong response type', + }) + self.assertEqual(response.status_code, 200) + self.assertTrue('Error: unsupported_response_type' in response.data) + self.assertTrue('An unknown error occured! Please contact your ' + 'administrator' in response.data) + + def test_authorize_error_wrong_scopes(self): + """ Call /oauth2/authorize with wrong scopes + + Must return an invalid_scope error + """ + self.login('demo', 'demo') + response = self.get_request('/oauth2/authorize', data={ + 'client_id': self.client.identifier, + 'response_type': self.client.response_type, + 'scope': 'wrong scope', + }) + self.assertEqual(response.status_code, 200) + self.assertTrue('Error: invalid_scope' in response.data) + self.assertTrue('An unknown error occured! Please contact your ' + 'administrator' in response.data) + + def test_authorize_error_wrong_uri(self): + """ Call the authorize method with a wrong redirect_uri + + Must return an invalid_request error + """ + self.login('demo', 'demo') + response = self.get_request('/oauth2/authorize', data={ + 'client_id': self.client.identifier, + 'response_type': self.client.response_type, + 'redirect_uri': 'http://wrong.redirect.uri', + }) + self.assertEqual(response.status_code, 200) + self.assertTrue('Error: invalid_request' in response.data) + self.assertTrue('Mismatching redirect URI' in response.data) + + def test_authorize_error_missing_uri(self): + """ Call /oauth2/authorize without any configured redirect URI + + Must return an invalid_request error + """ + self.client.redirect_uri_ids.unlink() + self.login('demo', 'demo') + response = self.get_request('/oauth2/authorize', data={ + 'client_id': self.client.identifier, + 'response_type': self.client.response_type, + 'scope': self.client.scope_ids[0].code, + }) + self.assertEqual(response.status_code, 200) + self.assertTrue('Error: invalid_request' in response.data) + self.assertTrue('Missing redirect URI.' in response.data) + + def test_authorize_post_errors(self): + """ Call /oauth2/authorize in POST without any session + + Must return an unknown client identifier error + """ + self.login('demo', 'demo') + response = self.post_request('/oauth2/authorize') + self.assertEqual(response.status_code, 200) + self.assertTrue('Unknown Client Identifier!' in response.data) + self.assertTrue('This client identifier is invalid.' in response.data) + + @mock.patch('openerp.http.WebRequest.env', new_callable=mock.PropertyMock) + def test_authorize_unsafe_chars(self, request_env): + """ Call /oauth2/authorize with unsafe chars in the query string """ + # Mock the http request's environ to allow it to see test records + request_env.return_value = self.env(user=self.user) + + query_string = 'client_id=%s&response_type=%s&state={}' % ( + self.client.identifier, + self.client.response_type, + ) + self.login('demo', 'demo') + response = self.test_client.get( + '/oauth2/authorize', query_string=query_string, + environ_base=self.werkzeug_environ) + self.assertEqual(response.status_code, 200) + self.assertTrue(self.client.name in response.data) + + +class TestOAuthProviderRefreshTokenController(object): + def test_refresh_token_error_too_much_scopes(self): + """ Call /oauth2/token using a refresh token, with too much scopes """ + token = self.new_token() + response = self.post_request('/oauth2/token', data={ + 'client_id': self.client.identifier, + 'scope': self.client.scope_ids.mapped('code'), + 'grant_type': 'refresh_token', + 'refresh_token': token.refresh_token, + }) + self.assertEqual(response.status_code, 401) + self.assertEqual(json.loads(response.data), {'error': 'invalid_scope'}) + + def test_refresh_token(self): + """ Get a new token using the refresh token """ + token = self.new_token() + token.scope_ids = self.client.scope_ids[0] + response = self.post_request('/oauth2/token', data={ + 'client_id': self.client.identifier, + 'scope': ' '.join(token.scope_ids.mapped('code')), + 'grant_type': 'refresh_token', + 'refresh_token': token.refresh_token, + }) + response_data = json.loads(response.data) + # A new token should have been generated + # We can safely pick the latest generated token here, because no other + # token could have been generated during the test + new_token = self.env['oauth.provider.token'].search([ + ('client_id', '=', self.client.id) + ], order='id DESC', limit=1) + self.assertEqual(response.status_code, 200) + self.assertEqual(new_token.token, response_data['access_token']) + self.assertEqual(new_token.token_type, response_data['token_type']) + self.assertEqual( + new_token.refresh_token, response_data['refresh_token']) + self.assertEqual(new_token.scope_ids, token.scope_ids) + self.assertEqual(new_token.user_id, self.user) + + +class TestOAuthProviderTokeninfoController(object): + def test_tokeninfo_error_missing_arguments(self): + """ Call /oauth2/tokeninfo without any argument + + Must retun an invalid_or_expired_token error + """ + response = self.get_request('/oauth2/tokeninfo') + self.assertEqual(response.status_code, 401) + self.assertEqual( + json.loads(response.data), {'error': 'invalid_or_expired_token'}) + + def test_tokeninfo(self): + """ Retrieve token information """ + token = self.new_token() + token.scope_ids = self.client.scope_ids[0] + response = self.get_request('/oauth2/tokeninfo', data={ + 'access_token': token.token, + }) + token_lifetime = (fields.Datetime.from_string(token.expires_at) - + datetime.now()).seconds + response_data = json.loads(response.data) + self.assertEqual(response.status_code, 200) + self.assertEqual( + response_data['audience'], token.client_id.identifier) + self.assertEqual( + response_data['scopes'], ' '.join(token.scope_ids.mapped('code'))) + # Test a range because the test might not be accurate, depending on the + # test system load + self.assertTrue( + token_lifetime - 5 < response_data['expires_in'] < + token_lifetime + 5) + self.assertEqual( + response_data['user_id'], + hashlib.sha256(token.client_id.identifier + + token.user_id.oauth_identifier).hexdigest()) + + def test_tokeninfo_without_scopes(self): + """ Call /oauth2/tokeninfo without any scope + + Retrieve token information without any scopes on the token + The user_id field should not be included + """ + token = self.new_token() + token.scope_ids = self.env['oauth.provider.scope'] + response = self.get_request('/oauth2/tokeninfo', data={ + 'access_token': token.token, + }) + token_lifetime = (fields.Datetime.from_string(token.expires_at) - + datetime.now()).seconds + response_data = json.loads(response.data) + self.assertEqual(response.status_code, 200) + self.assertEqual( + response_data['audience'], token.client_id.identifier) + self.assertEqual( + response_data['scopes'], ' '.join(token.scope_ids.mapped('code'))) + # Test a range because the test might not be accurate, depending on the + # test system load + self.assertTrue( + token_lifetime - 5 < response_data['expires_in'] < + token_lifetime + 5) + + +class TestOAuthProviderUserinfoController(object): + def test_userinfo_error_missing_arguments(self): + """ Call /oauth2/userinfo without any argument + + Must return an invalid_or_expired_token error + """ + response = self.get_request('/oauth2/userinfo') + self.assertEqual(response.status_code, 401) + self.assertEqual( + json.loads(response.data), {'error': 'invalid_or_expired_token'}) + + def test_userinfo_single_scope(self): + """ Retrieve user information with only a single scope """ + token = self.new_token() + token.scope_ids = self.client.scope_ids[0] + + # Retrieve user information + response = self.get_request('/oauth2/userinfo', data={ + 'access_token': token.token, + }) + self.assertEqual(response.status_code, 200) + self.assertEqual(json.loads(response.data), { + 'id': self.user.id, + 'email': self.user.email, + }) + + def test_userinfo_multiple_scope(self): + """ Retrieve user information with only a all test scopes """ + token = self.new_token() + token.scope_ids = self.client.scope_ids + + # Retrieve user information + response = self.get_request('/oauth2/userinfo', data={ + 'access_token': token.token, + }) + # The Email scope allows to read the email + # The Profile scope allows to read the name and city + # The id of the recod is always added (standard Odoo behaviour) + self.assertEqual(response.status_code, 200) + self.assertEqual(json.loads(response.data), { + 'id': self.user.id, + 'name': self.user.name, + 'email': self.user.email, + 'city': self.user.city, + }) + + +class TestOAuthProviderOtherinfoController(object): + def test_otherinfo_error_missing_arguments(self): + """ Call /oauth2/otherinfo method without any argument + + Must return an invalid_or_expired_token error + """ + response = self.get_request('/oauth2/otherinfo') + self.assertEqual(response.status_code, 401) + self.assertEqual( + json.loads(response.data), {'error': 'invalid_or_expired_token'}) + + def test_otherinfo_error_missing_model(self): + """ Call /oauth2/otherinfo method without the model argument + + Must return an invalid_model error + """ + token = self.new_token() + response = self.get_request( + '/oauth2/otherinfo', data={'access_token': token.token}) + self.assertEqual(response.status_code, 400) + self.assertEqual(json.loads(response.data), {'error': 'invalid_model'}) + + def test_otherinfo_error_invalid_model(self): + """ Call /oauth2/otherinfo method withan invalid model + + Must return an invalid_model error + """ + token = self.new_token() + response = self.get_request( + '/oauth2/otherinfo', + data={'access_token': token.token, 'model': 'invalid.model'}) + self.assertEqual(response.status_code, 400) + self.assertEqual(json.loads(response.data), {'error': 'invalid_model'}) + + def test_otherinfo_user_information(self): + """ Call /oauth2/otherinfo to retrieve information using the token """ + token = self.new_token() + token.scope_ids = self.client.scope_ids + + # Add a new scope to test informations retrieval + token.scope_ids += self.env['oauth.provider.scope'].create({ + 'name': 'Groups', + 'code': 'groups', + 'description': 'List of accessible groups', + 'model_id': self.env.ref('base.model_res_groups').id, + 'filter_id': False, + 'field_ids': [ + (6, 0, [self.env.ref('base.field_res_groups_name').id]), + ], + }) + # Retrieve user information + response = self.get_request('/oauth2/otherinfo', data={ + 'access_token': token.token, + 'model': 'res.users', + }) + # The Email scope allows to read the email + # The Profile scope allows to read the name and city + # The id of the recod is always added (standard Odoo behaviour) + self.assertEqual(response.status_code, 200) + self.assertEqual(json.loads(response.data), {str(self.user.id): { + 'id': self.user.id, + 'name': self.user.name, + 'email': self.user.email, + 'city': self.user.city, + }}) + + def test_otherinfo_group_information(self): + """ Call /oauth2/otherinfo to retrieve information using the token """ + token = self.new_token() + token.scope_ids = self.client.scope_ids + + # Add a new scope to test informations retrieval + token.scope_ids += self.env['oauth.provider.scope'].create({ + 'name': 'Groups', + 'code': 'groups', + 'description': 'List of accessible groups', + 'model_id': self.env.ref('base.model_res_groups').id, + 'filter_id': False, + 'field_ids': [ + (6, 0, [self.env.ref('base.field_res_groups_name').id]), + ], + }) + + # Retrieve groups information + all_groups = self.env['res.groups'].search([]) + response = self.get_request('/oauth2/otherinfo', data={ + 'access_token': token.token, + 'model': 'res.groups', + }) + self.assertEqual(response.status_code, 200) + self.assertEqual( + sorted(json.loads(response.data).keys()), + sorted(map(str, all_groups.ids))) + + +class TestOAuthProviderRevokeTokenController(object): + def test_revoke_token_error_missing_arguments(self): + """ Call /oauth2/revoke_token method without any argument """ + response = self.post_request('/oauth2/revoke_token') + self.assertEqual(response.status_code, 401) + self.assertEqual( + json.loads(response.data), {'error': 'invalid_or_expired_token'}) + + def test_revoke_token_error_missing_client_id(self): + """ Call /oauth2/revoke_token method without client identifier """ + token = self.new_token() + response = self.post_request('/oauth2/revoke_token', data={ + 'token': token.token, + }) + self.assertEqual(response.status_code, 401) + self.assertEqual( + json.loads(response.data), {'error': 'invalid_client'}) + + def test_revoke_token_error_missing_token(self): + """ Call /oauth2/revoke_token method without token """ + response = self.post_request('/oauth2/revoke_token', data={ + 'client_id': self.client.identifier, + }) + self.assertEqual(response.status_code, 401) + self.assertEqual( + json.loads(response.data), {'error': 'invalid_or_expired_token'}) + + def test_revoke_access_token(self): + """ Revoke an access token """ + token = self.new_token() + self.post_request('/oauth2/revoke_token', data={ + 'client_id': self.client.identifier, + 'token': token.token, + }) + self.assertFalse(token.exists()) + + def test_revoke_refresh_token(self): + """ Revoke a refresh token """ + token = self.new_token() + self.post_request('/oauth2/revoke_token', data={ + 'client_id': self.client.identifier, + 'token': token.refresh_token, + }) + self.assertTrue(token.exists()) + self.assertFalse(token.refresh_token) diff --git a/oauth_provider/tests/test_oauth_provider_client.py b/oauth_provider/tests/test_oauth_provider_client.py new file mode 100644 index 000000000..2b842a03a --- /dev/null +++ b/oauth_provider/tests/test_oauth_provider_client.py @@ -0,0 +1,128 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 SYLEAM +# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl). + +import logging +from openerp.tests.common import TransactionCase +from ..oauth2.validator import OdooValidator + +_logger = logging.getLogger(__name__) + +try: + from oauthlib import oauth2 +except ImportError: + _logger.debug('Cannot `import oauthlib`.') + + +class TestOAuthProviderClient(TransactionCase): + + def setUp(self): + super(TestOAuthProviderClient, self).setUp() + self.client_vals = { + 'name': 'Client', + 'identifier': 'client', + } + + def new_client(self, vals=None): + values = self.client_vals.copy() + if vals is not None: + values.update(vals) + + return self.env['oauth.provider.client'].create(values) + + def test_grant_response_type_default(self): + """ Check the value of the grant_type and response_type fields """ + # Default : Web Application + client = self.new_client({'identifier': 'default'}) + self.assertEqual(client.grant_type, 'authorization_code') + self.assertEqual(client.response_type, 'code') + + def test_grant_response_type_web_application(self): + """ Check the value of the grant_type and response_type fields """ + # Web Application + client = self.new_client(vals={'application_type': 'web application'}) + self.assertEqual(client.grant_type, 'authorization_code') + self.assertEqual(client.response_type, 'code') + + def test_grant_response_type_mobile_application(self): + """ Check the value of the grant_type and response_type fields """ + # Mobile Application + client = self.new_client( + vals={'application_type': 'mobile application'}) + self.assertEqual(client.grant_type, 'implicit') + self.assertEqual(client.response_type, 'token') + + def test_grant_response_type_legacy_application(self): + """ Check the value of the grant_type and response_type fields """ + # Legacy Application + client = self.new_client( + vals={'application_type': 'legacy application'}) + self.assertEqual(client.grant_type, 'password') + self.assertEqual(client.response_type, 'none') + + def test_grant_response_type_backend_application(self): + """ Check the value of the grant_type and response_type fields """ + # Backend Application + client = self.new_client( + vals={'application_type': 'backend application'}) + self.assertEqual(client.grant_type, 'client_credentials') + self.assertEqual(client.response_type, 'none') + + def test_get_oauth2_server_default(self): + """ Check the returned server, depending on the application type """ + # Default : Web Application + client = self.new_client({'identifier': 'default'}) + self.assertTrue( + isinstance(client.get_oauth2_server(), + oauth2.WebApplicationServer)) + + def test_get_oauth2_server_web_application(self): + """ Check the returned server, depending on the application type """ + # Web Application + client = self.new_client(vals={'application_type': 'web application'}) + self.assertTrue( + isinstance(client.get_oauth2_server(), + oauth2.WebApplicationServer)) + + def test_get_oauth2_server_mobile_application(self): + """ Check the returned server, depending on the application type """ + # Mobile Application + client = self.new_client( + vals={'application_type': 'mobile application'}) + self.assertTrue( + isinstance(client.get_oauth2_server(), + oauth2.MobileApplicationServer)) + + def test_get_oauth2_server_legacy_applicaton(self): + """ Check the returned server, depending on the application type """ + # Legacy Application + client = self.new_client( + vals={'application_type': 'legacy application'}) + self.assertTrue( + isinstance(client.get_oauth2_server(), + oauth2.LegacyApplicationServer)) + + def test_get_oauth2_server_backend_application(self): + """ Check the returned server, depending on the application type """ + # Backend Application + client = self.new_client( + vals={'application_type': 'backend application'}) + self.assertTrue( + isinstance(client.get_oauth2_server(), + oauth2.BackendApplicationServer)) + + def test_get_oauth2_server_validator(self): + """ Check the validator of the returned server """ + client = self.new_client() + # No defined validator: Check that an OdooValidator instance is created + self.assertTrue( + isinstance(client.get_oauth2_server().request_validator, + OdooValidator)) + + def test_get_oauth2_server_validator_custom(self): + """ Check the validator of the returned server """ + client = self.new_client() + # Passed validator : Check that the validator instance is used + validator = OdooValidator() + self.assertEqual( + client.get_oauth2_server(validator).request_validator, validator) diff --git a/oauth_provider/tests/test_oauth_provider_controller_legacy_application.py b/oauth_provider/tests/test_oauth_provider_controller_legacy_application.py new file mode 100644 index 000000000..484ea5649 --- /dev/null +++ b/oauth_provider/tests/test_oauth_provider_controller_legacy_application.py @@ -0,0 +1,374 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 SYLEAM +# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl). + +import base64 +import json +import logging +from .common_test_controller import OAuthProviderControllerTransactionCase +from .common_test_oauth_provider_controller import \ + TestOAuthProviderRefreshTokenController, \ + TestOAuthProviderTokeninfoController, \ + TestOAuthProviderUserinfoController, \ + TestOAuthProviderOtherinfoController, \ + TestOAuthProviderRevokeTokenController + +_logger = logging.getLogger(__name__) + + +class TestOAuthProviderController( + OAuthProviderControllerTransactionCase, + TestOAuthProviderRefreshTokenController, + TestOAuthProviderTokeninfoController, + TestOAuthProviderUserinfoController, + TestOAuthProviderOtherinfoController, + TestOAuthProviderRevokeTokenController): + def setUp(self): + super(TestOAuthProviderController, self).setUp('legacy application') + + def test_token_error_missing_arguments(self): + """ Check /oauth2/token without any argument + + Must return an unsupported_grant_type error + """ + response = self.post_request('/oauth2/token') + self.assertEqual(response.status_code, 401) + self.assertEqual( + json.loads(response.data), {'error': 'invalid_client_id'}) + + def test_token_error_wrong_grant_type(self): + """ Check /oauth2/token with an invalid grant type + + Must return an unsupported_grant_type error + """ + response = self.post_request('/oauth2/token', data={ + 'client_id': self.client.identifier, + 'grant_type': 'Wrong grant type', + 'username': 'Wrong username', + 'password': 'Wrong password', + }) + self.assertEqual(response.status_code, 400) + self.assertEqual( + json.loads(response.data), {'error': 'unsupported_grant_type'}) + + def test_token_error_missing_username(self): + """ Check /oauth2/token without username + + Must return an invalid_request error + """ + response = self.post_request('/oauth2/token', data={ + 'client_id': self.client.identifier, + 'grant_type': self.client.grant_type, + }) + self.assertEqual(response.status_code, 400) + self.assertEqual(json.loads(response.data), { + 'error': 'invalid_request', + 'error_description': 'Request is missing username parameter.', + }) + + def test_token_error_missing_password(self): + """ Check /oauth2/token without password + + Must return an invalid_request error + """ + response = self.post_request('/oauth2/token', data={ + 'client_id': self.client.identifier, + 'grant_type': self.client.grant_type, + 'username': 'Wrong username', + }) + self.assertEqual(response.status_code, 400) + self.assertEqual(json.loads(response.data), { + 'error': 'invalid_request', + 'error_description': 'Request is missing password parameter.', + }) + + def test_token_error_missing_client_id(self): + """ Check /oauth2/token without client + + Must return an unauthorized_client error + """ + response = self.post_request('/oauth2/token', data={ + 'grant_type': self.client.grant_type, + }) + self.assertEqual(response.status_code, 401) + self.assertEqual( + json.loads(response.data), {'error': 'invalid_client_id'}) + + def test_token_error_wrong_client_identifier(self): + """ Check /oauth2/token with a wrong client identifier + + Must return an invalid_client_id error + """ + response = self.post_request('/oauth2/token', data={ + 'grant_type': self.client.grant_type, + 'client_id': 'Wrong client identifier', + }) + self.assertEqual(response.status_code, 401) + self.assertEqual( + json.loads(response.data), {'error': 'invalid_client_id'}) + + def test_token_error_wrong_username(self): + """ Check /oauth2/token with a wrong username + + Must return an invalid_grant error + """ + response = self.post_request('/oauth2/token', data={ + 'grant_type': self.client.grant_type, + 'client_id': self.client.identifier, + 'username': 'Wrong username', + 'password': 'demo', + }) + self.assertEqual(response.status_code, 401) + self.assertEqual(json.loads(response.data), { + 'error': 'invalid_grant', + 'error_description': 'Invalid credentials given.', + }) + + def test_token_error_wrong_password(self): + """ Check /oauth2/token with a wrong password + + Must return an invalid_grant error + """ + response = self.post_request('/oauth2/token', data={ + 'grant_type': self.client.grant_type, + 'client_id': self.client.identifier, + 'username': self.user.login, + 'password': 'Wrong password', + }) + self.assertEqual(response.status_code, 401) + self.assertEqual(json.loads(response.data), { + 'error': 'invalid_grant', + 'error_description': 'Invalid credentials given.', + }) + + def test_token_error_wrong_client_id(self): + """ Check /oauth2/token with a wrong client id + + Must return an invalid_client_id error + """ + response = self.post_request('/oauth2/token', data={ + 'grant_type': self.client.grant_type, + 'client_id': 'Wrong client id', + 'scope': self.client.scope_ids[0].code, + }) + self.assertEqual(response.status_code, 401) + self.assertEqual( + json.loads(response.data), {'error': 'invalid_client_id'}) + + def test_token_error_missing_refresh_token(self): + """ Check /oauth2/token in refresh token mode without refresh token + + Must return an invalid_request error + """ + response = self.post_request('/oauth2/token', data={ + 'grant_type': 'refresh_token', + 'client_id': self.client.identifier, + 'scope': self.client.scope_ids[0].code, + }) + self.assertEqual(response.status_code, 400) + self.assertEqual(json.loads(response.data), { + 'error': 'invalid_request', + 'error_description': 'Missing refresh token parameter.', + }) + + def test_token_error_invalid_refresh_token(self): + """ Check /oauth2/token in refresh token mode with an invalid refresh token + + Must return an invalid_grant error + """ + response = self.post_request('/oauth2/token', data={ + 'grant_type': 'refresh_token', + 'client_id': self.client.identifier, + 'scope': self.client.scope_ids[0].code, + 'refresh_token': 'Wrong refresh token', + }) + self.assertEqual(response.status_code, 401) + self.assertEqual(json.loads(response.data), {'error': 'invalid_grant'}) + + def test_token_with_missing_secret(self): + """ Check client authentication without the secret provided + + Must return an invalid_client error + """ + # Define a secret for the client + self.client.secret = 'OAuth Client secret' + + # Ask a token to the server + response = self.post_request('/oauth2/token', data={ + 'client_id': self.client.identifier, + 'scope': self.client.scope_ids[0].code, + 'grant_type': self.client.grant_type, + 'username': self.user.login, + 'password': 'demo', + }) + self.assertEqual(response.status_code, 401) + self.assertEqual( + json.loads(response.data), {'error': 'invalid_client'}) + + def test_token_with_unexpected_secret(self): + """ Check client authentication with an unexpected secret provided + + Must return an invalid_client error + """ + # Don't define a secret for the client + auth_string = base64.b64encode( + '{client.identifier}:secret'.format(client=self.client)) + + # Ask a token to the server + response = self.post_request('/oauth2/token', data={ + 'client_id': self.client.identifier, + 'scope': self.client.scope_ids[0].code, + 'grant_type': self.client.grant_type, + 'username': self.user.login, + 'password': 'demo', + }, headers=[( + 'Authorization', + 'Basic {auth_string}'.format(auth_string=auth_string)), + ]) + self.assertEqual(response.status_code, 401) + self.assertEqual( + json.loads(response.data), {'error': 'invalid_client'}) + + def test_token_with_wrong_secret(self): + """ Check client authentication with a wrong secret + + Must return an invalid_client error + """ + # Define a secret for the client + self.client.secret = 'OAuth Client secret' + auth_string = base64.b64encode( + '{client.identifier}:secret'.format(client=self.client)) + + # Ask a token to the server + response = self.post_request('/oauth2/token', data={ + 'client_id': self.client.identifier, + 'scope': self.client.scope_ids[0].code, + 'grant_type': self.client.grant_type, + 'username': self.user.login, + 'password': 'demo', + }, headers=[( + 'Authorization', + 'Basic {auth_string}'.format(auth_string=auth_string)), + ]) + self.assertEqual(response.status_code, 401) + self.assertEqual( + json.loads(response.data), {'error': 'invalid_client'}) + + def test_token_with_secret(self): + """ Check client authentication from Authorization header """ + # Define a secret for the client + self.client.secret = 'OAuth Client secret' + auth_string = base64.b64encode( + '{client.identifier}:{client.secret}'.format(client=self.client)) + + # Ask a token to the server + response = self.post_request('/oauth2/token', data={ + 'client_id': self.client.identifier, + 'scope': self.client.scope_ids[0].code, + 'grant_type': self.client.grant_type, + 'username': self.user.login, + 'password': 'demo', + }, headers=[( + 'Authorization', + 'Basic {auth_string}'.format(auth_string=auth_string)), + ]) + response_data = json.loads(response.data) + # A new token should have been generated + # We can safely pick the latest generated token here, because no other + # token could have been generated during the test + token = self.env['oauth.provider.token'].search([ + ('client_id', '=', self.client.id), + ], order='id DESC', limit=1) + self.assertEqual(response.status_code, 200) + self.assertEqual(token.token, response_data['access_token']) + self.assertEqual(token.token_type, response_data['token_type']) + self.assertEqual(token.refresh_token, response_data['refresh_token']) + self.assertEqual(token.scope_ids, self.client.scope_ids[0]) + self.assertEqual(token.user_id, self.user) + + def test_token_with_wrong_non_basic_authorization(self): + """ Check client authentication with a non-Basic Authorization header + + Must generate a token : Non basic authorization headers are ignored + """ + # Don't define a secret for the client + auth_string = base64.b64encode( + '{client.identifier}:secret'.format(client=self.client)) + + # Ask a token to the server + response = self.post_request('/oauth2/token', data={ + 'client_id': self.client.identifier, + 'scope': self.client.scope_ids[0].code, + 'grant_type': self.client.grant_type, + 'username': self.user.login, + 'password': 'demo', + }, headers=[( + 'Authorization', + 'Digest {auth_string}'.format(auth_string=auth_string)), + ]) + response_data = json.loads(response.data) + # A new token should have been generated + # We can safely pick the latest generated token here, because no other + # token could have been generated during the test + token = self.env['oauth.provider.token'].search([ + ('client_id', '=', self.client.id), + ], order='id DESC', limit=1) + self.assertEqual(response.status_code, 200) + self.assertEqual(token.token, response_data['access_token']) + self.assertEqual(token.token_type, response_data['token_type']) + self.assertEqual(token.refresh_token, response_data['refresh_token']) + self.assertEqual(token.scope_ids, self.client.scope_ids[0]) + self.assertEqual(token.user_id, self.user) + + def test_token_with_right_non_basic_authorization(self): + """ Check client authentication with a non-Basic Authorization header + + Must return an invalid_client error : Non basic authorization headers + are ignored + """ + # Define a secret for the client + self.client.secret = 'OAuth Client secret' + auth_string = base64.b64encode( + '{client.identifier}:{client.secret}'.format(client=self.client)) + + # Ask a token to the server + response = self.post_request('/oauth2/token', data={ + 'client_id': self.client.identifier, + 'scope': self.client.scope_ids[0].code, + 'grant_type': self.client.grant_type, + 'username': self.user.login, + 'password': 'demo', + }, headers=[( + 'Authorization', + 'Digest {auth_string}'.format(auth_string=auth_string)), + ]) + self.assertEqual(response.status_code, 401) + self.assertEqual( + json.loads(response.data), {'error': 'invalid_client'}) + + def test_successful_token_retrieval(self): + """ Check the full process for a LegacyApplication + + GET, then POST, token and informations retrieval + """ + # Ask a token to the server + response = self.post_request('/oauth2/token', data={ + 'client_id': self.client.identifier, + 'scope': self.client.scope_ids[0].code, + 'grant_type': self.client.grant_type, + 'username': self.user.login, + 'password': 'demo', + }) + response_data = json.loads(response.data) + # A new token should have been generated + # We can safely pick the latest generated token here, because no other + # token could have been generated during the test + token = self.env['oauth.provider.token'].search([ + ('client_id', '=', self.client.id), + ], order='id DESC', limit=1) + self.assertEqual(response.status_code, 200) + self.assertEqual(token.token, response_data['access_token']) + self.assertEqual(token.token_type, response_data['token_type']) + self.assertEqual(token.refresh_token, response_data['refresh_token']) + self.assertEqual(token.scope_ids, self.client.scope_ids[0]) + self.assertEqual(token.user_id, self.user) diff --git a/oauth_provider/tests/test_oauth_provider_controller_mobile_application.py b/oauth_provider/tests/test_oauth_provider_controller_mobile_application.py new file mode 100644 index 000000000..d4e6e5b4b --- /dev/null +++ b/oauth_provider/tests/test_oauth_provider_controller_mobile_application.py @@ -0,0 +1,113 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 SYLEAM +# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl). + +import logging +from .common_test_controller import OAuthProviderControllerTransactionCase +from .common_test_oauth_provider_controller import \ + TestOAuthProviderAurhorizeController, \ + TestOAuthProviderTokeninfoController, \ + TestOAuthProviderUserinfoController, \ + TestOAuthProviderOtherinfoController, \ + TestOAuthProviderRevokeTokenController + +_logger = logging.getLogger(__name__) + +try: + import oauthlib +except ImportError: + _logger.debug('Cannot `import oauthlib`.') + + +class TestOAuthProviderController( + OAuthProviderControllerTransactionCase, + TestOAuthProviderAurhorizeController, + TestOAuthProviderTokeninfoController, + TestOAuthProviderUserinfoController, + TestOAuthProviderOtherinfoController, + TestOAuthProviderRevokeTokenController): + def setUp(self): + super(TestOAuthProviderController, self).setUp('mobile application') + + def test_authorize_skip_authorization(self): + """ Call /oauth2/authorize while skipping the authorization page """ + # Configure the client to skip the authorization page + self.client.skip_authorization = True + + # Login as demo user + self.login(self.user.login, self.user.login) + + # Call the authorize method with good values + state = 'Some custom state' + self.login('demo', 'demo') + response = self.get_request('/oauth2/authorize', data={ + 'client_id': self.client.identifier, + 'response_type': self.client.response_type, + 'redirect_uri': self.redirect_uri_base, + 'scope': self.client.scope_ids[0].code, + 'state': state, + }) + # A new token should have been generated + # We can safely pick the latest generated token here, because no other + # token could have been generated during the test + token = self.env['oauth.provider.token'].search([ + ('client_id', '=', self.client.id), + ], order='id DESC', limit=1) + # The response should be a redirect to the redirect URI, with the + # authorization_code added as GET parameter + self.assertEqual(response.status_code, 302) + query_string = oauthlib.common.urlencode({ + 'state': state, + 'access_token': token.token, + 'token_type': token.token_type, + 'expires_in': 3600, + 'scope': token.scope_ids.code, + }.items()) + self.assertEqual( + response.headers['Location'], '{uri_base}#{query_string}'.format( + uri_base=self.redirect_uri_base, query_string=query_string)) + self.assertEqual(token.user_id, self.user) + + def test_successful_token_retrieval(self): + """ Check the full process for a MobileApplication + + GET, then POST, token and informations retrieval + """ + # Call the authorize method with good values to fill the session scopes + # and credentials variables + state = 'Some custom state' + self.login('demo', 'demo') + response = self.get_request('/oauth2/authorize', data={ + 'client_id': self.client.identifier, + 'response_type': self.client.response_type, + 'redirect_uri': self.redirect_uri_base, + 'scope': self.client.scope_ids[0].code, + 'state': state, + }) + self.assertEqual(response.status_code, 200) + self.assertTrue(self.client.name in response.data) + self.assertTrue(self.client.scope_ids[0].name in response.data) + self.assertTrue(self.client.scope_ids[0].description in response.data) + + # Then, call the POST route to validate the authorization + response = self.post_request('/oauth2/authorize') + # A new token should have been generated + # We can safely pick the latest generated token here, because no other + # token could have been generated during the test + token = self.env['oauth.provider.token'].search([ + ('client_id', '=', self.client.id), + ], order='id DESC', limit=1) + # The response should be a redirect to the redirect URI, with the + # token added as GET parameter + self.assertEqual(response.status_code, 302) + query_string = oauthlib.common.urlencode({ + 'state': state, + 'access_token': token.token, + 'token_type': token.token_type, + 'expires_in': 3600, + 'scope': token.scope_ids.code, + }.items()) + self.assertEqual( + response.headers['Location'], '{uri_base}#{query_string}'.format( + uri_base=self.redirect_uri_base, query_string=query_string)) + self.assertEqual(token.user_id, self.user) diff --git a/oauth_provider/tests/test_oauth_provider_controller_web_application.py b/oauth_provider/tests/test_oauth_provider_controller_web_application.py new file mode 100644 index 000000000..869c6e5cc --- /dev/null +++ b/oauth_provider/tests/test_oauth_provider_controller_web_application.py @@ -0,0 +1,358 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 SYLEAM +# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl). + +import json +import logging +from .common_test_controller import OAuthProviderControllerTransactionCase +from .common_test_oauth_provider_controller import \ + TestOAuthProviderRefreshTokenController, \ + TestOAuthProviderAurhorizeController, \ + TestOAuthProviderTokeninfoController, \ + TestOAuthProviderUserinfoController, \ + TestOAuthProviderOtherinfoController, \ + TestOAuthProviderRevokeTokenController + +_logger = logging.getLogger(__name__) + +try: + import oauthlib +except ImportError: + _logger.debug('Cannot `import oauthlib`.') + + +class TestOAuthProviderController( + OAuthProviderControllerTransactionCase, + TestOAuthProviderRefreshTokenController, + TestOAuthProviderAurhorizeController, + TestOAuthProviderTokeninfoController, + TestOAuthProviderUserinfoController, + TestOAuthProviderOtherinfoController, + TestOAuthProviderRevokeTokenController): + def setUp(self): + super(TestOAuthProviderController, self).setUp('web application') + + def new_code(self): + # Configure the client to skip the authorization page + self.client.skip_authorization = True + + # Call the authorize method with good values + state = 'Some custom state' + self.login('demo', 'demo') + response = self.get_request('/oauth2/authorize', data={ + 'client_id': self.client.identifier, + 'response_type': self.client.response_type, + 'redirect_uri': self.redirect_uri_base, + 'scope': self.client.scope_ids[0].code, + 'state': state, + }) + # A new authorization code should have been generated + # We can safely pick the latest generated code here, because no other + # code could have been generated during the test + code = self.env['oauth.provider.authorization.code'].search([ + ('client_id', '=', self.client.id), + ], order='id DESC', limit=1) + # The response should be a redirect to the redirect URI, with the + # authorization_code added as GET parameter + self.assertEqual(response.status_code, 302) + query_string = oauthlib.common.urlencode( + {'state': state, 'code': code.code}.items()) + self.assertEqual( + response.headers['Location'], '{uri_base}?{query_string}'.format( + uri_base=self.redirect_uri_base, query_string=query_string)) + self.assertEqual(code.user_id, self.user) + + self.logout() + + return code + + def test_token_error_missing_session(self): + """ Check /oauth2/token without any session set + + Must return an invalid_client_id error + """ + response = self.post_request('/oauth2/token') + self.assertEqual(response.status_code, 401) + self.assertEqual( + json.loads(response.data), {'error': 'invalid_client_id'}) + + def test_token_error_missing_arguments(self): + """ Check /oauth2/token without any argument + + Must return an invalid_client_id error + """ + # Generate an authorization code to set the session + self.new_code() + + response = self.post_request('/oauth2/token') + self.assertEqual(response.status_code, 401) + self.assertEqual( + json.loads(response.data), {'error': 'invalid_client_id'}) + + def test_token_error_wrong_grant_type(self): + """ Check /oauth2/token with an invalid grant type + + Must return an invalid_client_id error + """ + # Generate an authorization code to set the session + self.new_code() + + response = self.post_request('/oauth2/token', data={ + 'grant_type': 'Wrong grant type', + }) + self.assertEqual(response.status_code, 401) + self.assertEqual( + json.loads(response.data), {'error': 'invalid_client_id'}) + + def test_token_error_missing_code(self): + """ Check /oauth2/token without code + + Must return an invalid_client_id error + """ + # Generate an authorization code to set the session + self.new_code() + + response = self.post_request('/oauth2/token', data={ + 'grant_type': self.client.grant_type, + }) + self.assertEqual(response.status_code, 401) + self.assertEqual( + json.loads(response.data), {'error': 'invalid_client_id'}) + + def test_token_error_missing_client_id(self): + """ Check /oauth2/token without client + + Must return an invalid_client_id error + """ + # Generate an authorization code to set the session + self.new_code() + + response = self.post_request('/oauth2/token', data={ + 'grant_type': self.client.grant_type, + 'code': 'Wrong code', + }) + self.assertEqual(response.status_code, 401) + self.assertEqual( + json.loads(response.data), {'error': 'invalid_client_id'}) + + def test_token_error_wrong_client_identifier(self): + """ Check /oauth2/token with a wrong client identifier + + Must return an invalid_client_id error + """ + # Generate an authorization code to set the session + self.new_code() + + response = self.post_request('/oauth2/token', data={ + 'grant_type': self.client.grant_type, + 'client_id': 'Wrong client identifier', + 'code': 'Wrong code', + }) + self.assertEqual(response.status_code, 401) + self.assertEqual( + json.loads(response.data), {'error': 'invalid_client_id'}) + + def test_token_error_wrong_code(self): + """ Check /oauth2/token with a wrong code + + Must return an invalid_grant error + """ + # Generate an authorization code to set the session + self.new_code() + + response = self.post_request('/oauth2/token', data={ + 'grant_type': self.client.grant_type, + 'client_id': self.client.identifier, + 'code': 'Wrong code', + }) + self.assertEqual(response.status_code, 401) + self.assertEqual(json.loads(response.data), {'error': 'invalid_grant'}) + + def test_token_error_missing_redirect_uri(self): + """ Check /oauth2/token without redirect_uri + + Must return an access_denied error + """ + # Generate an authorization code + code = self.new_code() + + response = self.post_request('/oauth2/token', data={ + 'grant_type': self.client.grant_type, + 'client_id': self.client.identifier, + 'code': code.code, + }) + self.assertEqual(response.status_code, 401) + self.assertEqual(json.loads(response.data), {'error': 'access_denied'}) + + def test_token_error_wrong_redirect_uri(self): + """ Check /oauth2/token with a wrong redirect_uri + + Must return an access_denied error + """ + # Generate an authorization code + code = self.new_code() + + response = self.post_request('/oauth2/token', data={ + 'grant_type': self.client.grant_type, + 'client_id': self.client.identifier, + 'code': code.code, + 'redirect_uri': 'Wrong redirect URI', + }) + self.assertEqual(response.status_code, 401) + self.assertEqual(json.loads(response.data), {'error': 'access_denied'}) + + def test_token_error_wrong_client_id(self): + """ Check /oauth2/token with a wrong client id + + Must return an invalid_client_id error + """ + # Generate an authorization code + code = self.new_code() + + response = self.post_request('/oauth2/token', data={ + 'grant_type': self.client.grant_type, + 'client_id': 'Wrong client id', + 'code': code.code, + 'redirect_uri': self.redirect_uri_base, + 'scope': self.client.scope_ids[0].code, + }) + self.assertEqual(response.status_code, 401) + self.assertEqual( + json.loads(response.data), {'error': 'invalid_client_id'}) + + def test_token_error_missing_refresh_token(self): + """ Check /oauth2/token in refresh token mode without refresh token + + Must return an invalid_request error + """ + # Generate an authorization code to set the session + self.new_code() + + response = self.post_request('/oauth2/token', data={ + 'grant_type': 'refresh_token', + 'client_id': self.client.identifier, + 'redirect_uri': self.redirect_uri_base, + 'scope': self.client.scope_ids[0].code, + }) + self.assertEqual(response.status_code, 400) + self.assertEqual(json.loads(response.data), { + 'error': 'invalid_request', + 'error_description': 'Missing refresh token parameter.', + }) + + def test_token_error_invalid_refresh_token(self): + """ Check /oauth2/token in refresh token mode with an invalid refresh token + + Must return an invalid_grant error + """ + # Generate an authorization code to set the session + self.new_code() + + response = self.post_request('/oauth2/token', data={ + 'grant_type': 'refresh_token', + 'client_id': self.client.identifier, + 'redirect_uri': self.redirect_uri_base, + 'scope': self.client.scope_ids[0].code, + 'refresh_token': 'Wrong refresh token', + }) + self.assertEqual(response.status_code, 401) + self.assertEqual(json.loads(response.data), {'error': 'invalid_grant'}) + + def test_authorize_skip_authorization(self): + """ Call /oauth2/authorize while skipping the authorization page """ + # Configure the client to skip the authorization page + self.client.skip_authorization = True + + # Call the authorize method with good values + state = 'Some custom state' + self.login('demo', 'demo') + response = self.get_request('/oauth2/authorize', data={ + 'client_id': self.client.identifier, + 'response_type': self.client.response_type, + 'redirect_uri': self.redirect_uri_base, + 'scope': self.client.scope_ids[0].code, + 'state': state, + }) + # A new authorization code should have been generated + # We can safely pick the latest generated code here, because no other + # code could have been generated during the test + code = self.env['oauth.provider.authorization.code'].search([ + ('client_id', '=', self.client.id), + ], order='id DESC', limit=1) + # The response should be a redirect to the redirect URI, with the + # authorization_code added as GET parameter + self.assertEqual(response.status_code, 302) + query_string = oauthlib.common.urlencode({ + 'state': state, + 'code': code.code, + }.items()) + self.assertEqual( + response.headers['Location'], '{uri_base}?{query_string}'.format( + uri_base=self.redirect_uri_base, query_string=query_string)) + self.assertEqual(code.user_id, self.user) + + def test_successful_token_retrieval(self): + """ Check the full process for a WebApplication + + GET, then POST, token and informations retrieval + """ + # Call the authorize method with good values to fill the session scopes + # and credentials variables + state = 'Some custom state' + self.login('demo', 'demo') + response = self.get_request('/oauth2/authorize', data={ + 'client_id': self.client.identifier, + 'response_type': self.client.response_type, + 'redirect_uri': self.redirect_uri_base, + 'scope': self.client.scope_ids[0].code, + 'state': state, + }) + self.assertEqual(response.status_code, 200) + self.assertTrue(self.client.name in response.data) + self.assertTrue(self.client.scope_ids[0].name in response.data) + self.assertTrue(self.client.scope_ids[0].description in response.data) + + # Then, call the POST route to validate the authorization + response = self.post_request('/oauth2/authorize') + # A new authorization code should have been generated + # We can safely pick the latest generated code here, because no other + # code could have been generated during the test + code = self.env['oauth.provider.authorization.code'].search([ + ('client_id', '=', self.client.id), + ], order='id DESC', limit=1) + # The response should be a redirect to the redirect URI, with the + # authorization_code added as GET parameter + self.assertEqual(response.status_code, 302) + query_string = oauthlib.common.urlencode({ + 'state': state, + 'code': code.code, + }.items()) + self.assertEqual( + response.headers['Location'], '{uri_base}?{query_string}'.format( + uri_base=self.redirect_uri_base, query_string=query_string)) + self.assertEqual(code.user_id, self.user) + + self.logout() + + # Now that the user vaidated the authorization, we can ask for a token, + # using the returned code + response = self.post_request('/oauth2/token', data={ + 'client_id': self.client.identifier, + 'redirect_uri': self.redirect_uri_base, + 'scope': self.client.scope_ids[0].code, + 'code': code.code, + 'grant_type': self.client.grant_type, + }) + response_data = json.loads(response.data) + # A new token should have been generated + # We can safely pick the latest generated token here, because no other + # token could have been generated during the test + token = self.env['oauth.provider.token'].search([ + ('client_id', '=', self.client.id), + ], order='id DESC', limit=1) + self.assertEqual(response.status_code, 200) + self.assertEqual(token.token, response_data['access_token']) + self.assertEqual(token.token_type, response_data['token_type']) + self.assertEqual(token.refresh_token, response_data['refresh_token']) + self.assertEqual(token.scope_ids, code.scope_ids) + self.assertEqual(token.user_id, self.user) diff --git a/oauth_provider/tests/test_oauth_provider_scope.py b/oauth_provider/tests/test_oauth_provider_scope.py new file mode 100644 index 000000000..8732f1ac4 --- /dev/null +++ b/oauth_provider/tests/test_oauth_provider_scope.py @@ -0,0 +1,272 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 SYLEAM +# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl). + +from openerp.tests.common import TransactionCase + + +class TestOAuthProviderScope(TransactionCase): + + def setUp(self): + super(TestOAuthProviderScope, self).setUp() + self.filter = self.env['ir.filters'].create({ + 'name': 'Current user', + 'model_id': 'res.users', + 'domain': "[('id', '=', uid)]", + }) + self.scope_vals = { + 'name': 'Scope', + 'code': 'scope', + 'description': 'Description of the scope', + 'model_id': self.env.ref('base.model_res_users').id, + 'filter_id': self.filter.id, + 'field_ids': [ + (6, 0, [self.env.ref('base.field_res_users_email').id]), + ], + } + + def new_scope(self, vals=None): + values = self.scope_vals + if vals is not None: + values.update(vals) + + return self.env['oauth.provider.scope'].create(values) + + def test_get_data_from_model_without_filter(self): + """ Check the values returned by the get_data_for_model method when no + filter is defined + """ + scope = self.new_scope({'filter_id': False}) + + # Check a simple call with the right model + data = scope.get_data_for_model('res.users') + # Check that we have multiple users (otherwise this test is useless) + self.assertTrue(len(self.env['res.users'].search([]).ids) > 1) + self.assertEqual( + set(data.keys()), set(self.env['res.users'].search([]).ids)) + + def test_get_data_from_model_without_filter_wrong_model(self): + """ Check the values returned by the get_data_for_model method when no + filter is defined + """ + scope = self.new_scope({'filter_id': False}) + + # Check a simple call with a wrong model + data = scope.get_data_for_model('res.partner') + self.assertEqual(data, {}) + + def test_get_data_from_model_with_filter(self): + """ Check the values returned by the get_data_for_model method when no + res_id is supplied + """ + scope = self.new_scope() + + # Check a simple call with the right model + data = scope.get_data_for_model('res.users') + self.assertEqual(data, { + self.env.user.id: { + 'id': self.env.user.id, + 'email': self.env.user.email, + }, + }) + + def test_get_data_from_model_with_filter_wrong_model(self): + """ Check the values returned by the get_data_for_model method when no + res_id is supplied + """ + scope = self.new_scope() + + # Check a simple call with a wrong model + data = scope.get_data_for_model('res.partner') + self.assertEqual(data, {}) + + def test_get_data_from_model_with_res_id_and_no_filter(self): + """ Check the values returned by the get_data_for_model method when a + res_id is supplied + """ + scope = self.new_scope({'filter_id': False}) + + # Check a simple call with the right model + data = scope.get_data_for_model('res.users', res_id=self.env.user.id) + self.assertEqual(data, { + 'id': self.env.user.id, + 'email': self.env.user.email, + }) + + def test_get_data_from_model_with_res_id_and_no_filter_wrong_model(self): + """ Check the values returned by the get_data_for_model method when a + res_id is supplied + """ + scope = self.new_scope({'filter_id': False}) + + # Check a simple call with a wrong model + data = scope.get_data_for_model( + 'res.partner', res_id=self.env.user.id + 1) + self.assertEqual(data, {}) + + def test_get_data_from_model_with_res_id(self): + """ Check the values returned by the get_data_for_model method when a + res_id is supplied + """ + scope = self.new_scope() + + # Check a simple call with the right model + data = scope.get_data_for_model('res.users', res_id=self.env.user.id) + self.assertEqual(data, { + 'id': self.env.user.id, + 'email': self.env.user.email, + }) + + def test_get_data_from_model_with_res_id_wrong_model(self): + """ Check the values returned by the get_data_for_model method when a + res_id is supplied + """ + scope = self.new_scope() + + # Check a simple call with a wrong model + data = scope.get_data_for_model( + 'res.partner', res_id=self.env.user.id + 1) + self.assertEqual(data, {}) + + def _generate_multiple_scopes(self): + scopes = self.new_scope() + scopes += self.new_scope({ + 'code': 'Profile', + 'field_ids': [(6, 0, [ + self.env.ref('base.field_res_users_name').id, + self.env.ref('base.field_res_users_city').id, + self.env.ref('base.field_res_users_country_id').id, + ])], + }) + scopes += self.new_scope({ + 'model_id': self.env.ref('base.model_res_groups').id, + 'code': 'All groups', + 'filter_id': False, + 'field_ids': [ + (6, 0, [self.env.ref('base.field_res_groups_name').id]), + ], + }) + + return scopes + + def test_get_data_from_model_with_multiple_scopes_empty_fields(self): + """ Check the values returned by the get_data_for_model method when + calling on multiple scopes + """ + scopes = self._generate_multiple_scopes() + + # Check a simple call with the right model with empty fields + self.env.user.city = False + self.env.user.country_id = False + data = scopes.get_data_for_model('res.users') + self.assertEqual(data, {self.env.user.id: { + 'id': 1, + 'email': self.env.user.email, + 'name': self.env.user.name, + 'city': False, + 'country_id': False, + }}) + + def test_get_data_from_model_with_multiple_scopesfirst_model(self): + """ Check the values returned by the get_data_for_model method when + calling on multiple scopes + """ + scopes = self._generate_multiple_scopes() + + # Check a simple call with the right model without empty fields + country = self.env.ref('base.fr') + self.env.user.city = 'Paris' + self.env.user.country_id = country + data = scopes.get_data_for_model('res.users') + self.assertEqual(data, {self.env.user.id: { + 'id': 1, + 'email': self.env.user.email, + 'name': self.env.user.name, + 'city': self.env.user.city, + 'country_id': country.name, + }}) + + def test_get_data_from_model_with_multiple_scopes_second_model(self): + """ Check the values returned by the get_data_for_model method when + calling on multiple scopes + """ + scopes = self._generate_multiple_scopes() + + # Check a simple call with another right model + data = scopes.get_data_for_model('res.groups') + self.assertEqual( + set(data.keys()), set(self.env['res.groups'].search([]).ids)) + + def test_get_data_from_model_with_multiple_scopes_wrong_model(self): + """ Check the values returned by the get_data_for_model method when + calling on multiple scopes + """ + scopes = self._generate_multiple_scopes() + + # Check a simple call with a wrong model + data = scopes.get_data_for_model('res.partner') + self.assertEqual(data, {}) + + def _generate_multiple_scopes_match(self): + scopes = self.new_scope() + scopes += self.new_scope({ + 'code': 'All users', + 'filter_id': False, + }) + scopes += self.new_scope({ + 'model_id': self.env.ref('base.model_res_groups').id, + 'code': 'All groups', + 'filter_id': False, + 'field_ids': [ + (6, 0, [self.env.ref('base.field_res_groups_name').id]), + ], + }) + + return scopes + + def test_get_data_from_model_with_all_scopes_match(self): + """ Check the values returned by the get_data_for_model method when all + scopes are required to match + """ + scopes = self._generate_multiple_scopes_match() + + # Check a simple call with the right model with any scope match + # returned records + data = scopes.get_data_for_model('res.users') + self.assertEqual( + set(data.keys()), set(self.env['res.users'].search([]).ids)) + + def test_get_data_from_model_with_all_scopes_match_first_model(self): + """ Check the values returned by the get_data_for_model method when all + scopes are required to match + """ + scopes = self._generate_multiple_scopes_match() + + # Check a simple call with the right model with all scopes required to + # match returned records + data = scopes.get_data_for_model('res.users', all_scopes_match=True) + self.assertEqual(data, {self.env.user.id: { + 'id': 1, + 'email': self.env.user.email, + }}) + + def test_get_data_from_model_with_all_scopes_match_second_model(self): + """ Check the values returned by the get_data_for_model method when all + scopes are required to match + """ + scopes = self._generate_multiple_scopes_match() + + # Check a simple call with another right model + data = scopes.get_data_for_model('res.groups') + self.assertEqual( + set(data.keys()), set(self.env['res.groups'].search([]).ids)) + + def test_get_data_from_model_with_all_scopes_match_wrong_model(self): + """ Check the values returned by the get_data_for_model method when all + scopes are required to match + """ + scopes = self._generate_multiple_scopes_match() + + # Check a simple call with a wrong model + data = scopes.get_data_for_model('res.partner') + self.assertEqual(data, {}) diff --git a/oauth_provider/tests/test_oauth_provider_token.py b/oauth_provider/tests/test_oauth_provider_token.py new file mode 100644 index 000000000..446b19c1f --- /dev/null +++ b/oauth_provider/tests/test_oauth_provider_token.py @@ -0,0 +1,270 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 SYLEAM +# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl). + +from datetime import datetime, timedelta +from openerp import fields, exceptions +from openerp.tests.common import TransactionCase + + +class TestOAuthProviderToken(TransactionCase): + + def setUp(self): + super(TestOAuthProviderToken, self).setUp() + self.client = self.env['oauth.provider.client'].create({ + 'name': 'Client', + 'identifier': 'client', + }) + self.token_vals = { + 'user_id': self.env.user.id, + 'client_id': self.client.id, + 'token': 'token', + 'expires_at': fields.Datetime.now(), + } + self.filter = self.env['ir.filters'].create({ + 'name': 'Current user', + 'model_id': 'res.users', + 'domain': "[('id', '=', uid)]", + }) + self.scope_vals = { + 'name': 'Scope', + 'code': 'scope', + 'description': 'Description of the scope', + 'model_id': self.env.ref('base.model_res_users').id, + 'filter_id': self.filter.id, + 'field_ids': [ + (6, 0, [self.env.ref('base.field_res_users_email').id]), + ], + } + + def new_token(self, vals=None): + values = self.token_vals + if vals is not None: + values.update(vals) + + return self.env['oauth.provider.token'].create(values) + + def new_scope(self, vals=None): + values = self.scope_vals + if vals is not None: + values.update(vals) + + return self.env['oauth.provider.scope'].create(values) + + def test_inactive(self): + """ Check the value of the active field, for an expired token """ + not_expired = datetime.now() + timedelta(days=1) + token = self.new_token(vals={ + 'token': 'Active', + 'expires_at': fields.Datetime.to_string(not_expired), + }) + self.assertEqual(token.active, True) + + def test_active(self): + """ Check the value of the active field, for a not expired token """ + expired = datetime.now() - timedelta(minutes=1) + token = self.new_token(vals={ + 'token': 'Not active', + 'expires_at': fields.Datetime.to_string(expired), + }) + self.assertEqual(token.active, False) + + def _generate_tokens_for_active_search(self): + not_expired = datetime.now() + timedelta(days=1) + not_expired_tokens = self.new_token(vals={ + 'token': 'Not expired', + 'expires_at': fields.Datetime.to_string(not_expired), + }) + not_expired_tokens += not_expired_tokens.copy(default={ + 'token': 'Other not expired', + }) + + expired = datetime.now() - timedelta(minutes=1) + expired_tokens = self.new_token(vals={ + 'token': 'Expired', + 'expires_at': fields.Datetime.to_string(expired), + }) + expired_tokens += expired_tokens.copy(default={ + 'token': 'Other expired', + }) + + return expired_tokens, not_expired_tokens + + def test_search_empty_domain(self): + """ Check the results of searching tokens with an empty domain + + Only active tokens should be returned + """ + token_obj = self.env['oauth.provider.token'] + expired_tokens, not_expired_tokens = \ + self._generate_tokens_for_active_search() + + self.assertEqual(token_obj.search([]), not_expired_tokens) + + def test_active_search_equal_true(self): + """ Check the results of searching tokens with explicit active = True domain + """ + token_obj = self.env['oauth.provider.token'] + expired_tokens, not_expired_tokens = \ + self._generate_tokens_for_active_search() + + self.assertEqual(token_obj.search([ + ('active', '=', True), + ]), not_expired_tokens) + + def test_active_search_equal_false(self): + """ Check the results of searching tokens with active = False domain + """ + token_obj = self.env['oauth.provider.token'] + expired_tokens, not_expired_tokens = \ + self._generate_tokens_for_active_search() + + self.assertEqual(token_obj.search([ + ('active', '=', False), + ]), expired_tokens) + + def test_active_search_not_equal_true(self): + """ Check the results of searching tokens with active != True domain + """ + token_obj = self.env['oauth.provider.token'] + expired_tokens, not_expired_tokens = \ + self._generate_tokens_for_active_search() + + self.assertEqual(token_obj.search([ + ('active', '!=', True), + ]), expired_tokens) + + def test_active_search_not_equal_false(self): + """ Check the results of searching tokens with active != False domain + """ + token_obj = self.env['oauth.provider.token'] + expired_tokens, not_expired_tokens = \ + self._generate_tokens_for_active_search() + + self.assertEqual(token_obj.search([ + ('active', '!=', False), + ]), not_expired_tokens) + + def test_active_search_in_true(self): + """ Check the results of searching tokens with active in (True,) domain + """ + token_obj = self.env['oauth.provider.token'] + expired_tokens, not_expired_tokens = \ + self._generate_tokens_for_active_search() + + self.assertEqual(token_obj.search([ + ('active', 'in', (True,)), + ]), not_expired_tokens) + + def test_active_search_in_false(self): + """ Check the results of searching tokens with active in (False,) domain + """ + token_obj = self.env['oauth.provider.token'] + expired_tokens, not_expired_tokens = \ + self._generate_tokens_for_active_search() + + self.assertEqual(token_obj.search([ + ('active', 'in', (False,)), + ]), expired_tokens) + + def test_active_search_not_in_true(self): + """ Check the results of searching tokens with active not in (True,) domain + """ + token_obj = self.env['oauth.provider.token'] + expired_tokens, not_expired_tokens = \ + self._generate_tokens_for_active_search() + + self.assertEqual(token_obj.search([ + ('active', 'not in', (True,)), + ]), expired_tokens) + + def test_active_search_not_in_false(self): + """ Check the results of searching tokens with active not in (False,) domain + """ + token_obj = self.env['oauth.provider.token'] + expired_tokens, not_expired_tokens = \ + self._generate_tokens_for_active_search() + + self.assertEqual(token_obj.search([ + ('active', 'not in', (False,)), + ]), not_expired_tokens) + + def test_active_search_in_true_false(self): + """ Check the results of searching tokens with active in (True, False) domain + """ + token_obj = self.env['oauth.provider.token'] + expired_tokens, not_expired_tokens = \ + self._generate_tokens_for_active_search() + + self.assertEqual(token_obj.search([ + ('active', 'in', (True, False)), + ]), not_expired_tokens + expired_tokens) + + def test_active_search_not_in_true_false(self): + """ Check the results of searching tokens with active notin (True,False) domain + """ + token_obj = self.env['oauth.provider.token'] + expired_tokens, not_expired_tokens = \ + self._generate_tokens_for_active_search() + + self.assertEqual(token_obj.search([ + ('active', 'not in', (True, False)), + ]), token_obj) + + def test_active_search_invalid_operator(self): + """ Check the results of searching tokens with an invalid operatr in domain + """ + token_obj = self.env['oauth.provider.token'] + expired_tokens, not_expired_tokens = \ + self._generate_tokens_for_active_search() + + with self.assertRaises(exceptions.UserError): + token_obj.search([('active', '>', True)]) + + def test_get_data_from_model_with_at_least_one_scope_matching(self): + """ Check the values returned by the get_data_for_model method with + at least one scope matching the data + """ + scopes = self.new_scope() + scopes += self.new_scope({ + 'code': 'All users', + 'filter_id': False, + }) + token = self.new_token(vals={ + 'scope_ids': [(6, 0, scopes.ids)], + }) + + # Check a simple call with the right model with empty fields + data = token.get_data_for_model('res.users') + self.assertEqual( + sorted(data.keys()), sorted(self.env['res.users'].search([]).ids)) + + def test_get_data_from_model_with_all_scopes_matching(self): + """ Check the values returned by the get_data_for_model method with + all scopes required to match the data + """ + scopes = self.new_scope() + scopes += self.new_scope({ + 'code': 'All users', + 'filter_id': False, + }) + token = self.new_token(vals={ + 'scope_ids': [(6, 0, scopes.ids)], + }) + + # Check a simple call with the right model without empty fields + data = token.get_data_for_model('res.users', all_scopes_match=True) + self.assertEqual(data, {self.env.user.id: { + 'id': 1, + 'email': self.env.user.email, + }}) + + def test_get_data_from_model_with_no_scope_matching(self): + """ Check the values returned by the get_data_for_model method with + an unauthorized model + """ + token = self.new_token() + + # Check a simple call with a wrong model + data = token.get_data_for_model('res.partner') + self.assertEqual(data, {}) diff --git a/oauth_provider/views/oauth_provider_client.xml b/oauth_provider/views/oauth_provider_client.xml new file mode 100644 index 000000000..6f4c99c13 --- /dev/null +++ b/oauth_provider/views/oauth_provider_client.xml @@ -0,0 +1,89 @@ + + + + + oauth.provider.client.tree + oauth.provider.client + + + + + + + + + + + oauth.provider.client.form + oauth.provider.client + +
+ +

+ +

+ + + + + + + + + + + + + + + + + + + + + + +
+
+
+
+ + oauth.provider.client.search + oauth.provider.client + + + + + + + + + + + OAuth Provider Clients + ir.actions.act_window + oauth.provider.client + form + tree,form + + [] + {} + + + + + form + + + + + + tree + + + +
diff --git a/oauth_provider/views/oauth_provider_scope.xml b/oauth_provider/views/oauth_provider_scope.xml new file mode 100644 index 000000000..06ce33288 --- /dev/null +++ b/oauth_provider/views/oauth_provider_scope.xml @@ -0,0 +1,87 @@ + + + + + oauth.provider.scope.tree + oauth.provider.scope + + + + + + + + + + + oauth.provider.scope.form + oauth.provider.scope + +
+ +

+ +

+ + + + + + + + + + + + + + + + + + + + +
+
+
+
+ + oauth.provider.scope.search + oauth.provider.scope + + + + + + + + + + + OAuth Provider Scopes + ir.actions.act_window + oauth.provider.scope + form + tree,form + + [] + {} + + + + + form + + + + + + tree + + + +
diff --git a/requirements.txt b/requirements.txt index 13d860cc2..ba6006441 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,3 +6,4 @@ validate_email pysftp pyotp fs==0.5.4 +oauthlib