Browse Source

Merge pull request #611 from syleam/9.0-add-oauth_provider-module

[9.0] Add oauth_provider module
pull/443/merge
Dave Lasley 8 years ago
committed by GitHub
parent
commit
0dbdf73a64
  1. 142
      oauth_provider/README.rst
  2. 24
      oauth_provider/__init__.py
  3. 28
      oauth_provider/__openerp__.py
  4. 5
      oauth_provider/controllers/__init__.py
  5. 290
      oauth_provider/controllers/main.py
  6. 10
      oauth_provider/models/__init__.py
  7. 33
      oauth_provider/models/oauth_provider_authorization_code.py
  8. 130
      oauth_provider/models/oauth_provider_client.py
  9. 17
      oauth_provider/models/oauth_provider_redirect_uri.py
  10. 102
      oauth_provider/models/oauth_provider_scope.py
  11. 99
      oauth_provider/models/oauth_provider_token.py
  12. 20
      oauth_provider/models/res_users.py
  13. 0
      oauth_provider/oauth2/__init__.py
  14. 256
      oauth_provider/oauth2/validator.py
  15. 9
      oauth_provider/security/ir.model.access.csv
  16. 32
      oauth_provider/security/oauth_provider_security.xml
  17. 39
      oauth_provider/templates/authorization.xml
  18. 10
      oauth_provider/tests/__init__.py
  19. 121
      oauth_provider/tests/common_test_controller.py
  20. 419
      oauth_provider/tests/common_test_oauth_provider_controller.py
  21. 128
      oauth_provider/tests/test_oauth_provider_client.py
  22. 374
      oauth_provider/tests/test_oauth_provider_controller_legacy_application.py
  23. 113
      oauth_provider/tests/test_oauth_provider_controller_mobile_application.py
  24. 358
      oauth_provider/tests/test_oauth_provider_controller_web_application.py
  25. 272
      oauth_provider/tests/test_oauth_provider_scope.py
  26. 270
      oauth_provider/tests/test_oauth_provider_token.py
  27. 89
      oauth_provider/views/oauth_provider_client.xml
  28. 87
      oauth_provider/views/oauth_provider_scope.xml
  29. 1
      requirements.txt

142
oauth_provider/README.rst

@ -0,0 +1,142 @@
.. image:: https://img.shields.io/badge/licence-AGPL--3-blue.svg
:target: http://www.gnu.org/licenses/agpl-3.0-standalone.html
:alt: License: AGPL-3
==============
OAuth Provider
==============
This module allows you to turn Odoo into an OAuth 2 provider.
It's meant to provide the basic authentication feature, and some data access routes.
But you are encouraged to create custom routes, in other modules, to give structured data for any specific need.
Installation
============
To install this module, you need to:
#. Install the oauthlib python module
#. Install the module like any other in Odoo
#. For the token retrieval to work on a multi-database instance, you should add this module in the server_wide_modules list
Configuration
=============
This module requires you to configure two things :
#. The scopes are used to define restricted data access
#. The clients are used to declare applications that will be allowed to request tokens and data
To configure scopes, you need to:
#. Go to Settings > Users > OAuth Provider Scopes
#. Create some scopes:
- The scope name and description will be displayed to the user on the authorization page.
- The code is the value provided by the OAuth clients to request access to the scope.
- The model defines which model the scope is linked to (access to user data, partners, sales orders, etc.).
- The filter allows you to determine which records will be accessible through this scope. No filter means all records of the model are accessible.
- The field names allows you to define which fields will be provided to the clients. An empty list only returns the id of accessible records.
To configure clients, you need to:
#. Go to Settings > Users > OAuth Provider Clients
#. Create at least one client:
- The name will be displayed to the user on the authorization page.
- The client identifier is the value provided by the OAuth clients to request authorizations/tokens.
- The application type adapts the process to four pre-defined profiles:
- Web Application : Authorization Code Grant
- Mobile Application : Implicit Grant
- Legacy Application : Resource Owner Password Credentials Grant
- Backend Application : User Credentials Grant (not implemented yet)
- The skip authorization checkbox allows the client to skip the authorization page, and directly deliver a token without prompting the user (useful when the application is trusted).
- The allowed scopes list defines which data will be accessible by this client applicaton.
- The allowed redirect URIs must match the URI sent by the client, to avoid redirecting users to an unauthorized service. The first value in the list is the default redirect URI.
For example, to configure an Odoo's *auth_oauth* module compatible client, you will enter these values :
- Name : Anything you want
- Client identifier : The identifier you want to give to this client
- Application Type : Mobile Application (Odoo uses the implicit grant mode, which corresponds to the mobile application profile)
- Allowed Scopes : Nothing required, but allowing access to current user's email and name is used by Odoo to fill user's information on signup
- Allowed Redirect URIs : http://odoo.example.com/auth_oauth/signin
Usage
=====
This module will allow OAuth clients to use your Odoo instance as an OAuth provider.
Once configured, you must give these information to your client application :
#. Client identifier : Identifies the application (to be able to check allowed scopes and redirect URIs)
#. Allowed scopes : The codes of scopes allowed for this client
#. URLs for the requests :
- Authorization request : http://odoo.example.com/oauth2/authorize
- Token request : http://odoo.example.com/oauth2/token
- Token information request : http://odoo.example.com/oauth2/tokeninfo
Parameters : access_token
- User information request : http://odoo.example.com/oauth2/userinfo
Parameters : access_token
- Any other model information request (depending on the scopes) : http://odoo.example.com/oauth2/otherinfo
Parameters : access_token and model
For example, to configure the *auth_oauth* Odoo module as a client, you will enter these values :
- Provider name : Anything you want
- Client ID : The identifier of the client configured in your Odoo Provider instance
- Body : Text displayed on Odoo's login page link
- Authentication URL : http://odoo.example.com/oauth2/authorize
- Scope : A space separated list of scope codes allowed to the client in your Odoo Provider instance
- Validation URL : http://odoo.example.com/oauth2/tokeninfo
- Data URL : http://odoo.example.com/oauth2/userinfo
.. image:: https://odoo-community.org/website/image/ir.attachment/5784_f2813bd/datas
:alt: Try me on Runbot
:target: https://runbot.odoo-community.org/runbot/149/9.0
Known issues / Roadmap
======================
* Implement the backend application profile (client credentials grant type)
* Add checkboxes on the authorization page to allow the user to disable some scopes for a token ? (I don't know if this is allowed in the OAuth protocol)
Bug Tracker
===========
Bugs are tracked on `GitHub Issues
<https://github.com/OCA/server-tools/issues>`_. In case of trouble, please
check there if your issue has already been reported. If you spotted it first,
help us smashing it by providing a detailed and welcomed feedback.
Credits
=======
Images
------
* Odoo Community Association: `Icon <https://github.com/OCA/maintainer-tools/blob/master/template/module/static/description/icon.svg>`_.
Contributors
------------
* Sylvain Garancher <sylvain.garancher@syleam.fr>
Maintainer
----------
.. image:: https://odoo-community.org/logo.png
:alt: Odoo Community Association
:target: https://odoo-community.org
This module is maintained by the OCA.
OCA, or the Odoo Community Association, is a nonprofit organization whose
mission is to support the collaborative development of Odoo features and
promote its widespread use.
To contribute to this module, please visit https://odoo-community.org.

24
oauth_provider/__init__.py

@ -0,0 +1,24 @@
# -*- coding: utf-8 -*-
# Copyright 2016 SYLEAM
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
from . import controllers
from . import models
import uuid
def pre_init_hook(cr):
""" Initialize oauth_identifier on res.users
The standard initialization puts the same value for every existing record,
which is invalid for this field.
This is done in the pre_init_hook to be able to add the unique constrait
on the first run, when installing the module.
"""
cr.execute('ALTER TABLE res_users ADD COLUMN oauth_identifier varchar')
cr.execute('SELECT id FROM res_users')
for user_id in cr.fetchall():
cr.execute(
'UPDATE res_users SET oauth_identifier = %s WHERE id = %s',
(str(uuid.uuid4()), user_id))

28
oauth_provider/__openerp__.py

@ -0,0 +1,28 @@
# -*- coding: utf-8 -*-
# Copyright 2016 SYLEAM
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
{
'name': 'OAuth Provider',
'summary': 'Allows to use Odoo as an OAuth2 provider',
'version': '9.0.1.0.0',
'category': 'Authentication',
'website': 'http://www.syleam.fr/',
'author': 'SYLEAM, Odoo Community Association (OCA)',
'license': 'AGPL-3',
'installable': True,
'external_dependancies': {
'python': ['oauthlib'],
},
'depends': [
'base',
],
'data': [
'security/oauth_provider_security.xml',
'security/ir.model.access.csv',
'views/oauth_provider_client.xml',
'views/oauth_provider_scope.xml',
'templates/authorization.xml',
],
'pre_init_hook': 'pre_init_hook',
}

5
oauth_provider/controllers/__init__.py

@ -0,0 +1,5 @@
# -*- coding: utf-8 -*-
# Copyright 2016 SYLEAM
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
from . import main

290
oauth_provider/controllers/main.py

@ -0,0 +1,290 @@
# -*- coding: utf-8 -*-
# Copyright 2016 SYLEAM
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
import json
import logging
import werkzeug.utils
import werkzeug.wrappers
from datetime import datetime
from openerp import http, fields
from openerp.addons.web.controllers.main import ensure_db
_logger = logging.getLogger(__name__)
try:
import oauthlib
from oauthlib import oauth2
except ImportError:
_logger.debug('Cannot `import oauthlib`.')
class OAuth2ProviderController(http.Controller):
def __init__(self):
super(OAuth2ProviderController, self).__init__()
def _get_request_information(self):
""" Retrieve needed arguments for oauthlib methods """
uri = http.request.httprequest.base_url
http_method = http.request.httprequest.method
body = oauthlib.common.urlencode(
http.request.httprequest.values.items())
headers = http.request.httprequest.headers
return uri, http_method, body, headers
def _check_access_token(self, access_token):
""" Check if the provided access token is valid """
token = http.request.env['oauth.provider.token'].search([
('token', '=', access_token),
])
if not token:
return False
oauth2_server = token.client_id.get_oauth2_server()
# Retrieve needed arguments for oauthlib methods
uri, http_method, body, headers = self._get_request_information()
# Validate request information
valid, oauthlib_request = oauth2_server.verify_request(
uri, http_method=http_method, body=body, headers=headers)
if valid:
return token
return False
def _json_response(self, data=None, status=200, headers=None):
""" Returns a json response to the client """
if headers is None:
headers = {'Content-Type': 'application/json'}
return werkzeug.wrappers.BaseResponse(
json.dumps(data), status=status, headers=headers)
@http.route('/oauth2/authorize', type='http', auth='user', methods=['GET'])
def authorize(self, client_id=None, response_type=None, redirect_uri=None,
scope=None, state=None, *args, **kwargs):
""" Check client's request, and display an authorization page to the user,
The authorization page lists allowed scopes
If the client is configured to skip the authorization page, directly
redirects to the requested URI
"""
client = http.request.env['oauth.provider.client'].search([
('identifier', '=', client_id),
])
if not client:
return http.request.render(
'oauth_provider.authorization_error', {
'title': 'Unknown Client Identifier!',
'message': 'This client identifier is invalid.',
})
oauth2_server = client.get_oauth2_server()
# Retrieve needed arguments for oauthlib methods
uri, http_method, body, headers = self._get_request_information()
try:
scopes, credentials = oauth2_server.validate_authorization_request(
uri, http_method=http_method, body=body, headers=headers)
# Store only some values, because the pickling of the full request
# object is not possible
http.request.httpsession['oauth_scopes'] = scopes
http.request.httpsession['oauth_credentials'] = {
'client_id': credentials['client_id'],
'redirect_uri': credentials['redirect_uri'],
'response_type': credentials['response_type'],
'state': credentials['state'],
}
if client.skip_authorization:
# Skip the authorization page
# Useful when the application is trusted
return self.authorize_post()
except oauth2.FatalClientError as e:
return http.request.render(
'oauth_provider.authorization_error', {
'title': 'Error: {error}'.format(error=e.error),
'message': e.description,
})
except oauth2.OAuth2Error as e:
return http.request.render(
'oauth_provider.authorization_error', {
'title': 'Error: {error}'.format(error=e.error),
'message': 'An unknown error occured! Please contact your '
'administrator',
})
oauth_scopes = client.scope_ids.filtered(
lambda record: record.code in scopes)
return http.request.render(
'oauth_provider.authorization', {
'oauth_client': client.name,
'oauth_scopes': oauth_scopes,
})
@http.route(
'/oauth2/authorize', type='http', auth='user', methods=['POST'])
def authorize_post(self, *args, **kwargs):
""" Redirect to the requested URI during the authorization """
client = http.request.env['oauth.provider.client'].search([
('identifier', '=', http.request.httpsession.get(
'oauth_credentials', {}).get('client_id'))])
if not client:
return http.request.render(
'oauth_provider.authorization_error', {
'title': 'Unknown Client Identifier!',
'message': 'This client identifier is invalid.',
})
oauth2_server = client.get_oauth2_server()
# Retrieve needed arguments for oauthlib methods
uri, http_method, body, headers = self._get_request_information()
scopes = http.request.httpsession['oauth_scopes']
credentials = http.request.httpsession['oauth_credentials']
headers, body, status = oauth2_server.create_authorization_response(
uri, http_method=http_method, body=body, headers=headers,
scopes=scopes, credentials=credentials)
return werkzeug.utils.redirect(headers['Location'], code=status)
@http.route('/oauth2/token', type='http', auth='none', methods=['POST'],
csrf=False)
def token(self, client_id=None, client_secret=None, redirect_uri=None,
scope=None, code=None, grant_type=None, username=None,
password=None, refresh_token=None, *args, **kwargs):
""" Return a token corresponding to the supplied information
Not all parameters are required, depending on the application type
"""
ensure_db()
# If no client_id is specified, get it from session
if client_id is None:
client_id = http.request.httpsession.get(
'oauth_credentials', {}).get('client_id')
client = http.request.env['oauth.provider.client'].sudo().search([
('identifier', '=', client_id),
])
if not client:
return self._json_response(
data={'error': 'invalid_client_id'}, status=401)
oauth2_server = client.get_oauth2_server()
# Retrieve needed arguments for oauthlib methods
uri, http_method, body, headers = self._get_request_information()
credentials = {'scope': scope}
# Retrieve the authorization code, if any, to get Odoo's user id
existing_code = http.request.env[
'oauth.provider.authorization.code'].search([
('client_id.identifier', '=', client_id),
('code', '=', code),
])
if existing_code:
credentials['odoo_user_id'] = existing_code.user_id.id
# Retrieve the existing token, if any, to get Odoo's user id
existing_token = http.request.env['oauth.provider.token'].search([
('client_id.identifier', '=', client_id),
('refresh_token', '=', refresh_token),
])
if existing_token:
credentials['odoo_user_id'] = existing_token.user_id.id
headers, body, status = oauth2_server.create_token_response(
uri, http_method=http_method, body=body, headers=headers,
credentials=credentials)
return werkzeug.wrappers.BaseResponse(
body, status=status, headers=headers)
@http.route('/oauth2/tokeninfo', type='http', auth='none', methods=['GET'])
def tokeninfo(self, access_token=None, *args, **kwargs):
""" Return some information about the supplied token
Similar to Google's "tokeninfo" request
"""
ensure_db()
token = self._check_access_token(access_token)
if not token:
return self._json_response(
data={'error': 'invalid_or_expired_token'}, status=401)
token_lifetime = (fields.Datetime.from_string(token.expires_at) -
datetime.now()).seconds
# Base data to return
data = {
'audience': token.client_id.identifier,
'scopes': ' '.join(token.scope_ids.mapped('code')),
'expires_in': token_lifetime,
}
# Add the oauth user identifier, if user's information access is
# allowed by the token's scopes
user_data = token.get_data_for_model(
'res.users', res_id=token.user_id.id)
if 'id' in user_data:
data.update(user_id=token.generate_user_id())
return self._json_response(data=data)
@http.route('/oauth2/userinfo', type='http', auth='none', methods=['GET'])
def userinfo(self, access_token=None, *args, **kwargs):
""" Return some information about the user linked to the supplied token
Similar to Google's "userinfo" request
"""
ensure_db()
token = self._check_access_token(access_token)
if not token:
return self._json_response(
data={'error': 'invalid_or_expired_token'}, status=401)
data = token.get_data_for_model('res.users', res_id=token.user_id.id)
return self._json_response(data=data)
@http.route('/oauth2/otherinfo', type='http', auth='none', methods=['GET'])
def otherinfo(self, access_token=None, model=None, *args, **kwargs):
""" Return allowed information about the requested model """
ensure_db()
token = self._check_access_token(access_token)
if not token:
return self._json_response(
data={'error': 'invalid_or_expired_token'}, status=401)
model_obj = http.request.env['ir.model'].search([
('model', '=', model),
])
if not model_obj:
return self._json_response(
data={'error': 'invalid_model'}, status=400)
data = token.get_data_for_model(model)
return self._json_response(data=data)
@http.route(
'/oauth2/revoke_token', type='http', auth='none', methods=['POST'])
def revoke_token(self, token=None, *args, **kwargs):
""" Revoke the supplied token """
ensure_db()
body = oauthlib.common.urlencode(
http.request.httprequest.values.items())
db_token = http.request.env['oauth.provider.token'].search([
('token', '=', token),
])
if not db_token:
db_token = http.request.env['oauth.provider.token'].search([
('refresh_token', '=', token),
])
if not db_token:
return self._json_response(
data={'error': 'invalid_or_expired_token'}, status=401)
oauth2_server = db_token.client_id.get_oauth2_server()
# Retrieve needed arguments for oauthlib methods
uri, http_method, body, headers = self._get_request_information()
headers, body, status = oauth2_server.create_revocation_response(
uri, http_method=http_method, body=body, headers=headers)
return werkzeug.wrappers.BaseResponse(
body, status=status, headers=headers)

10
oauth_provider/models/__init__.py

@ -0,0 +1,10 @@
# -*- coding: utf-8 -*-
# Copyright 2016 SYLEAM
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
from . import oauth_provider_authorization_code
from . import oauth_provider_redirect_uri
from . import oauth_provider_scope
from . import oauth_provider_token
from . import oauth_provider_client
from . import res_users

33
oauth_provider/models/oauth_provider_authorization_code.py

@ -0,0 +1,33 @@
# -*- coding: utf-8 -*-
# Copyright 2016 SYLEAM
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
from openerp import models, fields
class OAuthProviderAuthorizationCode(models.Model):
_name = 'oauth.provider.authorization.code'
_description = 'OAuth Provider Authorization Code'
_rec_name = 'code'
code = fields.Char(required=True, help='Name of the authorization code.')
client_id = fields.Many2one(
comodel_name='oauth.provider.client', string='Client', required=True,
help='Client associated to this authorization code.')
user_id = fields.Many2one(
comodel_name='res.users', string='User', required=True,
help='User associated to this authorization code.')
redirect_uri_id = fields.Many2one(
comodel_name='oauth.provider.redirect.uri', string='Redirect URI',
required=True,
help='Redirect URI associated to this authorization code.')
scope_ids = fields.Many2many(
comodel_name='oauth.provider.scope', string='Scopes',
help='Scopes allowed by this authorization code.')
active = fields.Boolean(
default=True, help='When unchecked, the code is invalidated.')
_sql_constraints = [
('code_client_id_unique', 'UNIQUE (code, client_id)',
'The authorization code must be unique per client !'),
]

130
oauth_provider/models/oauth_provider_client.py

@ -0,0 +1,130 @@
# -*- coding: utf-8 -*-
# Copyright 2016 SYLEAM
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
import hashlib
import uuid
import logging
from openerp import models, api, fields
from ..oauth2.validator import OdooValidator
_logger = logging.getLogger(__name__)
try:
from oauthlib import oauth2
except ImportError:
_logger.debug('Cannot `import oauthlib`.')
class OAuthProviderClient(models.Model):
_name = 'oauth.provider.client'
_description = 'OAuth Provider Client'
name = fields.Char(required=True, help='Name of this client.')
identifier = fields.Char(
string='Client Identifier', required=True, readonly=True,
default=lambda self: str(uuid.uuid4()), copy=False,
help='Unique identifier of the client.')
secret = fields.Char(
help='Optional secret used to authenticate the client.')
skip_authorization = fields.Boolean(
help='Check this box if the user shouldn\'t be prompted to authorize '
'or not the requested scopes.')
application_type = fields.Selection(
selection=[
('web application', 'Web Application'),
('mobile application', 'Mobile Application'),
('legacy application', 'Legacy Application'),
('backend application', 'Backend Application (not implemented)'),
], required=True, default='web application',
help='Application type to be used with this client.')
grant_type = fields.Selection(
selection=[
('authorization_code', 'Authorization Code'),
('implicit', 'Implicit'),
('password', 'Password'),
('client_credentials', 'Client Credentials'),
], string='OAuth Grant Type',
compute='_compute_grant_response_type', store=True,
help='Grant type used by the client for OAuth.')
response_type = fields.Selection(
selection=[
('code', 'Authorization Code'),
('token', 'Token'),
('none', 'None'),
], string='OAuth Response Type',
compute='_compute_grant_response_type', store=True,
help='Response type used by the client for OAuth.')
token_type = fields.Selection(
selection=[('random', 'Randomly generated')],
required=True, default='random',
help='Type of token to return. The base module only provides randomly '
'generated tokens.')
scope_ids = fields.Many2many(
comodel_name='oauth.provider.scope', string='Allowed Scopes',
help='List of scopes the client is allowed to access.')
redirect_uri_ids = fields.One2many(
comodel_name='oauth.provider.redirect.uri', inverse_name='client_id',
string='OAuth Redirect URIs',
help='Allowed redirect URIs for the client.')
_sql_constraints = [
('identifier_unique', 'UNIQUE (identifier)',
'The identifier of the client must be unique !'),
]
@api.model
def application_type_mapping(self):
return {
'web application': ('authorization_code', 'code'),
'mobile application': ('implicit', 'token'),
'legacy application': ('password', 'none'),
'backend application': ('client_credentials', 'none'),
}
@api.multi
@api.depends('application_type')
def _compute_grant_response_type(self):
applications = self.application_type_mapping()
for client in self:
client.grant_type, client.response_type = applications[
client.application_type]
@api.multi
def get_oauth2_server(self, validator=None, **kwargs):
""" Returns an OAuth2 server instance, depending on the client application type
Generates an OdooValidator instance if no custom validator is defined
All other arguments are directly passed to the server constructor (for
example, a token generator function)
"""
self.ensure_one()
if validator is None:
validator = OdooValidator()
if self.application_type == 'web application':
return oauth2.WebApplicationServer(validator, **kwargs)
elif self.application_type == 'mobile application':
return oauth2.MobileApplicationServer(validator, **kwargs)
elif self.application_type == 'legacy application':
return oauth2.LegacyApplicationServer(validator, **kwargs)
elif self.application_type == 'backend application':
return oauth2.BackendApplicationServer(validator, **kwargs)
@api.multi
def generate_user_id(self, user):
""" Generates a unique user identifier for this client
Include the client and user identifiers in the final identifier to
generate a different identifier for the same user, depending on the
client accessing this user. By doing this, clients cannot find a list
of common users by comparing their identifiers list from tokeninfo.
"""
self.ensure_one()
user_identifier = self.identifier \
+ user.sudo().oauth_identifier
# Use a sha256 to avoid a too long final string
return hashlib.sha256(user_identifier).hexdigest()

17
oauth_provider/models/oauth_provider_redirect_uri.py

@ -0,0 +1,17 @@
# -*- coding: utf-8 -*-
# Copyright 2016 SYLEAM
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
from openerp import models, fields
class OAuthProviderRedirectURI(models.Model):
_name = 'oauth.provider.redirect.uri'
_description = 'OAuth Provider Redirect URI'
name = fields.Char(required=True, help='URI of the redirect.')
sequence = fields.Integer(
required=True, default=10, help='Order of the redirect URIs.')
client_id = fields.Many2one(
comodel_name='oauth.provider.client', string='Client', required=True,
help='Client allowed to redirect using this URI.')

102
oauth_provider/models/oauth_provider_scope.py

@ -0,0 +1,102 @@
# -*- coding: utf-8 -*-
# Copyright 2016 SYLEAM
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
import datetime
import dateutil
import time
from collections import defaultdict
from openerp import models, api, fields
from openerp.tools.safe_eval import safe_eval
class OAuthProviderScope(models.Model):
_name = 'oauth.provider.scope'
_description = 'OAuth Provider Scope'
name = fields.Char(
required=True, translate=True,
help='Name of the scope, displayed to the user.')
code = fields.Char(
required=True, help='Code of the scope, used in OAuth requests.')
description = fields.Text(
required=True, translate=True,
help='Description of the scope, displayed to the user.')
model_id = fields.Many2one(
comodel_name='ir.model', string='Model', required=True,
help='Model allowed to be accessed by this scope.')
model = fields.Char(
related='model_id.model', string='Model Name', readonly=True,
help='Name of the model allowed to be accessed by this scope.')
filter_id = fields.Many2one(
comodel_name='ir.filters', string='Filter',
domain="[('model_id', '=', model)]",
help='Filter applied to retrieve records allowed by this scope.')
field_ids = fields.Many2many(
comodel_name='ir.model.fields', string='Fields',
domain="[('model_id', '=', model_id)]",
help='Fields allowed by this scope.')
_sql_constraints = [
('code_unique', 'UNIQUE (code)',
'The code of the scopes must be unique !'),
]
@api.model
def _get_ir_filter_eval_context(self):
""" Returns the base eval context for ir.filter domains evaluation """
return {
'datetime': datetime,
'dateutil': dateutil,
'time': time,
'uid': self.env.uid,
'user': self.env.user,
}
@api.multi
def get_data_for_model(self, model, res_id=None, all_scopes_match=False):
""" Return the data matching the scopes from the requested model """
data = defaultdict(dict)
eval_context = self._get_ir_filter_eval_context()
all_scopes_records = self.env[model]
for scope in self.filtered(lambda record: record.model == model):
# Retrieve the scope's domain
filter_domain = [(1, '=', 1)]
if scope.filter_id:
filter_domain = safe_eval(
scope.filter_id.sudo().domain, eval_context)
if res_id is not None:
filter_domain.append(('id', '=', res_id))
# Retrieve data of the matching records, depending on the scope's
# fields
records = self.env[model].search(filter_domain)
for record_data in records.read(scope.field_ids.mapped('name')):
for field, value in record_data.items():
if isinstance(value, tuple):
# Return only the name for a many2one
data[record_data['id']][field] = value[1]
else:
data[record_data['id']][field] = value
# Keep a list of records that match all scopes
if not all_scopes_records:
all_scopes_records = records
else:
all_scopes_records &= records
# If all scopes are required to match, filter the results to keep only
# those mathing all scopes
if all_scopes_match:
data = dict(filter(
lambda record_data: record_data[0] in all_scopes_records.ids,
data.items()))
# If a single record was requested, return only data coming from this
# record
# Return an empty dictionnary if this record didn't recieve data to
# return
if res_id is not None:
data = data.get(res_id, {})
return data

99
oauth_provider/models/oauth_provider_token.py

@ -0,0 +1,99 @@
# -*- coding: utf-8 -*-
# Copyright 2016 SYLEAM
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
from openerp import models, api, fields, exceptions, _
class OAuthProviderToken(models.Model):
_name = 'oauth.provider.token'
_description = 'OAuth Provider Token'
_rec_name = 'token'
token = fields.Char(required=True, help='The token itself.')
token_type = fields.Selection(
selection=[('Bearer', 'Bearer')], required=True, default='Bearer',
help='Type of token stored. Currently, only the bearer token type is '
'available.')
refresh_token = fields.Char(
help='The refresh token, if applicable.')
client_id = fields.Many2one(
comodel_name='oauth.provider.client', string='Client', required=True,
help='Client associated to this token.')
user_id = fields.Many2one(
comodel_name='res.users', string='User', required=True,
help='User associated to this token.')
scope_ids = fields.Many2many(
comodel_name='oauth.provider.scope', string='Scopes',
help='Scopes allowed by this token.')
expires_at = fields.Datetime(
required=True, help='Expiration time of the token.')
active = fields.Boolean(
compute='_compute_active', search='_search_active',
help='A token is active only if it has not yet expired.')
_sql_constraints = [
('token_unique', 'UNIQUE (token, client_id)',
'The token must be unique per client !'),
('refresh_token_unique', 'UNIQUE (refresh_token, client_id)',
'The refresh token must be unique per client !'),
]
@api.multi
def _compute_active(self):
for token in self:
token.active = fields.Datetime.now() < token.expires_at
@api.model
def _search_active(self, operator, operand):
domain = []
if operator == 'in':
if True in operand:
domain += self._search_active('=', True)
if False in operand:
domain += self._search_active('=', False)
if len(domain) > 1:
domain = [(1, '=', 1)]
elif operator == 'not in':
if True in operand:
domain += self._search_active('!=', True)
if False in operand:
domain += self._search_active('!=', False)
if len(domain) > 1:
domain = [(0, '=', 1)]
elif operator in ('=', '!='):
operators = {
('=', True): '>',
('=', False): '<=',
('!=', False): '>',
('!=', True): '<=',
}
domain = [('expires_at', operators[operator, operand],
fields.Datetime.now())]
else:
raise exceptions.UserError(
_('Invalid operator {operator} for field active!').format(
operator=operator))
return domain
@api.multi
def generate_user_id(self):
""" Generates a unique user identifier for this token """
self.ensure_one()
return self.client_id.generate_user_id(self.user_id)
@api.multi
def get_data_for_model(self, model, res_id=None, all_scopes_match=False):
""" Returns the data of the accessible records of the requested model,
Data are returned depending on the allowed scopes for the token
If the all_scopes_match argument is set to True, return only records
allowed by all token's scopes
"""
self.ensure_one()
# Retrieve records allowed from all scopes
return self.sudo(user=self.user_id).scope_ids.get_data_for_model(
model, res_id=res_id, all_scopes_match=all_scopes_match)

20
oauth_provider/models/res_users.py

@ -0,0 +1,20 @@
# -*- coding: utf-8 -*-
# Copyright 2016 SYLEAM
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
import uuid
from openerp import models, fields
class ResUsers(models.Model):
_inherit = 'res.users'
oauth_identifier = fields.Char(
string='OAuth Identifier', required=True, readonly=True,
default=lambda self: str(uuid.uuid4()), copy=False,
help='String used to identify this user during an OAuth session.')
_sql_constraints = [
('oauth_identifier_unique', 'UNIQUE (oauth_identifier)',
'The OAuth identifier of the user must be unique !'),
]

0
oauth_provider/oauth2/__init__.py

256
oauth_provider/oauth2/validator.py

@ -0,0 +1,256 @@
# -*- coding: utf-8 -*-
# Copyright 2016 SYLEAM
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
import base64
import logging
from datetime import datetime, timedelta
from openerp import http
from openerp import fields
_logger = logging.getLogger(__name__)
try:
from oauthlib.oauth2 import RequestValidator
except ImportError:
_logger.debug('Cannot `import oauthlib`.')
class OdooValidator(RequestValidator):
""" OAuth2 validator to be used in Odoo
This is an implementation of oauthlib's RequestValidator interface
https://github.com/idan/oauthlib/oauthlib/oauth2/rfc6749/request_validator.py
"""
def _load_client(self, request, client_id=None):
""" Returns a client instance for the request """
client = request.client
if not client:
request.client = http.request.env['oauth.provider.client'].search([
('identifier', '=', client_id or request.client_id),
])
request.odoo_user = http.request.env.user
request.client.client_id = request.client.identifier
def _extract_auth(self, request):
""" Extract auth string from request headers """
auth = request.headers.get('Authorization', ' ')
auth_type, auth_string = auth.split(' ', 1)
if auth_type != 'Basic':
return ''
return auth_string
def authenticate_client(self, request, *args, **kwargs):
""" Authenticate the client """
auth_string = self._extract_auth(request)
auth_string_decoded = base64.b64decode(auth_string)
# If we don't have a proper auth string, get values in the request body
if ':' not in auth_string_decoded:
client_id = request.client_id
client_secret = request.client_secret
else:
client_id, client_secret = auth_string_decoded.split(':', 1)
self._load_client(request)
return (request.client.identifier == client_id) and \
(request.client.secret or '') == (client_secret or '')
def authenticate_client_id(self, client_id, request, *args, **kwargs):
""" Ensure client_id belong to a non-confidential client """
self._load_client(request, client_id=client_id)
return bool(request.client) and not request.client.secret
def client_authentication_required(self, request, *args, **kwargs):
""" Determine if the client authentication is required for the request
"""
# If an auth string was specified, unconditionnally authenticate
if self._extract_auth(request):
return True
self._load_client(request)
return request.client.grant_type in (
'password',
'authorization_code',
'refresh_token',
) or request.client_secret or \
not request.odoo_user.active
def confirm_redirect_uri(
self, client_id, code, redirect_uri, client, *args, **kwargs):
""" Ensure that the authorization process' redirect URI
The authorization process corresponding to the code must begin by using
this redirect_uri
"""
code = http.request.env['oauth.provider.authorization.code'].search([
('client_id.identifier', '=', client_id),
('code', '=', code),
])
return redirect_uri == code.redirect_uri_id.name
def get_default_redirect_uri(self, client_id, request, *args, **kwargs):
""" Returns the default redirect URI for the client """
client = http.request.env['oauth.provider.client'].search([
('identifier', '=', client_id),
])
return client.redirect_uri_ids and client.redirect_uri_ids[0].name \
or ''
def get_default_scopes(self, client_id, request, *args, **kwargs):
""" Returns a list of default scoprs for the client """
client = http.request.env['oauth.provider.client'].search([
('identifier', '=', client_id),
])
return ' '.join(client.scope_ids.mapped('code'))
def get_original_scopes(self, refresh_token, request, *args, **kwargs):
""" Returns the list of scopes associated to the refresh token """
token = http.request.env['oauth.provider.token'].search([
('client_id', '=', request.client.id),
('refresh_token', '=', refresh_token),
])
return token.scope_ids.mapped('code')
def invalidate_authorization_code(
self, client_id, code, request, *args, **kwargs):
""" Invalidates an authorization code """
code = http.request.env['oauth.provider.authorization.code'].search([
('client_id.identifier', '=', client_id),
('code', '=', code),
])
code.sudo().write({'active': False})
def is_within_original_scope(
self, request_scopes, refresh_token, request, *args, **kwargs):
""" Check if the requested scopes are within a scope of the token """
token = http.request.env['oauth.provider.token'].search([
('client_id', '=', request.client.id),
('refresh_token', '=', refresh_token),
])
return set(request_scopes).issubset(
set(token.scope_ids.mapped('code')))
def revoke_token(self, token, token_type_hint, request, *args, **kwargs):
""" Revoke an access of refresh token """
db_token = http.request.env['oauth.provider.token'].search([
('token', '=', token),
])
# If we revoke a full token, simply unlink it
if db_token:
db_token.sudo().unlink()
# If we revoke a refresh token, empty it in the corresponding token
else:
db_token = http.request.env['oauth.provider.token'].search([
('refresh_token', '=', token),
])
db_token.sudo().refresh_token = False
def rotate_refresh_token(self, request):
""" Determine if the refresh token has to be renewed
Called after refreshing an access token
Always refresh the token by default, but child classes could override
this method to change this behaviour.
"""
return True
def save_authorization_code(
self, client_id, code, request, *args, **kwargs):
""" Store the authorization code into the database """
redirect_uri = http.request.env['oauth.provider.redirect.uri'].search([
('name', '=', request.redirect_uri),
])
http.request.env['oauth.provider.authorization.code'].sudo().create({
'code': code['code'],
'client_id': request.client.id,
'user_id': request.odoo_user.id,
'redirect_uri_id': redirect_uri.id,
'scope_ids': [(6, 0, request.client.scope_ids.filtered(
lambda record: record.code in request.scopes).ids)],
})
def save_bearer_token(self, token, request, *args, **kwargs):
""" Store the bearer token into the database """
scopes = token.get('scope', '').split()
http.request.env['oauth.provider.token'].sudo().create({
'token': token['access_token'],
'token_type': token['token_type'],
'refresh_token': token.get('refresh_token'),
'client_id': request.client.id,
'user_id': token.get('odoo_user_id', request.odoo_user.id),
'scope_ids': [(6, 0, request.client.scope_ids.filtered(
lambda record: record.code in scopes).ids)],
'expires_at': fields.Datetime.to_string(
datetime.now() + timedelta(seconds=token['expires_in'])),
})
return request.client.redirect_uri_ids[0].name
def validate_bearer_token(self, token, scopes, request):
""" Ensure the supplied bearer token is valid, and allowed for the scopes
"""
token = http.request.env['oauth.provider.token'].search([
('token', '=', token),
])
if scopes is None:
scopes = ''
return set(scopes.split()).issubset(
set(token.scope_ids.mapped('code')))
def validate_client_id(self, client_id, request, *args, **kwargs):
""" Ensure client_id belong to a valid and active client """
self._load_client(request)
return bool(request.client)
def validate_code(self, client_id, code, client, request, *args, **kwargs):
""" Check that the code is valid, and assigned to the given client """
code = http.request.env['oauth.provider.authorization.code'].search([
('client_id.identifier', '=', client_id),
('code', '=', code),
])
request.odoo_user = code.user_id
return bool(code)
def validate_grant_type(
self, client_id, grant_type, client, request, *args, **kwargs):
""" Ensure the client is authorized to use the requested grant_type """
return client.identifier == client_id and grant_type in (
client.grant_type, 'refresh_token'
)
def validate_redirect_uri(
self, client_id, redirect_uri, request, *args, **kwargs):
""" Ensure the client is allowed to use the requested redurect_uri """
return request.client.identifier == client_id and \
redirect_uri in request.client.mapped('redirect_uri_ids.name')
def validate_refresh_token(
self, refresh_token, client, request, *args, **kwargs):
""" Ensure the refresh token is valid and associated to the client """
token = http.request.env['oauth.provider.token'].search([
('client_id', '=', client.id),
('refresh_token', '=', refresh_token),
])
return bool(token)
def validate_response_type(
self, client_id, response_type, client, request, *args, **kwargs):
""" Ensure the client is allowed to use the requested response_type """
return request.client.identifier == client_id and \
response_type == request.client.response_type
def validate_scopes(
self, client_id, scopes, client, request, *args, **kwargs):
""" Ensure the client is allowed to access all requested scopes """
return request.client.identifier == client_id and set(scopes).issubset(
set(request.client.mapped('scope_ids.code')))
def validate_user(
self, username, password, client, request, *args, **kwargs):
""" Ensure the usernamd and password are valid """
uid = http.request.session.authenticate(
http.request.session.db, username, password)
request.odoo_user = http.request.env['res.users'].browse(uid)
return bool(uid)

9
oauth_provider/security/ir.model.access.csv

@ -0,0 +1,9 @@
"id","name","model_id:id","group_id:id","perm_read","perm_write","perm_create","perm_unlink"
"oauth_provider_client_all_users","OAuth Client All Users","model_oauth_provider_client",,1,0,0,0
"oauth_provider_client_manager","OAuth Client Manager","model_oauth_provider_client",group_oauth_provider_manager,1,1,1,1
"oauth_provider_scope_all_users","OAuth Scope All Users","model_oauth_provider_scope",,1,0,0,0
"oauth_provider_scope_manager","OAuth Scope Manager","model_oauth_provider_scope",group_oauth_provider_manager,1,1,1,1
"oauth_provider_redirect_uri_all_users","OAuth Redirect URI All Users","model_oauth_provider_redirect_uri",,1,0,0,0
"oauth_provider_redirect_uri_manager","OAuth Redirect URI Manager","model_oauth_provider_redirect_uri",group_oauth_provider_manager,1,1,1,1
"oauth_provider_authorization_code_all_users","OAuth Authorization Code All Users","model_oauth_provider_authorization_code",,1,0,0,0
"oauth_provider_token_all_users","OAuth Token All Users","model_oauth_provider_token",,1,0,0,0

32
oauth_provider/security/oauth_provider_security.xml

@ -0,0 +1,32 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright 2016 SYLEAM
License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
-->
<odoo>
<record id="group_oauth_provider_manager" model="res.groups">
<field name="name">OAuth Provider Manager</field>
</record>
<record id="ir_rule_authorization_code_restricted_to_current_user" model="ir.rule">
<field name="name">Authorization Code access restricted to current user</field>
<field name="domain_force">[('user_id', '=', uid)]</field>
<field name="groups" eval="[]"/>
<field name="model_id" ref="model_oauth_provider_authorization_code"/>
<field name="perm_create" eval="1"/>
<field name="perm_read" eval="0"/>
<field name="perm_unlink" eval="0"/>
<field name="perm_write" eval="0"/>
</record>
<record id="ir_rule_token_restricted_to_current_user" model="ir.rule">
<field name="name">Token access restricted to current user</field>
<field name="domain_force">[('user_id', '=', uid)]</field>
<field name="groups" eval="[]"/>
<field name="model_id" ref="model_oauth_provider_token"/>
<field name="perm_create" eval="1"/>
<field name="perm_read" eval="0"/>
<field name="perm_unlink" eval="0"/>
<field name="perm_write" eval="0"/>
</record>
</odoo>

39
oauth_provider/templates/authorization.xml

@ -0,0 +1,39 @@
<?xml version="1.0" encoding="utf-8"?>
<!--
Copyright 2016 SYLEAM
License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
-->
<odoo>
<template id="oauth_provider.authorization" name="OAuth Authorization Error">
<t t-set="disable_footer" t-value="True"/>
<t t-call="web.login_layout">
<form class="oe_login_form" role="form" method="post" action="/oauth2/authorize">
<input type="hidden" name="csrf_token" t-att-value="request.csrf_token()"/>
<div>
<h3 t-esc="oauth_client"/>
This application would like to access these resources :
</div>
<div class="list-group">
<span t-foreach="oauth_scopes" t-as="scope" class="list-group-item">
<h4 t-field="scope.name" class="list-group-item-heading"/>
<p t-field="scope.description" class="list-group-item-text"/>
</span>
</div>
<div class="clearfix oe_login_buttons text-center">
<button type="submit" class="btn btn-primary">Authorize</button>
</div>
</form>
</t>
</template>
<template id="oauth_provider.authorization_error" name="OAuth Authorization Error">
<t t-set="disable_footer" t-value="True"/>
<t t-call="web.login_layout">
<div class="panel panel-danger">
<div class="panel-heading">
<h3 t-esc="title"/>
</div>
<div class="panel-body" t-esc="message"/>
</div>
</t>
</template>
</odoo>

10
oauth_provider/tests/__init__.py

@ -0,0 +1,10 @@
# -*- coding: utf-8 -*-
# Copyright 2016 SYLEAM
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
from . import test_oauth_provider_client
from . import test_oauth_provider_token
from . import test_oauth_provider_scope
from . import test_oauth_provider_controller_web_application
from . import test_oauth_provider_controller_mobile_application
from . import test_oauth_provider_controller_legacy_application

121
oauth_provider/tests/common_test_controller.py

@ -0,0 +1,121 @@
# -*- coding: utf-8 -*-
# Copyright 2016 SYLEAM
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
import mock
import logging
from datetime import datetime, timedelta
from werkzeug.test import Client
from werkzeug.wrappers import BaseResponse
from openerp import fields
from openerp.service import wsgi_server
from openerp.tests.common import TransactionCase
from openerp.tools.misc import consteq
_logger = logging.getLogger(__name__)
class OAuthProviderControllerTransactionCase(TransactionCase):
def setUp(self, application_type):
super(OAuthProviderControllerTransactionCase, self).setUp()
# Initialize controller test stuff
self.werkzeug_environ = {
'REMOTE_ADDR': '127.0.0.1',
}
self.user = self.env.ref('base.user_demo')
self.logged_user = False
self.initialize_test_client()
# Initialize common stuff
self.redirect_uri_base = 'http://example.com'
self.filter = self.env['ir.filters'].create({
'name': 'Current user',
'model_id': 'res.users',
'domain': "[('id', '=', uid)]",
})
self.client = self.env['oauth.provider.client'].create({
'name': 'Client',
'identifier': 'client',
'application_type': application_type,
'redirect_uri_ids': [(0, 0, {'name': self.redirect_uri_base})],
'scope_ids': [(0, 0, {
'name': 'Email',
'code': 'email',
'description': 'Access to your email address.',
'model_id': self.env.ref('base.model_res_users').id,
'filter_id': self.filter.id,
'field_ids': [
(6, 0, [self.env.ref('base.field_res_users_email').id]),
],
}), (0, 0, {
'name': 'Profile',
'code': 'profile',
'description': 'Access to your profile details (name, etc.)',
'model_id': self.env.ref('base.model_res_users').id,
'filter_id': self.filter.id,
'field_ids': [
(6, 0, [
self.env.ref('base.field_res_users_name').id,
self.env.ref('base.field_res_users_city').id,
]),
],
})],
})
def initialize_test_client(self):
# Instantiate a test client
self.test_client = Client(wsgi_server.application, BaseResponse)
# Select the database
self.get_request('/web', data={'db': self.env.cr.dbname})
def login(self, username, password):
# Login as demo user
self.post_request('/web/login', data={
'login': username,
'password': password,
})
self.logged_user = self.env['res.users'].search([
('login', '=', username)])
def logout(self):
# Login as demo user
self.get_request('/web/session/logout')
self.logged_user = False
@mock.patch('openerp.http.WebRequest.env', new_callable=mock.PropertyMock)
def get_request(self, uri, request_env, data=None, headers=None):
""" Execute a GET request on the test client """
# Mock the http request's environ to allow it to see test records
user = self.logged_user or self.env.ref('base.public_user')
request_env.return_value = self.env(user=user)
return self.test_client.get(
uri, query_string=data, environ_base=self.werkzeug_environ,
headers=headers)
@mock.patch('openerp.http.WebRequest.env', new_callable=mock.PropertyMock)
@mock.patch('openerp.http.WebRequest.validate_csrf')
def post_request(
self, uri, validate_csrf, request_env, data=None, headers=None):
""" Execute a POST request on the test client """
# Mock the http request's environ to allow it to see test records
user = self.logged_user or self.env.ref('base.public_user')
request_env.return_value = self.env(user=user)
# Disable CSRF tokens check during tests
validate_csrf.return_value = consteq('', '')
return self.test_client.post(
uri, data=data, environ_base=self.werkzeug_environ,
headers=headers)
def new_token(self):
return self.env['oauth.provider.token'].create({
'token': 'token',
'token_type': 'Bearer',
'refresh_token': 'refresh token',
'client_id': self.client.id,
'user_id': self.user.id,
'expires_at': fields.Datetime.to_string(
datetime.now() + timedelta(seconds=3600)),
})

419
oauth_provider/tests/common_test_oauth_provider_controller.py

@ -0,0 +1,419 @@
# -*- coding: utf-8 -*-
# Copyright 2016 SYLEAM
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
import hashlib
import json
import mock
import logging
from datetime import datetime
from openerp import fields
_logger = logging.getLogger(__name__)
class TestOAuthProviderAurhorizeController(object):
def test_authorize_error_missing_arguments(self):
""" Call /oauth2/authorize without any argument
Must return an unknown client identifier error
"""
self.login('demo', 'demo')
response = self.get_request('/oauth2/authorize')
self.assertEqual(response.status_code, 200)
self.assertTrue('Unknown Client Identifier!' in response.data)
self.assertTrue('This client identifier is invalid.' in response.data)
def test_authorize_error_invalid_request(self):
""" Call /oauth2/authorize with only the client_id argument
Must return an invalid_request error
"""
self.login('demo', 'demo')
response = self.get_request('/oauth2/authorize', data={
'client_id': self.client.identifier,
})
self.assertEqual(response.status_code, 200)
self.assertTrue('Error: invalid_request' in response.data)
self.assertTrue('An unknown error occured! Please contact your '
'administrator' in response.data)
def test_authorize_error_unsupported_response_type(self):
""" Call /oauth2/authorize with an unsupported response type
Must return an unsupported_response_type error
"""
self.login('demo', 'demo')
response = self.get_request('/oauth2/authorize', data={
'client_id': self.client.identifier,
'response_type': 'wrong response type',
})
self.assertEqual(response.status_code, 200)
self.assertTrue('Error: unsupported_response_type' in response.data)
self.assertTrue('An unknown error occured! Please contact your '
'administrator' in response.data)
def test_authorize_error_wrong_scopes(self):
""" Call /oauth2/authorize with wrong scopes
Must return an invalid_scope error
"""
self.login('demo', 'demo')
response = self.get_request('/oauth2/authorize', data={
'client_id': self.client.identifier,
'response_type': self.client.response_type,
'scope': 'wrong scope',
})
self.assertEqual(response.status_code, 200)
self.assertTrue('Error: invalid_scope' in response.data)
self.assertTrue('An unknown error occured! Please contact your '
'administrator' in response.data)
def test_authorize_error_wrong_uri(self):
""" Call the authorize method with a wrong redirect_uri
Must return an invalid_request error
"""
self.login('demo', 'demo')
response = self.get_request('/oauth2/authorize', data={
'client_id': self.client.identifier,
'response_type': self.client.response_type,
'redirect_uri': 'http://wrong.redirect.uri',
})
self.assertEqual(response.status_code, 200)
self.assertTrue('Error: invalid_request' in response.data)
self.assertTrue('Mismatching redirect URI' in response.data)
def test_authorize_error_missing_uri(self):
""" Call /oauth2/authorize without any configured redirect URI
Must return an invalid_request error
"""
self.client.redirect_uri_ids.unlink()
self.login('demo', 'demo')
response = self.get_request('/oauth2/authorize', data={
'client_id': self.client.identifier,
'response_type': self.client.response_type,
'scope': self.client.scope_ids[0].code,
})
self.assertEqual(response.status_code, 200)
self.assertTrue('Error: invalid_request' in response.data)
self.assertTrue('Missing redirect URI.' in response.data)
def test_authorize_post_errors(self):
""" Call /oauth2/authorize in POST without any session
Must return an unknown client identifier error
"""
self.login('demo', 'demo')
response = self.post_request('/oauth2/authorize')
self.assertEqual(response.status_code, 200)
self.assertTrue('Unknown Client Identifier!' in response.data)
self.assertTrue('This client identifier is invalid.' in response.data)
@mock.patch('openerp.http.WebRequest.env', new_callable=mock.PropertyMock)
def test_authorize_unsafe_chars(self, request_env):
""" Call /oauth2/authorize with unsafe chars in the query string """
# Mock the http request's environ to allow it to see test records
request_env.return_value = self.env(user=self.user)
query_string = 'client_id=%s&response_type=%s&state={}' % (
self.client.identifier,
self.client.response_type,
)
self.login('demo', 'demo')
response = self.test_client.get(
'/oauth2/authorize', query_string=query_string,
environ_base=self.werkzeug_environ)
self.assertEqual(response.status_code, 200)
self.assertTrue(self.client.name in response.data)
class TestOAuthProviderRefreshTokenController(object):
def test_refresh_token_error_too_much_scopes(self):
""" Call /oauth2/token using a refresh token, with too much scopes """
token = self.new_token()
response = self.post_request('/oauth2/token', data={
'client_id': self.client.identifier,
'scope': self.client.scope_ids.mapped('code'),
'grant_type': 'refresh_token',
'refresh_token': token.refresh_token,
})
self.assertEqual(response.status_code, 401)
self.assertEqual(json.loads(response.data), {'error': 'invalid_scope'})
def test_refresh_token(self):
""" Get a new token using the refresh token """
token = self.new_token()
token.scope_ids = self.client.scope_ids[0]
response = self.post_request('/oauth2/token', data={
'client_id': self.client.identifier,
'scope': ' '.join(token.scope_ids.mapped('code')),
'grant_type': 'refresh_token',
'refresh_token': token.refresh_token,
})
response_data = json.loads(response.data)
# A new token should have been generated
# We can safely pick the latest generated token here, because no other
# token could have been generated during the test
new_token = self.env['oauth.provider.token'].search([
('client_id', '=', self.client.id)
], order='id DESC', limit=1)
self.assertEqual(response.status_code, 200)
self.assertEqual(new_token.token, response_data['access_token'])
self.assertEqual(new_token.token_type, response_data['token_type'])
self.assertEqual(
new_token.refresh_token, response_data['refresh_token'])
self.assertEqual(new_token.scope_ids, token.scope_ids)
self.assertEqual(new_token.user_id, self.user)
class TestOAuthProviderTokeninfoController(object):
def test_tokeninfo_error_missing_arguments(self):
""" Call /oauth2/tokeninfo without any argument
Must retun an invalid_or_expired_token error
"""
response = self.get_request('/oauth2/tokeninfo')
self.assertEqual(response.status_code, 401)
self.assertEqual(
json.loads(response.data), {'error': 'invalid_or_expired_token'})
def test_tokeninfo(self):
""" Retrieve token information """
token = self.new_token()
token.scope_ids = self.client.scope_ids[0]
response = self.get_request('/oauth2/tokeninfo', data={
'access_token': token.token,
})
token_lifetime = (fields.Datetime.from_string(token.expires_at) -
datetime.now()).seconds
response_data = json.loads(response.data)
self.assertEqual(response.status_code, 200)
self.assertEqual(
response_data['audience'], token.client_id.identifier)
self.assertEqual(
response_data['scopes'], ' '.join(token.scope_ids.mapped('code')))
# Test a range because the test might not be accurate, depending on the
# test system load
self.assertTrue(
token_lifetime - 5 < response_data['expires_in'] <
token_lifetime + 5)
self.assertEqual(
response_data['user_id'],
hashlib.sha256(token.client_id.identifier +
token.user_id.oauth_identifier).hexdigest())
def test_tokeninfo_without_scopes(self):
""" Call /oauth2/tokeninfo without any scope
Retrieve token information without any scopes on the token
The user_id field should not be included
"""
token = self.new_token()
token.scope_ids = self.env['oauth.provider.scope']
response = self.get_request('/oauth2/tokeninfo', data={
'access_token': token.token,
})
token_lifetime = (fields.Datetime.from_string(token.expires_at) -
datetime.now()).seconds
response_data = json.loads(response.data)
self.assertEqual(response.status_code, 200)
self.assertEqual(
response_data['audience'], token.client_id.identifier)
self.assertEqual(
response_data['scopes'], ' '.join(token.scope_ids.mapped('code')))
# Test a range because the test might not be accurate, depending on the
# test system load
self.assertTrue(
token_lifetime - 5 < response_data['expires_in'] <
token_lifetime + 5)
class TestOAuthProviderUserinfoController(object):
def test_userinfo_error_missing_arguments(self):
""" Call /oauth2/userinfo without any argument
Must return an invalid_or_expired_token error
"""
response = self.get_request('/oauth2/userinfo')
self.assertEqual(response.status_code, 401)
self.assertEqual(
json.loads(response.data), {'error': 'invalid_or_expired_token'})
def test_userinfo_single_scope(self):
""" Retrieve user information with only a single scope """
token = self.new_token()
token.scope_ids = self.client.scope_ids[0]
# Retrieve user information
response = self.get_request('/oauth2/userinfo', data={
'access_token': token.token,
})
self.assertEqual(response.status_code, 200)
self.assertEqual(json.loads(response.data), {
'id': self.user.id,
'email': self.user.email,
})
def test_userinfo_multiple_scope(self):
""" Retrieve user information with only a all test scopes """
token = self.new_token()
token.scope_ids = self.client.scope_ids
# Retrieve user information
response = self.get_request('/oauth2/userinfo', data={
'access_token': token.token,
})
# The Email scope allows to read the email
# The Profile scope allows to read the name and city
# The id of the recod is always added (standard Odoo behaviour)
self.assertEqual(response.status_code, 200)
self.assertEqual(json.loads(response.data), {
'id': self.user.id,
'name': self.user.name,
'email': self.user.email,
'city': self.user.city,
})
class TestOAuthProviderOtherinfoController(object):
def test_otherinfo_error_missing_arguments(self):
""" Call /oauth2/otherinfo method without any argument
Must return an invalid_or_expired_token error
"""
response = self.get_request('/oauth2/otherinfo')
self.assertEqual(response.status_code, 401)
self.assertEqual(
json.loads(response.data), {'error': 'invalid_or_expired_token'})
def test_otherinfo_error_missing_model(self):
""" Call /oauth2/otherinfo method without the model argument
Must return an invalid_model error
"""
token = self.new_token()
response = self.get_request(
'/oauth2/otherinfo', data={'access_token': token.token})
self.assertEqual(response.status_code, 400)
self.assertEqual(json.loads(response.data), {'error': 'invalid_model'})
def test_otherinfo_error_invalid_model(self):
""" Call /oauth2/otherinfo method withan invalid model
Must return an invalid_model error
"""
token = self.new_token()
response = self.get_request(
'/oauth2/otherinfo',
data={'access_token': token.token, 'model': 'invalid.model'})
self.assertEqual(response.status_code, 400)
self.assertEqual(json.loads(response.data), {'error': 'invalid_model'})
def test_otherinfo_user_information(self):
""" Call /oauth2/otherinfo to retrieve information using the token """
token = self.new_token()
token.scope_ids = self.client.scope_ids
# Add a new scope to test informations retrieval
token.scope_ids += self.env['oauth.provider.scope'].create({
'name': 'Groups',
'code': 'groups',
'description': 'List of accessible groups',
'model_id': self.env.ref('base.model_res_groups').id,
'filter_id': False,
'field_ids': [
(6, 0, [self.env.ref('base.field_res_groups_name').id]),
],
})
# Retrieve user information
response = self.get_request('/oauth2/otherinfo', data={
'access_token': token.token,
'model': 'res.users',
})
# The Email scope allows to read the email
# The Profile scope allows to read the name and city
# The id of the recod is always added (standard Odoo behaviour)
self.assertEqual(response.status_code, 200)
self.assertEqual(json.loads(response.data), {str(self.user.id): {
'id': self.user.id,
'name': self.user.name,
'email': self.user.email,
'city': self.user.city,
}})
def test_otherinfo_group_information(self):
""" Call /oauth2/otherinfo to retrieve information using the token """
token = self.new_token()
token.scope_ids = self.client.scope_ids
# Add a new scope to test informations retrieval
token.scope_ids += self.env['oauth.provider.scope'].create({
'name': 'Groups',
'code': 'groups',
'description': 'List of accessible groups',
'model_id': self.env.ref('base.model_res_groups').id,
'filter_id': False,
'field_ids': [
(6, 0, [self.env.ref('base.field_res_groups_name').id]),
],
})
# Retrieve groups information
all_groups = self.env['res.groups'].search([])
response = self.get_request('/oauth2/otherinfo', data={
'access_token': token.token,
'model': 'res.groups',
})
self.assertEqual(response.status_code, 200)
self.assertEqual(
sorted(json.loads(response.data).keys()),
sorted(map(str, all_groups.ids)))
class TestOAuthProviderRevokeTokenController(object):
def test_revoke_token_error_missing_arguments(self):
""" Call /oauth2/revoke_token method without any argument """
response = self.post_request('/oauth2/revoke_token')
self.assertEqual(response.status_code, 401)
self.assertEqual(
json.loads(response.data), {'error': 'invalid_or_expired_token'})
def test_revoke_token_error_missing_client_id(self):
""" Call /oauth2/revoke_token method without client identifier """
token = self.new_token()
response = self.post_request('/oauth2/revoke_token', data={
'token': token.token,
})
self.assertEqual(response.status_code, 401)
self.assertEqual(
json.loads(response.data), {'error': 'invalid_client'})
def test_revoke_token_error_missing_token(self):
""" Call /oauth2/revoke_token method without token """
response = self.post_request('/oauth2/revoke_token', data={
'client_id': self.client.identifier,
})
self.assertEqual(response.status_code, 401)
self.assertEqual(
json.loads(response.data), {'error': 'invalid_or_expired_token'})
def test_revoke_access_token(self):
""" Revoke an access token """
token = self.new_token()
self.post_request('/oauth2/revoke_token', data={
'client_id': self.client.identifier,
'token': token.token,
})
self.assertFalse(token.exists())
def test_revoke_refresh_token(self):
""" Revoke a refresh token """
token = self.new_token()
self.post_request('/oauth2/revoke_token', data={
'client_id': self.client.identifier,
'token': token.refresh_token,
})
self.assertTrue(token.exists())
self.assertFalse(token.refresh_token)

128
oauth_provider/tests/test_oauth_provider_client.py

@ -0,0 +1,128 @@
# -*- coding: utf-8 -*-
# Copyright 2016 SYLEAM
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
import logging
from openerp.tests.common import TransactionCase
from ..oauth2.validator import OdooValidator
_logger = logging.getLogger(__name__)
try:
from oauthlib import oauth2
except ImportError:
_logger.debug('Cannot `import oauthlib`.')
class TestOAuthProviderClient(TransactionCase):
def setUp(self):
super(TestOAuthProviderClient, self).setUp()
self.client_vals = {
'name': 'Client',
'identifier': 'client',
}
def new_client(self, vals=None):
values = self.client_vals.copy()
if vals is not None:
values.update(vals)
return self.env['oauth.provider.client'].create(values)
def test_grant_response_type_default(self):
""" Check the value of the grant_type and response_type fields """
# Default : Web Application
client = self.new_client({'identifier': 'default'})
self.assertEqual(client.grant_type, 'authorization_code')
self.assertEqual(client.response_type, 'code')
def test_grant_response_type_web_application(self):
""" Check the value of the grant_type and response_type fields """
# Web Application
client = self.new_client(vals={'application_type': 'web application'})
self.assertEqual(client.grant_type, 'authorization_code')
self.assertEqual(client.response_type, 'code')
def test_grant_response_type_mobile_application(self):
""" Check the value of the grant_type and response_type fields """
# Mobile Application
client = self.new_client(
vals={'application_type': 'mobile application'})
self.assertEqual(client.grant_type, 'implicit')
self.assertEqual(client.response_type, 'token')
def test_grant_response_type_legacy_application(self):
""" Check the value of the grant_type and response_type fields """
# Legacy Application
client = self.new_client(
vals={'application_type': 'legacy application'})
self.assertEqual(client.grant_type, 'password')
self.assertEqual(client.response_type, 'none')
def test_grant_response_type_backend_application(self):
""" Check the value of the grant_type and response_type fields """
# Backend Application
client = self.new_client(
vals={'application_type': 'backend application'})
self.assertEqual(client.grant_type, 'client_credentials')
self.assertEqual(client.response_type, 'none')
def test_get_oauth2_server_default(self):
""" Check the returned server, depending on the application type """
# Default : Web Application
client = self.new_client({'identifier': 'default'})
self.assertTrue(
isinstance(client.get_oauth2_server(),
oauth2.WebApplicationServer))
def test_get_oauth2_server_web_application(self):
""" Check the returned server, depending on the application type """
# Web Application
client = self.new_client(vals={'application_type': 'web application'})
self.assertTrue(
isinstance(client.get_oauth2_server(),
oauth2.WebApplicationServer))
def test_get_oauth2_server_mobile_application(self):
""" Check the returned server, depending on the application type """
# Mobile Application
client = self.new_client(
vals={'application_type': 'mobile application'})
self.assertTrue(
isinstance(client.get_oauth2_server(),
oauth2.MobileApplicationServer))
def test_get_oauth2_server_legacy_applicaton(self):
""" Check the returned server, depending on the application type """
# Legacy Application
client = self.new_client(
vals={'application_type': 'legacy application'})
self.assertTrue(
isinstance(client.get_oauth2_server(),
oauth2.LegacyApplicationServer))
def test_get_oauth2_server_backend_application(self):
""" Check the returned server, depending on the application type """
# Backend Application
client = self.new_client(
vals={'application_type': 'backend application'})
self.assertTrue(
isinstance(client.get_oauth2_server(),
oauth2.BackendApplicationServer))
def test_get_oauth2_server_validator(self):
""" Check the validator of the returned server """
client = self.new_client()
# No defined validator: Check that an OdooValidator instance is created
self.assertTrue(
isinstance(client.get_oauth2_server().request_validator,
OdooValidator))
def test_get_oauth2_server_validator_custom(self):
""" Check the validator of the returned server """
client = self.new_client()
# Passed validator : Check that the validator instance is used
validator = OdooValidator()
self.assertEqual(
client.get_oauth2_server(validator).request_validator, validator)

374
oauth_provider/tests/test_oauth_provider_controller_legacy_application.py

@ -0,0 +1,374 @@
# -*- coding: utf-8 -*-
# Copyright 2016 SYLEAM
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
import base64
import json
import logging
from .common_test_controller import OAuthProviderControllerTransactionCase
from .common_test_oauth_provider_controller import \
TestOAuthProviderRefreshTokenController, \
TestOAuthProviderTokeninfoController, \
TestOAuthProviderUserinfoController, \
TestOAuthProviderOtherinfoController, \
TestOAuthProviderRevokeTokenController
_logger = logging.getLogger(__name__)
class TestOAuthProviderController(
OAuthProviderControllerTransactionCase,
TestOAuthProviderRefreshTokenController,
TestOAuthProviderTokeninfoController,
TestOAuthProviderUserinfoController,
TestOAuthProviderOtherinfoController,
TestOAuthProviderRevokeTokenController):
def setUp(self):
super(TestOAuthProviderController, self).setUp('legacy application')
def test_token_error_missing_arguments(self):
""" Check /oauth2/token without any argument
Must return an unsupported_grant_type error
"""
response = self.post_request('/oauth2/token')
self.assertEqual(response.status_code, 401)
self.assertEqual(
json.loads(response.data), {'error': 'invalid_client_id'})
def test_token_error_wrong_grant_type(self):
""" Check /oauth2/token with an invalid grant type
Must return an unsupported_grant_type error
"""
response = self.post_request('/oauth2/token', data={
'client_id': self.client.identifier,
'grant_type': 'Wrong grant type',
'username': 'Wrong username',
'password': 'Wrong password',
})
self.assertEqual(response.status_code, 400)
self.assertEqual(
json.loads(response.data), {'error': 'unsupported_grant_type'})
def test_token_error_missing_username(self):
""" Check /oauth2/token without username
Must return an invalid_request error
"""
response = self.post_request('/oauth2/token', data={
'client_id': self.client.identifier,
'grant_type': self.client.grant_type,
})
self.assertEqual(response.status_code, 400)
self.assertEqual(json.loads(response.data), {
'error': 'invalid_request',
'error_description': 'Request is missing username parameter.',
})
def test_token_error_missing_password(self):
""" Check /oauth2/token without password
Must return an invalid_request error
"""
response = self.post_request('/oauth2/token', data={
'client_id': self.client.identifier,
'grant_type': self.client.grant_type,
'username': 'Wrong username',
})
self.assertEqual(response.status_code, 400)
self.assertEqual(json.loads(response.data), {
'error': 'invalid_request',
'error_description': 'Request is missing password parameter.',
})
def test_token_error_missing_client_id(self):
""" Check /oauth2/token without client
Must return an unauthorized_client error
"""
response = self.post_request('/oauth2/token', data={
'grant_type': self.client.grant_type,
})
self.assertEqual(response.status_code, 401)
self.assertEqual(
json.loads(response.data), {'error': 'invalid_client_id'})
def test_token_error_wrong_client_identifier(self):
""" Check /oauth2/token with a wrong client identifier
Must return an invalid_client_id error
"""
response = self.post_request('/oauth2/token', data={
'grant_type': self.client.grant_type,
'client_id': 'Wrong client identifier',
})
self.assertEqual(response.status_code, 401)
self.assertEqual(
json.loads(response.data), {'error': 'invalid_client_id'})
def test_token_error_wrong_username(self):
""" Check /oauth2/token with a wrong username
Must return an invalid_grant error
"""
response = self.post_request('/oauth2/token', data={
'grant_type': self.client.grant_type,
'client_id': self.client.identifier,
'username': 'Wrong username',
'password': 'demo',
})
self.assertEqual(response.status_code, 401)
self.assertEqual(json.loads(response.data), {
'error': 'invalid_grant',
'error_description': 'Invalid credentials given.',
})
def test_token_error_wrong_password(self):
""" Check /oauth2/token with a wrong password
Must return an invalid_grant error
"""
response = self.post_request('/oauth2/token', data={
'grant_type': self.client.grant_type,
'client_id': self.client.identifier,
'username': self.user.login,
'password': 'Wrong password',
})
self.assertEqual(response.status_code, 401)
self.assertEqual(json.loads(response.data), {
'error': 'invalid_grant',
'error_description': 'Invalid credentials given.',
})
def test_token_error_wrong_client_id(self):
""" Check /oauth2/token with a wrong client id
Must return an invalid_client_id error
"""
response = self.post_request('/oauth2/token', data={
'grant_type': self.client.grant_type,
'client_id': 'Wrong client id',
'scope': self.client.scope_ids[0].code,
})
self.assertEqual(response.status_code, 401)
self.assertEqual(
json.loads(response.data), {'error': 'invalid_client_id'})
def test_token_error_missing_refresh_token(self):
""" Check /oauth2/token in refresh token mode without refresh token
Must return an invalid_request error
"""
response = self.post_request('/oauth2/token', data={
'grant_type': 'refresh_token',
'client_id': self.client.identifier,
'scope': self.client.scope_ids[0].code,
})
self.assertEqual(response.status_code, 400)
self.assertEqual(json.loads(response.data), {
'error': 'invalid_request',
'error_description': 'Missing refresh token parameter.',
})
def test_token_error_invalid_refresh_token(self):
""" Check /oauth2/token in refresh token mode with an invalid refresh token
Must return an invalid_grant error
"""
response = self.post_request('/oauth2/token', data={
'grant_type': 'refresh_token',
'client_id': self.client.identifier,
'scope': self.client.scope_ids[0].code,
'refresh_token': 'Wrong refresh token',
})
self.assertEqual(response.status_code, 401)
self.assertEqual(json.loads(response.data), {'error': 'invalid_grant'})
def test_token_with_missing_secret(self):
""" Check client authentication without the secret provided
Must return an invalid_client error
"""
# Define a secret for the client
self.client.secret = 'OAuth Client secret'
# Ask a token to the server
response = self.post_request('/oauth2/token', data={
'client_id': self.client.identifier,
'scope': self.client.scope_ids[0].code,
'grant_type': self.client.grant_type,
'username': self.user.login,
'password': 'demo',
})
self.assertEqual(response.status_code, 401)
self.assertEqual(
json.loads(response.data), {'error': 'invalid_client'})
def test_token_with_unexpected_secret(self):
""" Check client authentication with an unexpected secret provided
Must return an invalid_client error
"""
# Don't define a secret for the client
auth_string = base64.b64encode(
'{client.identifier}:secret'.format(client=self.client))
# Ask a token to the server
response = self.post_request('/oauth2/token', data={
'client_id': self.client.identifier,
'scope': self.client.scope_ids[0].code,
'grant_type': self.client.grant_type,
'username': self.user.login,
'password': 'demo',
}, headers=[(
'Authorization',
'Basic {auth_string}'.format(auth_string=auth_string)),
])
self.assertEqual(response.status_code, 401)
self.assertEqual(
json.loads(response.data), {'error': 'invalid_client'})
def test_token_with_wrong_secret(self):
""" Check client authentication with a wrong secret
Must return an invalid_client error
"""
# Define a secret for the client
self.client.secret = 'OAuth Client secret'
auth_string = base64.b64encode(
'{client.identifier}:secret'.format(client=self.client))
# Ask a token to the server
response = self.post_request('/oauth2/token', data={
'client_id': self.client.identifier,
'scope': self.client.scope_ids[0].code,
'grant_type': self.client.grant_type,
'username': self.user.login,
'password': 'demo',
}, headers=[(
'Authorization',
'Basic {auth_string}'.format(auth_string=auth_string)),
])
self.assertEqual(response.status_code, 401)
self.assertEqual(
json.loads(response.data), {'error': 'invalid_client'})
def test_token_with_secret(self):
""" Check client authentication from Authorization header """
# Define a secret for the client
self.client.secret = 'OAuth Client secret'
auth_string = base64.b64encode(
'{client.identifier}:{client.secret}'.format(client=self.client))
# Ask a token to the server
response = self.post_request('/oauth2/token', data={
'client_id': self.client.identifier,
'scope': self.client.scope_ids[0].code,
'grant_type': self.client.grant_type,
'username': self.user.login,
'password': 'demo',
}, headers=[(
'Authorization',
'Basic {auth_string}'.format(auth_string=auth_string)),
])
response_data = json.loads(response.data)
# A new token should have been generated
# We can safely pick the latest generated token here, because no other
# token could have been generated during the test
token = self.env['oauth.provider.token'].search([
('client_id', '=', self.client.id),
], order='id DESC', limit=1)
self.assertEqual(response.status_code, 200)
self.assertEqual(token.token, response_data['access_token'])
self.assertEqual(token.token_type, response_data['token_type'])
self.assertEqual(token.refresh_token, response_data['refresh_token'])
self.assertEqual(token.scope_ids, self.client.scope_ids[0])
self.assertEqual(token.user_id, self.user)
def test_token_with_wrong_non_basic_authorization(self):
""" Check client authentication with a non-Basic Authorization header
Must generate a token : Non basic authorization headers are ignored
"""
# Don't define a secret for the client
auth_string = base64.b64encode(
'{client.identifier}:secret'.format(client=self.client))
# Ask a token to the server
response = self.post_request('/oauth2/token', data={
'client_id': self.client.identifier,
'scope': self.client.scope_ids[0].code,
'grant_type': self.client.grant_type,
'username': self.user.login,
'password': 'demo',
}, headers=[(
'Authorization',
'Digest {auth_string}'.format(auth_string=auth_string)),
])
response_data = json.loads(response.data)
# A new token should have been generated
# We can safely pick the latest generated token here, because no other
# token could have been generated during the test
token = self.env['oauth.provider.token'].search([
('client_id', '=', self.client.id),
], order='id DESC', limit=1)
self.assertEqual(response.status_code, 200)
self.assertEqual(token.token, response_data['access_token'])
self.assertEqual(token.token_type, response_data['token_type'])
self.assertEqual(token.refresh_token, response_data['refresh_token'])
self.assertEqual(token.scope_ids, self.client.scope_ids[0])
self.assertEqual(token.user_id, self.user)
def test_token_with_right_non_basic_authorization(self):
""" Check client authentication with a non-Basic Authorization header
Must return an invalid_client error : Non basic authorization headers
are ignored
"""
# Define a secret for the client
self.client.secret = 'OAuth Client secret'
auth_string = base64.b64encode(
'{client.identifier}:{client.secret}'.format(client=self.client))
# Ask a token to the server
response = self.post_request('/oauth2/token', data={
'client_id': self.client.identifier,
'scope': self.client.scope_ids[0].code,
'grant_type': self.client.grant_type,
'username': self.user.login,
'password': 'demo',
}, headers=[(
'Authorization',
'Digest {auth_string}'.format(auth_string=auth_string)),
])
self.assertEqual(response.status_code, 401)
self.assertEqual(
json.loads(response.data), {'error': 'invalid_client'})
def test_successful_token_retrieval(self):
""" Check the full process for a LegacyApplication
GET, then POST, token and informations retrieval
"""
# Ask a token to the server
response = self.post_request('/oauth2/token', data={
'client_id': self.client.identifier,
'scope': self.client.scope_ids[0].code,
'grant_type': self.client.grant_type,
'username': self.user.login,
'password': 'demo',
})
response_data = json.loads(response.data)
# A new token should have been generated
# We can safely pick the latest generated token here, because no other
# token could have been generated during the test
token = self.env['oauth.provider.token'].search([
('client_id', '=', self.client.id),
], order='id DESC', limit=1)
self.assertEqual(response.status_code, 200)
self.assertEqual(token.token, response_data['access_token'])
self.assertEqual(token.token_type, response_data['token_type'])
self.assertEqual(token.refresh_token, response_data['refresh_token'])
self.assertEqual(token.scope_ids, self.client.scope_ids[0])
self.assertEqual(token.user_id, self.user)

113
oauth_provider/tests/test_oauth_provider_controller_mobile_application.py

@ -0,0 +1,113 @@
# -*- coding: utf-8 -*-
# Copyright 2016 SYLEAM
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
import logging
from .common_test_controller import OAuthProviderControllerTransactionCase
from .common_test_oauth_provider_controller import \
TestOAuthProviderAurhorizeController, \
TestOAuthProviderTokeninfoController, \
TestOAuthProviderUserinfoController, \
TestOAuthProviderOtherinfoController, \
TestOAuthProviderRevokeTokenController
_logger = logging.getLogger(__name__)
try:
import oauthlib
except ImportError:
_logger.debug('Cannot `import oauthlib`.')
class TestOAuthProviderController(
OAuthProviderControllerTransactionCase,
TestOAuthProviderAurhorizeController,
TestOAuthProviderTokeninfoController,
TestOAuthProviderUserinfoController,
TestOAuthProviderOtherinfoController,
TestOAuthProviderRevokeTokenController):
def setUp(self):
super(TestOAuthProviderController, self).setUp('mobile application')
def test_authorize_skip_authorization(self):
""" Call /oauth2/authorize while skipping the authorization page """
# Configure the client to skip the authorization page
self.client.skip_authorization = True
# Login as demo user
self.login(self.user.login, self.user.login)
# Call the authorize method with good values
state = 'Some custom state'
self.login('demo', 'demo')
response = self.get_request('/oauth2/authorize', data={
'client_id': self.client.identifier,
'response_type': self.client.response_type,
'redirect_uri': self.redirect_uri_base,
'scope': self.client.scope_ids[0].code,
'state': state,
})
# A new token should have been generated
# We can safely pick the latest generated token here, because no other
# token could have been generated during the test
token = self.env['oauth.provider.token'].search([
('client_id', '=', self.client.id),
], order='id DESC', limit=1)
# The response should be a redirect to the redirect URI, with the
# authorization_code added as GET parameter
self.assertEqual(response.status_code, 302)
query_string = oauthlib.common.urlencode({
'state': state,
'access_token': token.token,
'token_type': token.token_type,
'expires_in': 3600,
'scope': token.scope_ids.code,
}.items())
self.assertEqual(
response.headers['Location'], '{uri_base}#{query_string}'.format(
uri_base=self.redirect_uri_base, query_string=query_string))
self.assertEqual(token.user_id, self.user)
def test_successful_token_retrieval(self):
""" Check the full process for a MobileApplication
GET, then POST, token and informations retrieval
"""
# Call the authorize method with good values to fill the session scopes
# and credentials variables
state = 'Some custom state'
self.login('demo', 'demo')
response = self.get_request('/oauth2/authorize', data={
'client_id': self.client.identifier,
'response_type': self.client.response_type,
'redirect_uri': self.redirect_uri_base,
'scope': self.client.scope_ids[0].code,
'state': state,
})
self.assertEqual(response.status_code, 200)
self.assertTrue(self.client.name in response.data)
self.assertTrue(self.client.scope_ids[0].name in response.data)
self.assertTrue(self.client.scope_ids[0].description in response.data)
# Then, call the POST route to validate the authorization
response = self.post_request('/oauth2/authorize')
# A new token should have been generated
# We can safely pick the latest generated token here, because no other
# token could have been generated during the test
token = self.env['oauth.provider.token'].search([
('client_id', '=', self.client.id),
], order='id DESC', limit=1)
# The response should be a redirect to the redirect URI, with the
# token added as GET parameter
self.assertEqual(response.status_code, 302)
query_string = oauthlib.common.urlencode({
'state': state,
'access_token': token.token,
'token_type': token.token_type,
'expires_in': 3600,
'scope': token.scope_ids.code,
}.items())
self.assertEqual(
response.headers['Location'], '{uri_base}#{query_string}'.format(
uri_base=self.redirect_uri_base, query_string=query_string))
self.assertEqual(token.user_id, self.user)

358
oauth_provider/tests/test_oauth_provider_controller_web_application.py

@ -0,0 +1,358 @@
# -*- coding: utf-8 -*-
# Copyright 2016 SYLEAM
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
import json
import logging
from .common_test_controller import OAuthProviderControllerTransactionCase
from .common_test_oauth_provider_controller import \
TestOAuthProviderRefreshTokenController, \
TestOAuthProviderAurhorizeController, \
TestOAuthProviderTokeninfoController, \
TestOAuthProviderUserinfoController, \
TestOAuthProviderOtherinfoController, \
TestOAuthProviderRevokeTokenController
_logger = logging.getLogger(__name__)
try:
import oauthlib
except ImportError:
_logger.debug('Cannot `import oauthlib`.')
class TestOAuthProviderController(
OAuthProviderControllerTransactionCase,
TestOAuthProviderRefreshTokenController,
TestOAuthProviderAurhorizeController,
TestOAuthProviderTokeninfoController,
TestOAuthProviderUserinfoController,
TestOAuthProviderOtherinfoController,
TestOAuthProviderRevokeTokenController):
def setUp(self):
super(TestOAuthProviderController, self).setUp('web application')
def new_code(self):
# Configure the client to skip the authorization page
self.client.skip_authorization = True
# Call the authorize method with good values
state = 'Some custom state'
self.login('demo', 'demo')
response = self.get_request('/oauth2/authorize', data={
'client_id': self.client.identifier,
'response_type': self.client.response_type,
'redirect_uri': self.redirect_uri_base,
'scope': self.client.scope_ids[0].code,
'state': state,
})
# A new authorization code should have been generated
# We can safely pick the latest generated code here, because no other
# code could have been generated during the test
code = self.env['oauth.provider.authorization.code'].search([
('client_id', '=', self.client.id),
], order='id DESC', limit=1)
# The response should be a redirect to the redirect URI, with the
# authorization_code added as GET parameter
self.assertEqual(response.status_code, 302)
query_string = oauthlib.common.urlencode(
{'state': state, 'code': code.code}.items())
self.assertEqual(
response.headers['Location'], '{uri_base}?{query_string}'.format(
uri_base=self.redirect_uri_base, query_string=query_string))
self.assertEqual(code.user_id, self.user)
self.logout()
return code
def test_token_error_missing_session(self):
""" Check /oauth2/token without any session set
Must return an invalid_client_id error
"""
response = self.post_request('/oauth2/token')
self.assertEqual(response.status_code, 401)
self.assertEqual(
json.loads(response.data), {'error': 'invalid_client_id'})
def test_token_error_missing_arguments(self):
""" Check /oauth2/token without any argument
Must return an invalid_client_id error
"""
# Generate an authorization code to set the session
self.new_code()
response = self.post_request('/oauth2/token')
self.assertEqual(response.status_code, 401)
self.assertEqual(
json.loads(response.data), {'error': 'invalid_client_id'})
def test_token_error_wrong_grant_type(self):
""" Check /oauth2/token with an invalid grant type
Must return an invalid_client_id error
"""
# Generate an authorization code to set the session
self.new_code()
response = self.post_request('/oauth2/token', data={
'grant_type': 'Wrong grant type',
})
self.assertEqual(response.status_code, 401)
self.assertEqual(
json.loads(response.data), {'error': 'invalid_client_id'})
def test_token_error_missing_code(self):
""" Check /oauth2/token without code
Must return an invalid_client_id error
"""
# Generate an authorization code to set the session
self.new_code()
response = self.post_request('/oauth2/token', data={
'grant_type': self.client.grant_type,
})
self.assertEqual(response.status_code, 401)
self.assertEqual(
json.loads(response.data), {'error': 'invalid_client_id'})
def test_token_error_missing_client_id(self):
""" Check /oauth2/token without client
Must return an invalid_client_id error
"""
# Generate an authorization code to set the session
self.new_code()
response = self.post_request('/oauth2/token', data={
'grant_type': self.client.grant_type,
'code': 'Wrong code',
})
self.assertEqual(response.status_code, 401)
self.assertEqual(
json.loads(response.data), {'error': 'invalid_client_id'})
def test_token_error_wrong_client_identifier(self):
""" Check /oauth2/token with a wrong client identifier
Must return an invalid_client_id error
"""
# Generate an authorization code to set the session
self.new_code()
response = self.post_request('/oauth2/token', data={
'grant_type': self.client.grant_type,
'client_id': 'Wrong client identifier',
'code': 'Wrong code',
})
self.assertEqual(response.status_code, 401)
self.assertEqual(
json.loads(response.data), {'error': 'invalid_client_id'})
def test_token_error_wrong_code(self):
""" Check /oauth2/token with a wrong code
Must return an invalid_grant error
"""
# Generate an authorization code to set the session
self.new_code()
response = self.post_request('/oauth2/token', data={
'grant_type': self.client.grant_type,
'client_id': self.client.identifier,
'code': 'Wrong code',
})
self.assertEqual(response.status_code, 401)
self.assertEqual(json.loads(response.data), {'error': 'invalid_grant'})
def test_token_error_missing_redirect_uri(self):
""" Check /oauth2/token without redirect_uri
Must return an access_denied error
"""
# Generate an authorization code
code = self.new_code()
response = self.post_request('/oauth2/token', data={
'grant_type': self.client.grant_type,
'client_id': self.client.identifier,
'code': code.code,
})
self.assertEqual(response.status_code, 401)
self.assertEqual(json.loads(response.data), {'error': 'access_denied'})
def test_token_error_wrong_redirect_uri(self):
""" Check /oauth2/token with a wrong redirect_uri
Must return an access_denied error
"""
# Generate an authorization code
code = self.new_code()
response = self.post_request('/oauth2/token', data={
'grant_type': self.client.grant_type,
'client_id': self.client.identifier,
'code': code.code,
'redirect_uri': 'Wrong redirect URI',
})
self.assertEqual(response.status_code, 401)
self.assertEqual(json.loads(response.data), {'error': 'access_denied'})
def test_token_error_wrong_client_id(self):
""" Check /oauth2/token with a wrong client id
Must return an invalid_client_id error
"""
# Generate an authorization code
code = self.new_code()
response = self.post_request('/oauth2/token', data={
'grant_type': self.client.grant_type,
'client_id': 'Wrong client id',
'code': code.code,
'redirect_uri': self.redirect_uri_base,
'scope': self.client.scope_ids[0].code,
})
self.assertEqual(response.status_code, 401)
self.assertEqual(
json.loads(response.data), {'error': 'invalid_client_id'})
def test_token_error_missing_refresh_token(self):
""" Check /oauth2/token in refresh token mode without refresh token
Must return an invalid_request error
"""
# Generate an authorization code to set the session
self.new_code()
response = self.post_request('/oauth2/token', data={
'grant_type': 'refresh_token',
'client_id': self.client.identifier,
'redirect_uri': self.redirect_uri_base,
'scope': self.client.scope_ids[0].code,
})
self.assertEqual(response.status_code, 400)
self.assertEqual(json.loads(response.data), {
'error': 'invalid_request',
'error_description': 'Missing refresh token parameter.',
})
def test_token_error_invalid_refresh_token(self):
""" Check /oauth2/token in refresh token mode with an invalid refresh token
Must return an invalid_grant error
"""
# Generate an authorization code to set the session
self.new_code()
response = self.post_request('/oauth2/token', data={
'grant_type': 'refresh_token',
'client_id': self.client.identifier,
'redirect_uri': self.redirect_uri_base,
'scope': self.client.scope_ids[0].code,
'refresh_token': 'Wrong refresh token',
})
self.assertEqual(response.status_code, 401)
self.assertEqual(json.loads(response.data), {'error': 'invalid_grant'})
def test_authorize_skip_authorization(self):
""" Call /oauth2/authorize while skipping the authorization page """
# Configure the client to skip the authorization page
self.client.skip_authorization = True
# Call the authorize method with good values
state = 'Some custom state'
self.login('demo', 'demo')
response = self.get_request('/oauth2/authorize', data={
'client_id': self.client.identifier,
'response_type': self.client.response_type,
'redirect_uri': self.redirect_uri_base,
'scope': self.client.scope_ids[0].code,
'state': state,
})
# A new authorization code should have been generated
# We can safely pick the latest generated code here, because no other
# code could have been generated during the test
code = self.env['oauth.provider.authorization.code'].search([
('client_id', '=', self.client.id),
], order='id DESC', limit=1)
# The response should be a redirect to the redirect URI, with the
# authorization_code added as GET parameter
self.assertEqual(response.status_code, 302)
query_string = oauthlib.common.urlencode({
'state': state,
'code': code.code,
}.items())
self.assertEqual(
response.headers['Location'], '{uri_base}?{query_string}'.format(
uri_base=self.redirect_uri_base, query_string=query_string))
self.assertEqual(code.user_id, self.user)
def test_successful_token_retrieval(self):
""" Check the full process for a WebApplication
GET, then POST, token and informations retrieval
"""
# Call the authorize method with good values to fill the session scopes
# and credentials variables
state = 'Some custom state'
self.login('demo', 'demo')
response = self.get_request('/oauth2/authorize', data={
'client_id': self.client.identifier,
'response_type': self.client.response_type,
'redirect_uri': self.redirect_uri_base,
'scope': self.client.scope_ids[0].code,
'state': state,
})
self.assertEqual(response.status_code, 200)
self.assertTrue(self.client.name in response.data)
self.assertTrue(self.client.scope_ids[0].name in response.data)
self.assertTrue(self.client.scope_ids[0].description in response.data)
# Then, call the POST route to validate the authorization
response = self.post_request('/oauth2/authorize')
# A new authorization code should have been generated
# We can safely pick the latest generated code here, because no other
# code could have been generated during the test
code = self.env['oauth.provider.authorization.code'].search([
('client_id', '=', self.client.id),
], order='id DESC', limit=1)
# The response should be a redirect to the redirect URI, with the
# authorization_code added as GET parameter
self.assertEqual(response.status_code, 302)
query_string = oauthlib.common.urlencode({
'state': state,
'code': code.code,
}.items())
self.assertEqual(
response.headers['Location'], '{uri_base}?{query_string}'.format(
uri_base=self.redirect_uri_base, query_string=query_string))
self.assertEqual(code.user_id, self.user)
self.logout()
# Now that the user vaidated the authorization, we can ask for a token,
# using the returned code
response = self.post_request('/oauth2/token', data={
'client_id': self.client.identifier,
'redirect_uri': self.redirect_uri_base,
'scope': self.client.scope_ids[0].code,
'code': code.code,
'grant_type': self.client.grant_type,
})
response_data = json.loads(response.data)
# A new token should have been generated
# We can safely pick the latest generated token here, because no other
# token could have been generated during the test
token = self.env['oauth.provider.token'].search([
('client_id', '=', self.client.id),
], order='id DESC', limit=1)
self.assertEqual(response.status_code, 200)
self.assertEqual(token.token, response_data['access_token'])
self.assertEqual(token.token_type, response_data['token_type'])
self.assertEqual(token.refresh_token, response_data['refresh_token'])
self.assertEqual(token.scope_ids, code.scope_ids)
self.assertEqual(token.user_id, self.user)

272
oauth_provider/tests/test_oauth_provider_scope.py

@ -0,0 +1,272 @@
# -*- coding: utf-8 -*-
# Copyright 2016 SYLEAM
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
from openerp.tests.common import TransactionCase
class TestOAuthProviderScope(TransactionCase):
def setUp(self):
super(TestOAuthProviderScope, self).setUp()
self.filter = self.env['ir.filters'].create({
'name': 'Current user',
'model_id': 'res.users',
'domain': "[('id', '=', uid)]",
})
self.scope_vals = {
'name': 'Scope',
'code': 'scope',
'description': 'Description of the scope',
'model_id': self.env.ref('base.model_res_users').id,
'filter_id': self.filter.id,
'field_ids': [
(6, 0, [self.env.ref('base.field_res_users_email').id]),
],
}
def new_scope(self, vals=None):
values = self.scope_vals
if vals is not None:
values.update(vals)
return self.env['oauth.provider.scope'].create(values)
def test_get_data_from_model_without_filter(self):
""" Check the values returned by the get_data_for_model method when no
filter is defined
"""
scope = self.new_scope({'filter_id': False})
# Check a simple call with the right model
data = scope.get_data_for_model('res.users')
# Check that we have multiple users (otherwise this test is useless)
self.assertTrue(len(self.env['res.users'].search([]).ids) > 1)
self.assertEqual(
set(data.keys()), set(self.env['res.users'].search([]).ids))
def test_get_data_from_model_without_filter_wrong_model(self):
""" Check the values returned by the get_data_for_model method when no
filter is defined
"""
scope = self.new_scope({'filter_id': False})
# Check a simple call with a wrong model
data = scope.get_data_for_model('res.partner')
self.assertEqual(data, {})
def test_get_data_from_model_with_filter(self):
""" Check the values returned by the get_data_for_model method when no
res_id is supplied
"""
scope = self.new_scope()
# Check a simple call with the right model
data = scope.get_data_for_model('res.users')
self.assertEqual(data, {
self.env.user.id: {
'id': self.env.user.id,
'email': self.env.user.email,
},
})
def test_get_data_from_model_with_filter_wrong_model(self):
""" Check the values returned by the get_data_for_model method when no
res_id is supplied
"""
scope = self.new_scope()
# Check a simple call with a wrong model
data = scope.get_data_for_model('res.partner')
self.assertEqual(data, {})
def test_get_data_from_model_with_res_id_and_no_filter(self):
""" Check the values returned by the get_data_for_model method when a
res_id is supplied
"""
scope = self.new_scope({'filter_id': False})
# Check a simple call with the right model
data = scope.get_data_for_model('res.users', res_id=self.env.user.id)
self.assertEqual(data, {
'id': self.env.user.id,
'email': self.env.user.email,
})
def test_get_data_from_model_with_res_id_and_no_filter_wrong_model(self):
""" Check the values returned by the get_data_for_model method when a
res_id is supplied
"""
scope = self.new_scope({'filter_id': False})
# Check a simple call with a wrong model
data = scope.get_data_for_model(
'res.partner', res_id=self.env.user.id + 1)
self.assertEqual(data, {})
def test_get_data_from_model_with_res_id(self):
""" Check the values returned by the get_data_for_model method when a
res_id is supplied
"""
scope = self.new_scope()
# Check a simple call with the right model
data = scope.get_data_for_model('res.users', res_id=self.env.user.id)
self.assertEqual(data, {
'id': self.env.user.id,
'email': self.env.user.email,
})
def test_get_data_from_model_with_res_id_wrong_model(self):
""" Check the values returned by the get_data_for_model method when a
res_id is supplied
"""
scope = self.new_scope()
# Check a simple call with a wrong model
data = scope.get_data_for_model(
'res.partner', res_id=self.env.user.id + 1)
self.assertEqual(data, {})
def _generate_multiple_scopes(self):
scopes = self.new_scope()
scopes += self.new_scope({
'code': 'Profile',
'field_ids': [(6, 0, [
self.env.ref('base.field_res_users_name').id,
self.env.ref('base.field_res_users_city').id,
self.env.ref('base.field_res_users_country_id').id,
])],
})
scopes += self.new_scope({
'model_id': self.env.ref('base.model_res_groups').id,
'code': 'All groups',
'filter_id': False,
'field_ids': [
(6, 0, [self.env.ref('base.field_res_groups_name').id]),
],
})
return scopes
def test_get_data_from_model_with_multiple_scopes_empty_fields(self):
""" Check the values returned by the get_data_for_model method when
calling on multiple scopes
"""
scopes = self._generate_multiple_scopes()
# Check a simple call with the right model with empty fields
self.env.user.city = False
self.env.user.country_id = False
data = scopes.get_data_for_model('res.users')
self.assertEqual(data, {self.env.user.id: {
'id': 1,
'email': self.env.user.email,
'name': self.env.user.name,
'city': False,
'country_id': False,
}})
def test_get_data_from_model_with_multiple_scopesfirst_model(self):
""" Check the values returned by the get_data_for_model method when
calling on multiple scopes
"""
scopes = self._generate_multiple_scopes()
# Check a simple call with the right model without empty fields
country = self.env.ref('base.fr')
self.env.user.city = 'Paris'
self.env.user.country_id = country
data = scopes.get_data_for_model('res.users')
self.assertEqual(data, {self.env.user.id: {
'id': 1,
'email': self.env.user.email,
'name': self.env.user.name,
'city': self.env.user.city,
'country_id': country.name,
}})
def test_get_data_from_model_with_multiple_scopes_second_model(self):
""" Check the values returned by the get_data_for_model method when
calling on multiple scopes
"""
scopes = self._generate_multiple_scopes()
# Check a simple call with another right model
data = scopes.get_data_for_model('res.groups')
self.assertEqual(
set(data.keys()), set(self.env['res.groups'].search([]).ids))
def test_get_data_from_model_with_multiple_scopes_wrong_model(self):
""" Check the values returned by the get_data_for_model method when
calling on multiple scopes
"""
scopes = self._generate_multiple_scopes()
# Check a simple call with a wrong model
data = scopes.get_data_for_model('res.partner')
self.assertEqual(data, {})
def _generate_multiple_scopes_match(self):
scopes = self.new_scope()
scopes += self.new_scope({
'code': 'All users',
'filter_id': False,
})
scopes += self.new_scope({
'model_id': self.env.ref('base.model_res_groups').id,
'code': 'All groups',
'filter_id': False,
'field_ids': [
(6, 0, [self.env.ref('base.field_res_groups_name').id]),
],
})
return scopes
def test_get_data_from_model_with_all_scopes_match(self):
""" Check the values returned by the get_data_for_model method when all
scopes are required to match
"""
scopes = self._generate_multiple_scopes_match()
# Check a simple call with the right model with any scope match
# returned records
data = scopes.get_data_for_model('res.users')
self.assertEqual(
set(data.keys()), set(self.env['res.users'].search([]).ids))
def test_get_data_from_model_with_all_scopes_match_first_model(self):
""" Check the values returned by the get_data_for_model method when all
scopes are required to match
"""
scopes = self._generate_multiple_scopes_match()
# Check a simple call with the right model with all scopes required to
# match returned records
data = scopes.get_data_for_model('res.users', all_scopes_match=True)
self.assertEqual(data, {self.env.user.id: {
'id': 1,
'email': self.env.user.email,
}})
def test_get_data_from_model_with_all_scopes_match_second_model(self):
""" Check the values returned by the get_data_for_model method when all
scopes are required to match
"""
scopes = self._generate_multiple_scopes_match()
# Check a simple call with another right model
data = scopes.get_data_for_model('res.groups')
self.assertEqual(
set(data.keys()), set(self.env['res.groups'].search([]).ids))
def test_get_data_from_model_with_all_scopes_match_wrong_model(self):
""" Check the values returned by the get_data_for_model method when all
scopes are required to match
"""
scopes = self._generate_multiple_scopes_match()
# Check a simple call with a wrong model
data = scopes.get_data_for_model('res.partner')
self.assertEqual(data, {})

270
oauth_provider/tests/test_oauth_provider_token.py

@ -0,0 +1,270 @@
# -*- coding: utf-8 -*-
# Copyright 2016 SYLEAM
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
from datetime import datetime, timedelta
from openerp import fields, exceptions
from openerp.tests.common import TransactionCase
class TestOAuthProviderToken(TransactionCase):
def setUp(self):
super(TestOAuthProviderToken, self).setUp()
self.client = self.env['oauth.provider.client'].create({
'name': 'Client',
'identifier': 'client',
})
self.token_vals = {
'user_id': self.env.user.id,
'client_id': self.client.id,
'token': 'token',
'expires_at': fields.Datetime.now(),
}
self.filter = self.env['ir.filters'].create({
'name': 'Current user',
'model_id': 'res.users',
'domain': "[('id', '=', uid)]",
})
self.scope_vals = {
'name': 'Scope',
'code': 'scope',
'description': 'Description of the scope',
'model_id': self.env.ref('base.model_res_users').id,
'filter_id': self.filter.id,
'field_ids': [
(6, 0, [self.env.ref('base.field_res_users_email').id]),
],
}
def new_token(self, vals=None):
values = self.token_vals
if vals is not None:
values.update(vals)
return self.env['oauth.provider.token'].create(values)
def new_scope(self, vals=None):
values = self.scope_vals
if vals is not None:
values.update(vals)
return self.env['oauth.provider.scope'].create(values)
def test_inactive(self):
""" Check the value of the active field, for an expired token """
not_expired = datetime.now() + timedelta(days=1)
token = self.new_token(vals={
'token': 'Active',
'expires_at': fields.Datetime.to_string(not_expired),
})
self.assertEqual(token.active, True)
def test_active(self):
""" Check the value of the active field, for a not expired token """
expired = datetime.now() - timedelta(minutes=1)
token = self.new_token(vals={
'token': 'Not active',
'expires_at': fields.Datetime.to_string(expired),
})
self.assertEqual(token.active, False)
def _generate_tokens_for_active_search(self):
not_expired = datetime.now() + timedelta(days=1)
not_expired_tokens = self.new_token(vals={
'token': 'Not expired',
'expires_at': fields.Datetime.to_string(not_expired),
})
not_expired_tokens += not_expired_tokens.copy(default={
'token': 'Other not expired',
})
expired = datetime.now() - timedelta(minutes=1)
expired_tokens = self.new_token(vals={
'token': 'Expired',
'expires_at': fields.Datetime.to_string(expired),
})
expired_tokens += expired_tokens.copy(default={
'token': 'Other expired',
})
return expired_tokens, not_expired_tokens
def test_search_empty_domain(self):
""" Check the results of searching tokens with an empty domain
Only active tokens should be returned
"""
token_obj = self.env['oauth.provider.token']
expired_tokens, not_expired_tokens = \
self._generate_tokens_for_active_search()
self.assertEqual(token_obj.search([]), not_expired_tokens)
def test_active_search_equal_true(self):
""" Check the results of searching tokens with explicit active = True domain
"""
token_obj = self.env['oauth.provider.token']
expired_tokens, not_expired_tokens = \
self._generate_tokens_for_active_search()
self.assertEqual(token_obj.search([
('active', '=', True),
]), not_expired_tokens)
def test_active_search_equal_false(self):
""" Check the results of searching tokens with active = False domain
"""
token_obj = self.env['oauth.provider.token']
expired_tokens, not_expired_tokens = \
self._generate_tokens_for_active_search()
self.assertEqual(token_obj.search([
('active', '=', False),
]), expired_tokens)
def test_active_search_not_equal_true(self):
""" Check the results of searching tokens with active != True domain
"""
token_obj = self.env['oauth.provider.token']
expired_tokens, not_expired_tokens = \
self._generate_tokens_for_active_search()
self.assertEqual(token_obj.search([
('active', '!=', True),
]), expired_tokens)
def test_active_search_not_equal_false(self):
""" Check the results of searching tokens with active != False domain
"""
token_obj = self.env['oauth.provider.token']
expired_tokens, not_expired_tokens = \
self._generate_tokens_for_active_search()
self.assertEqual(token_obj.search([
('active', '!=', False),
]), not_expired_tokens)
def test_active_search_in_true(self):
""" Check the results of searching tokens with active in (True,) domain
"""
token_obj = self.env['oauth.provider.token']
expired_tokens, not_expired_tokens = \
self._generate_tokens_for_active_search()
self.assertEqual(token_obj.search([
('active', 'in', (True,)),
]), not_expired_tokens)
def test_active_search_in_false(self):
""" Check the results of searching tokens with active in (False,) domain
"""
token_obj = self.env['oauth.provider.token']
expired_tokens, not_expired_tokens = \
self._generate_tokens_for_active_search()
self.assertEqual(token_obj.search([
('active', 'in', (False,)),
]), expired_tokens)
def test_active_search_not_in_true(self):
""" Check the results of searching tokens with active not in (True,) domain
"""
token_obj = self.env['oauth.provider.token']
expired_tokens, not_expired_tokens = \
self._generate_tokens_for_active_search()
self.assertEqual(token_obj.search([
('active', 'not in', (True,)),
]), expired_tokens)
def test_active_search_not_in_false(self):
""" Check the results of searching tokens with active not in (False,) domain
"""
token_obj = self.env['oauth.provider.token']
expired_tokens, not_expired_tokens = \
self._generate_tokens_for_active_search()
self.assertEqual(token_obj.search([
('active', 'not in', (False,)),
]), not_expired_tokens)
def test_active_search_in_true_false(self):
""" Check the results of searching tokens with active in (True, False) domain
"""
token_obj = self.env['oauth.provider.token']
expired_tokens, not_expired_tokens = \
self._generate_tokens_for_active_search()
self.assertEqual(token_obj.search([
('active', 'in', (True, False)),
]), not_expired_tokens + expired_tokens)
def test_active_search_not_in_true_false(self):
""" Check the results of searching tokens with active notin (True,False) domain
"""
token_obj = self.env['oauth.provider.token']
expired_tokens, not_expired_tokens = \
self._generate_tokens_for_active_search()
self.assertEqual(token_obj.search([
('active', 'not in', (True, False)),
]), token_obj)
def test_active_search_invalid_operator(self):
""" Check the results of searching tokens with an invalid operatr in domain
"""
token_obj = self.env['oauth.provider.token']
expired_tokens, not_expired_tokens = \
self._generate_tokens_for_active_search()
with self.assertRaises(exceptions.UserError):
token_obj.search([('active', '>', True)])
def test_get_data_from_model_with_at_least_one_scope_matching(self):
""" Check the values returned by the get_data_for_model method with
at least one scope matching the data
"""
scopes = self.new_scope()
scopes += self.new_scope({
'code': 'All users',
'filter_id': False,
})
token = self.new_token(vals={
'scope_ids': [(6, 0, scopes.ids)],
})
# Check a simple call with the right model with empty fields
data = token.get_data_for_model('res.users')
self.assertEqual(
sorted(data.keys()), sorted(self.env['res.users'].search([]).ids))
def test_get_data_from_model_with_all_scopes_matching(self):
""" Check the values returned by the get_data_for_model method with
all scopes required to match the data
"""
scopes = self.new_scope()
scopes += self.new_scope({
'code': 'All users',
'filter_id': False,
})
token = self.new_token(vals={
'scope_ids': [(6, 0, scopes.ids)],
})
# Check a simple call with the right model without empty fields
data = token.get_data_for_model('res.users', all_scopes_match=True)
self.assertEqual(data, {self.env.user.id: {
'id': 1,
'email': self.env.user.email,
}})
def test_get_data_from_model_with_no_scope_matching(self):
""" Check the values returned by the get_data_for_model method with
an unauthorized model
"""
token = self.new_token()
# Check a simple call with a wrong model
data = token.get_data_for_model('res.partner')
self.assertEqual(data, {})

89
oauth_provider/views/oauth_provider_client.xml

@ -0,0 +1,89 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright 2016 SYLEAM
License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
-->
<odoo>
<record id="view_oauth_provider_client_tree" model="ir.ui.view">
<field name="name">oauth.provider.client.tree</field>
<field name="model">oauth.provider.client</field>
<field name="arch" type="xml">
<tree string="OAuth Provider Clients">
<field name="name"/>
<field name="identifier"/>
<field name="application_type"/>
<field name="scope_ids"/>
</tree>
</field>
</record>
<record id="view_oauth_provider_client_form" model="ir.ui.view">
<field name="name">oauth.provider.client.form</field>
<field name="model">oauth.provider.client</field>
<field name="arch" type="xml">
<form string="OAuth Provider Clients">
<sheet>
<h1>
<field name="name"/>
</h1>
<group>
<group>
<field name="identifier"/>
<field name="secret"/>
<field name="application_type"/>
</group>
<group>
<field name="skip_authorization" attrs="{'invisible': [('application_type', 'not in', ('mobile application', 'web application'))]}"/>
<field name="scope_ids" widget="many2many_tags"/>
<field name="token_type"/>
</group>
</group>
<notebook colspan="4">
<page string="Allowed Redirect URIs" attrs="{'invisible': [('application_type', 'not in', ('mobile application', 'web application'))]}">
<field name="redirect_uri_ids">
<tree string="Redirect URIs" editable="bottom">
<field name="sequence" widget="handle"/>
<field name="name"/>
</tree>
</field>
</page>
</notebook>
</sheet>
</form>
</field>
</record>
<record id="view_oauth_provider_client_search" model="ir.ui.view">
<field name="name">oauth.provider.client.search</field>
<field name="model">oauth.provider.client</field>
<field name="arch" type="xml">
<search string="OAuth Provider Clients">
<field name="name"/>
<field name="identifier"/>
<field name="application_type"/>
<field name="scope_ids"/>
</search>
</field>
</record>
<record model="ir.actions.act_window" id="act_open_oauth_provider_client_view">
<field name="name">OAuth Provider Clients</field>
<field name="type">ir.actions.act_window</field>
<field name="res_model">oauth.provider.client</field>
<field name="view_type">form</field>
<field name="view_mode">tree,form</field>
<field name="search_view_id" ref="view_oauth_provider_client_search"/>
<field name="domain">[]</field>
<field name="context">{}</field>
</record>
<record model="ir.actions.act_window.view" id="act_open_oauth_provider_client_view_form">
<field name="act_window_id" ref="act_open_oauth_provider_client_view"/>
<field name="sequence" eval="20"/>
<field name="view_mode">form</field>
<field name="view_id" ref="view_oauth_provider_client_form"/>
</record>
<record model="ir.actions.act_window.view" id="act_open_oauth_provider_client_view_tree">
<field name="act_window_id" ref="act_open_oauth_provider_client_view"/>
<field name="sequence" eval="10"/>
<field name="view_mode">tree</field>
<field name="view_id" ref="view_oauth_provider_client_tree"/>
</record>
<menuitem id="menu_oauth_provider_client" parent="base.menu_users" sequence="40" action="act_open_oauth_provider_client_view"/>
</odoo>

87
oauth_provider/views/oauth_provider_scope.xml

@ -0,0 +1,87 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright 2016 SYLEAM
License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
-->
<odoo>
<record id="view_oauth_provider_scope_tree" model="ir.ui.view">
<field name="name">oauth.provider.scope.tree</field>
<field name="model">oauth.provider.scope</field>
<field name="arch" type="xml">
<tree string="OAuth Provider Scopes">
<field name="name"/>
<field name="code"/>
<field name="description"/>
<field name="model_id"/>
</tree>
</field>
</record>
<record id="view_oauth_provider_scope_form" model="ir.ui.view">
<field name="name">oauth.provider.scope.form</field>
<field name="model">oauth.provider.scope</field>
<field name="arch" type="xml">
<form string="OAuth Provider Scopes">
<sheet>
<h1>
<field name="name"/>
</h1>
<group>
<group>
<field name="code"/>
<field name="description"/>
<separator string="Filter settings" colspan="2"/>
<field name="model_id"/>
<field name="filter_id"/>
<field name="model" invisible="1"/>
</group>
<group>
<separator string="Fields" colspan="2"/>
<field name="field_ids" nolabel="1">
<tree string="Fields">
<field name="name"/>
<field name="field_description"/>
<field name="ttype"/>
</tree>
</field>
</group>
</group>
</sheet>
</form>
</field>
</record>
<record id="view_oauth_provider_scope_search" model="ir.ui.view">
<field name="name">oauth.provider.scope.search</field>
<field name="model">oauth.provider.scope</field>
<field name="arch" type="xml">
<search string="OAuth Provider Scopes">
<field name="name"/>
<field name="code"/>
<field name="description"/>
<field name="model_id"/>
</search>
</field>
</record>
<record model="ir.actions.act_window" id="act_open_oauth_provider_scope_view">
<field name="name">OAuth Provider Scopes</field>
<field name="type">ir.actions.act_window</field>
<field name="res_model">oauth.provider.scope</field>
<field name="view_type">form</field>
<field name="view_mode">tree,form</field>
<field name="search_view_id" ref="view_oauth_provider_scope_search"/>
<field name="domain">[]</field>
<field name="context">{}</field>
</record>
<record model="ir.actions.act_window.view" id="act_open_oauth_provider_scope_view_form">
<field name="act_window_id" ref="act_open_oauth_provider_scope_view"/>
<field name="sequence" eval="20"/>
<field name="view_mode">form</field>
<field name="view_id" ref="view_oauth_provider_scope_form"/>
</record>
<record model="ir.actions.act_window.view" id="act_open_oauth_provider_scope_view_tree">
<field name="act_window_id" ref="act_open_oauth_provider_scope_view"/>
<field name="sequence" eval="10"/>
<field name="view_mode">tree</field>
<field name="view_id" ref="view_oauth_provider_scope_tree"/>
</record>
<menuitem id="menu_oauth_provider_scope" parent="base.menu_users" sequence="40" action="act_open_oauth_provider_scope_view"/>
</odoo>

1
requirements.txt

@ -6,3 +6,4 @@ validate_email
pysftp pysftp
pyotp pyotp
fs==0.5.4 fs==0.5.4
oauthlib
Loading…
Cancel
Save