Browse Source

[ADD] Add oauth_provider_jwt module

pull/821/head
Sylvain GARANCHER 8 years ago
parent
commit
dc3205a6e0
  1. 79
      oauth_provider_jwt/README.rst
  2. 6
      oauth_provider_jwt/__init__.py
  3. 26
      oauth_provider_jwt/__openerp__.py
  4. 5
      oauth_provider_jwt/controllers/__init__.py
  5. 23
      oauth_provider_jwt/controllers/main.py
  6. 5
      oauth_provider_jwt/models/__init__.py
  7. 191
      oauth_provider_jwt/models/oauth_provider_client.py
  8. 5
      oauth_provider_jwt/tests/__init__.py
  9. 235
      oauth_provider_jwt/tests/test_oauth_provider_json_web_token.py
  10. 25
      oauth_provider_jwt/views/oauth_provider_client.xml
  11. 2
      requirements.txt

79
oauth_provider_jwt/README.rst

@ -0,0 +1,79 @@
.. image:: https://img.shields.io/badge/licence-AGPL--3-blue.svg
:target: http://www.gnu.org/licenses/agpl-3.0-standalone.html
:alt: License: AGPL-3
====================
OAuth Provider - JWT
====================
This module adds the JSON Web Token support to OAuth2 provider.
Installation
============
To install this module, you need to:
#. Install the pyjwt and cryptography python modules
#. Install the module like any other in Odoo
Configuration
=============
This module adds a new token type in the OAuth client configuration.
Once the *JSON Web Token* type is selected, a new tab appears at the bottom, where you'll have to select an algorithm for the token signature.
For asymetric algorithms, it is possible to put a custom private key, or the module can generate one for you.
The public key is automatically computed from the private one.
Usage
=====
There is no usage change from the base OAuth2 provider module.
The public key can be retrieved by clients using this URL: http://odoo.example.com/oauth2/public_key?client_id=identifier_of_the_oauth_client
.. image:: https://odoo-community.org/website/image/ir.attachment/5784_f2813bd/datas
:alt: Try me on Runbot
:target: https://runbot.odoo-community.org/runbot/149/9.0
Known issues / Roadmap
======================
* Add support for the client-side JWT request (https://tools.ietf.org/html/rfc7523)
Bug Tracker
===========
Bugs are tracked on `GitHub Issues
<https://github.com/OCA/server-tools/issues>`_. In case of trouble, please
check there if your issue has already been reported. If you spotted it first,
help us smashing it by providing a detailed and welcomed feedback.
Credits
=======
Images
------
* Odoo Community Association: `Icon <https://github.com/OCA/maintainer-tools/blob/master/template/module/static/description/icon.svg>`_.
Contributors
------------
* Sylvain Garancher <sylvain.garancher@syleam.fr>
Maintainer
----------
.. image:: https://odoo-community.org/logo.png
:alt: Odoo Community Association
:target: https://odoo-community.org
This module is maintained by the OCA.
OCA, or the Odoo Community Association, is a nonprofit organization whose
mission is to support the collaborative development of Odoo features and
promote its widespread use.
To contribute to this module, please visit https://odoo-community.org.

6
oauth_provider_jwt/__init__.py

@ -0,0 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2016 SYLEAM
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
from . import controllers
from . import models

26
oauth_provider_jwt/__openerp__.py

@ -0,0 +1,26 @@
# -*- coding: utf-8 -*-
# Copyright 2016 SYLEAM
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
{
'name': 'OAuth Provider - JWT',
'summary': 'Adds the JSON Web Token support for OAuth2 provider',
'version': '9.0.1.0.0',
'category': 'Authentication',
'website': 'http://www.syleam.fr/',
'author': 'SYLEAM, Odoo Community Association (OCA)',
'license': 'AGPL-3',
'installable': True,
'external_dependencies': {
'python': [
'jwt',
'cryptography',
],
},
'depends': [
'oauth_provider',
],
'data': [
'views/oauth_provider_client.xml',
],
}

5
oauth_provider_jwt/controllers/__init__.py

@ -0,0 +1,5 @@
# -*- coding: utf-8 -*-
# Copyright 2016 SYLEAM
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
from . import main

23
oauth_provider_jwt/controllers/main.py

@ -0,0 +1,23 @@
# -*- coding: utf-8 -*-
# Copyright 2016 SYLEAM
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
import werkzeug
from openerp import http
from openerp.addons import oauth_provider
from openerp.addons.web.controllers.main import ensure_db
class OAuth2ProviderController(
oauth_provider.controllers.main.OAuth2ProviderController):
@http.route(
'/oauth2/public_key', type='http', auth='none', methods=['GET'])
def public_key(self, client_id=None, *args, **kwargs):
""" Returns the public key of the requested client """
ensure_db()
client = http.request.env['oauth.provider.client'].sudo().search([
('identifier', '=', client_id),
])
return werkzeug.wrappers.BaseResponse(
client.jwt_public_key or '', status=200)

5
oauth_provider_jwt/models/__init__.py

@ -0,0 +1,5 @@
# -*- coding: utf-8 -*-
# Copyright 2016 SYLEAM
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
from . import oauth_provider_client

191
oauth_provider_jwt/models/oauth_provider_client.py

@ -0,0 +1,191 @@
# -*- coding: utf-8 -*-
# Copyright 2016 SYLEAM
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
import logging
from datetime import datetime, timedelta
from openerp import models, api, fields, exceptions, _
_logger = logging.getLogger(__name__)
try:
from oauthlib.oauth2.rfc6749.tokens import random_token_generator
except ImportError:
_logger.debug('Cannot `import oauthlib`.')
try:
import jwt
except ImportError:
_logger.debug('Cannot `import jwt`.')
try:
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric.ec import \
EllipticCurvePrivateKey
from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey
from cryptography.hazmat.primitives.serialization import \
Encoding, PublicFormat, PrivateFormat, NoEncryption, \
load_pem_private_key
from cryptography.hazmat.primitives.asymmetric import rsa, ec
except ImportError:
_logger.debug('Cannot `import cryptography`.')
class OAuthProviderClient(models.Model):
_inherit = 'oauth.provider.client'
CRYPTOSYSTEMS = {
'ES': EllipticCurvePrivateKey,
'RS': RSAPrivateKey,
'PS': RSAPrivateKey,
}
token_type = fields.Selection(selection_add=[('jwt', 'JSON Web Token')])
jwt_scope_id = fields.Many2one(
comodel_name='oauth.provider.scope', string='Data Scope',
domain=[('model_id.model', '=', 'res.users')],
help='Scope executed to add some user\'s data in the token.')
jwt_algorithm = fields.Selection(selection=[
('HS256', 'HMAC using SHA-256 hash algorithm'),
('HS384', 'HMAC using SHA-384 hash algorithm'),
('HS512', 'HMAC using SHA-512 hash algorithm'),
('ES256', 'ECDSA signature algorithm using SHA-256 hash algorithm'),
('ES384', 'ECDSA signature algorithm using SHA-384 hash algorithm'),
('ES512', 'ECDSA signature algorithm using SHA-512 hash algorithm'),
('RS256', 'RSASSA-PKCS1-v1_5 signature algorithm using SHA-256 hash '
'algorithm'),
('RS384', 'RSASSA-PKCS1-v1_5 signature algorithm using SHA-384 hash '
'algorithm'),
('RS512', 'RSASSA-PKCS1-v1_5 signature algorithm using SHA-512 hash '
'algorithm'),
('PS256', 'RSASSA-PSS signature using SHA-256 and MGF1 padding with '
'SHA-256'),
('PS384', 'RSASSA-PSS signature using SHA-384 and MGF1 padding with '
'SHA-384'),
('PS512', 'RSASSA-PSS signature using SHA-512 and MGF1 padding with '
'SHA-512'),
], string='Algorithm', help='Algorithm used to sign the JSON Web Token.')
jwt_private_key = fields.Text(
string='Private Key',
help='Private key used for the JSON Web Token generation.')
jwt_public_key = fields.Text(
string='Public Key', compute='_compute_jwt_public_key',
help='Public key used for the JSON Web Token generation.')
@api.multi
def _load_private_key(self):
""" Load the client's private key into a cryptography's object instance
"""
return load_pem_private_key(
str(self.jwt_private_key),
password=None,
backend=default_backend(),
)
@api.multi
@api.constrains('jwt_algorithm', 'jwt_private_key')
def _check_jwt_private_key(self):
""" Check if the private key's type matches the selected algorithm
This check is only performed for asymetric algorithms
"""
for client in self:
algorithm_prefix = client.jwt_algorithm[:2]
if client.jwt_private_key and \
algorithm_prefix in self.CRYPTOSYSTEMS:
private_key = client._load_private_key()
if not isinstance(
private_key, self.CRYPTOSYSTEMS[algorithm_prefix]):
raise exceptions.ValidationError(
_('The private key doesn\'t fit the selected '
'algorithm!'))
@api.multi
def generate_private_key(self):
""" Generate a private key for ECDSA and RSA algorithm clients """
for client in self:
algorithm_prefix = client.jwt_algorithm[:2]
if algorithm_prefix == 'ES':
key = ec.generate_private_key(
curve=ec.SECT283R1,
backend=default_backend(),
)
elif algorithm_prefix in ('RS', 'PS'):
key = rsa.generate_private_key(
public_exponent=65537, key_size=2048,
backend=default_backend(),
)
else:
raise exceptions.UserError(
_('You can only generate private keys for asymetric '
'algorithms!'))
client.jwt_private_key = key.private_bytes(
encoding=Encoding.PEM,
format=PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=NoEncryption(),
)
@api.multi
def _compute_jwt_public_key(self):
""" Compute the public key associated to the client's private key
This is only done for asymetric algorithms
"""
for client in self:
if client.jwt_private_key and \
client.jwt_algorithm[:2] in self.CRYPTOSYSTEMS:
private_key = client._load_private_key()
client.jwt_public_key = private_key.public_key().public_bytes(
Encoding.PEM, PublicFormat.SubjectPublicKeyInfo)
else:
client.jwt_public_key = False
@api.model
def _generate_jwt_payload(self, request):
""" Generate a payload containing data from the client """
utcnow = datetime.utcnow()
data = {
'exp': utcnow + timedelta(seconds=request.expires_in),
'nbf': utcnow,
'iss': 'Odoo',
'aud': request.client.identifier,
'iat': utcnow,
'user_id': request.client.generate_user_id(request.odoo_user),
}
if request.client.jwt_scope_id:
# Sudo as the token's user to execute the scope's filter with that
# user's rights
scope = request.client.jwt_scope_id.sudo(user=request.odoo_user)
scope_data = scope.get_data_for_model(
'res.users', res_id=request.odoo_user.id)
# Remove the user id in scope data
del scope_data['id']
data.update(scope_data)
return data
@api.multi
def get_oauth2_server(self, validator=None, **kwargs):
""" Add a custom JWT token generator in the server's arguments """
self.ensure_one()
def jwt_generator(request):
""" Generate a JSON Web Token using a custom payload from the client
"""
payload = self._generate_jwt_payload(request)
return jwt.encode(
payload,
request.client.jwt_private_key,
algorithm=request.client.jwt_algorithm,
)
# Add the custom generator only if none is already defined
if self.token_type == 'jwt' and 'token_generator' not in kwargs:
kwargs['token_generator'] = jwt_generator
kwargs['refresh_token_generator'] = random_token_generator
return super(OAuthProviderClient, self).get_oauth2_server(
validator=validator, **kwargs)

5
oauth_provider_jwt/tests/__init__.py

@ -0,0 +1,5 @@
# -*- coding: utf-8 -*-
# Copyright 2016 SYLEAM
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
from . import test_oauth_provider_json_web_token

235
oauth_provider_jwt/tests/test_oauth_provider_json_web_token.py

@ -0,0 +1,235 @@
# -*- coding: utf-8 -*-
# Copyright 2016 SYLEAM
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
import logging
from openerp import exceptions
from openerp.addons.oauth_provider.tests.common_test_controller import \
OAuthProviderControllerTransactionCase
from ..models.oauth_provider_client import OAuthProviderClient
_logger = logging.getLogger(__name__)
try:
import jwt
except ImportError:
_logger.debug('Cannot `import jwt`.')
class TestOAuthProviderController(OAuthProviderControllerTransactionCase):
def setUp(self):
# Use the legacy appication profile for tests to execute all requests
# as public user. This allows to rightly tests access rghts
super(TestOAuthProviderController, self).setUp('legacy application')
# Configure the client to generate a JSON Web Token
self.client.token_type = 'jwt'
# Define base values for a scope creation
self.filter = self.env['ir.filters'].create({
'name': 'User filter',
'model_id': 'res.users',
'domain': "[('id', '=', uid)]",
})
self.scope_vals = {
'name': 'Scope',
'code': 'scope',
'description': 'Description of the scope',
'model_id': self.env.ref('base.model_res_users').id,
'filter_id': self.filter.id,
'field_ids': [
(6, 0, [self.env.ref('base.field_res_users_email').id]),
],
}
def new_scope(self):
return self.env['oauth.provider.scope'].create(self.scope_vals)
def generate_private_key(self):
""" Generates a private key depending on the algorithm
Returns the key needed to decode the signature
"""
if self.client.jwt_algorithm[:2] not in \
OAuthProviderClient.CRYPTOSYSTEMS:
# Use the private key as decoding key for symetric algorithms
self.client.jwt_private_key = 'secret key'
decoding_key = self.client.jwt_private_key
else:
# Generate a random private key for asymetric algorithms
self.client.generate_private_key()
decoding_key = self.client.jwt_public_key
return decoding_key
def common_test_json_web_token(self, algorithm):
""" Check generation of a JSON Web Token using a symetric algorithm """
# Configure the client to use an HS512 algorithm
self.client.jwt_algorithm = algorithm
decoding_key = self.generate_private_key()
# Ask a token to the server
state = 'Some custom state'
self.post_request('/oauth2/token', data={
'client_id': self.client.identifier,
'scope': self.client.scope_ids[0].code,
'grant_type': self.client.grant_type,
'username': self.user.login,
'password': 'demo',
'state': state,
})
# A new token should have been generated
# We can safely pick the latest generated token here, because no other
# token could have been generated during the test
token = self.env['oauth.provider.token'].search([
('client_id', '=', self.client.id),
], order='id DESC', limit=1)
# Check token's contents
token_contents = jwt.decode(
token.token,
decoding_key,
algorithm=self.client.jwt_algorithm,
audience=self.client.identifier,
)
self.assertEqual(token_contents['user_id'], token.generate_user_id())
return token_contents
def test_json_web_token_hs256(self):
""" Execute the JSON Web Token test using HS256 algorithm """
token_contents = self.common_test_json_web_token('HS256')
self.assertEqual(
sorted(token_contents.keys()),
sorted(['exp', 'nbf', 'iss', 'aud', 'iat', 'user_id']))
def test_json_web_token_hs384(self):
""" Execute the JSON Web Token test using HS384 algorithm """
token_contents = self.common_test_json_web_token('HS384')
self.assertEqual(
sorted(token_contents.keys()),
sorted(['exp', 'nbf', 'iss', 'aud', 'iat', 'user_id']))
def test_json_web_token_hs512(self):
""" Execute the JSON Web Token test using HS512 algorithm """
token_contents = self.common_test_json_web_token('HS512')
self.assertEqual(
sorted(token_contents.keys()),
sorted(['exp', 'nbf', 'iss', 'aud', 'iat', 'user_id']))
def test_json_web_token_es256(self):
""" Execute the JSON Web Token test using ES256 algorithm """
token_contents = self.common_test_json_web_token('ES256')
self.assertEqual(
sorted(token_contents.keys()),
sorted(['exp', 'nbf', 'iss', 'aud', 'iat', 'user_id']))
def test_json_web_token_es384(self):
""" Execute the JSON Web Token test using ES384 algorithm """
token_contents = self.common_test_json_web_token('ES384')
self.assertEqual(
sorted(token_contents.keys()),
sorted(['exp', 'nbf', 'iss', 'aud', 'iat', 'user_id']))
def test_json_web_token_es512(self):
""" Execute the JSON Web Token test using ES512 algorithm """
token_contents = self.common_test_json_web_token('ES512')
self.assertEqual(
sorted(token_contents.keys()),
sorted(['exp', 'nbf', 'iss', 'aud', 'iat', 'user_id']))
def test_json_web_token_rs256(self):
""" Execute the JSON Web Token test using RS256 algorithm """
token_contents = self.common_test_json_web_token('RS256')
self.assertEqual(
sorted(token_contents.keys()),
sorted(['exp', 'nbf', 'iss', 'aud', 'iat', 'user_id']))
def test_json_web_token_rs384(self):
""" Execute the JSON Web Token test using RS384 algorithm """
token_contents = self.common_test_json_web_token('RS384')
self.assertEqual(
sorted(token_contents.keys()),
sorted(['exp', 'nbf', 'iss', 'aud', 'iat', 'user_id']))
def test_json_web_token_rs512(self):
""" Execute the JSON Web Token test using RS512 algorithm """
token_contents = self.common_test_json_web_token('RS512')
self.assertEqual(
sorted(token_contents.keys()),
sorted(['exp', 'nbf', 'iss', 'aud', 'iat', 'user_id']))
def test_json_web_token_ps256(self):
""" Execute the JSON Web Token test using PS256 algorithm """
token_contents = self.common_test_json_web_token('PS256')
self.assertEqual(
sorted(token_contents.keys()),
sorted(['exp', 'nbf', 'iss', 'aud', 'iat', 'user_id']))
def test_json_web_token_ps384(self):
""" Execute the JSON Web Token test using PS384 algorithm """
token_contents = self.common_test_json_web_token('PS384')
self.assertEqual(
sorted(token_contents.keys()),
sorted(['exp', 'nbf', 'iss', 'aud', 'iat', 'user_id']))
def test_json_web_token_ps512(self):
""" Execute the JSON Web Token test using PS512 algorithm """
token_contents = self.common_test_json_web_token('PS512')
self.assertEqual(
sorted(token_contents.keys()),
sorted(['exp', 'nbf', 'iss', 'aud', 'iat', 'user_id']))
def test_json_web_token_with_scope(self):
""" Execute the JSON Web Token test with additional scope data """
self.client.jwt_scope_id = self.new_scope()
token_contents = self.common_test_json_web_token('PS512')
self.assertEqual(
sorted(token_contents.keys()),
sorted(['exp', 'nbf', 'iss', 'aud', 'iat', 'user_id', 'email']))
self.assertEqual(token_contents['email'], self.user.email)
def test_empty_public_key_for_symetric_algorithm(self):
""" Check that symetric algorithm return an empty public key """
self.client.jwt_algorithm = 'HS512'
self.client.jwt_private_key = 'secret key'
self.assertEqual(self.client.jwt_public_key, False)
def test_generate_private_key_for_symetric_algorithm(self):
""" Check that symetric algorithm don't generate random private key """
self.client.jwt_algorithm = 'HS512'
with self.assertRaises(exceptions.UserError):
self.client.generate_private_key()
def test_private_key_constraint(self):
""" Check the private key/algorithm consistency constraint """
self.client.jwt_algorithm = 'ES512'
# Generate an ECDSA private key
self.client.generate_private_key()
with self.assertRaises(exceptions.ValidationError):
# Check that the ECDSA private key is not allowed for an RSA
# configured client
self.client.jwt_algorithm = 'RS512'
def test_public_key_retrieval_without_argument(self):
""" Check the /oauth2/public_key route """
response = self.get_request('/oauth2/public_key')
self.assertEqual(response.data, '')
def test_public_key_retrieval_symetric(self):
""" Check the /oauth2/public_key route """
self.client.jwt_algorithm = 'HS512'
self.generate_private_key()
response = self.get_request('/oauth2/public_key', data={
'client_id': self.client.identifier,
})
self.assertEqual(response.data, '')
def test_public_key_retrieval_asymetric(self):
""" Check the /oauth2/public_key route """
self.client.jwt_algorithm = 'RS512'
public_key = self.generate_private_key()
response = self.get_request('/oauth2/public_key', data={
'client_id': self.client.identifier,
})
self.assertEqual(response.data, public_key)

25
oauth_provider_jwt/views/oauth_provider_client.xml

@ -0,0 +1,25 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright 2016 SYLEAM
License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
-->
<odoo>
<record id="view_oauth_provider_client_form" model="ir.ui.view">
<field name="name">oauth.provider.client.form</field>
<field name="model">oauth.provider.client</field>
<field name="inherit_id" ref="oauth_provider.view_oauth_provider_client_form"/>
<field name="arch" type="xml">
<xpath expr="//notebook" position="inside">
<page string="JSON Web Token" attrs="{'invisible': [('token_type', '!=', 'jwt')]}">
<group>
<field name="jwt_scope_id"/>
<field name="jwt_algorithm" attrs="{'required': [('token_type', '=', 'jwt')]}"/>
<button string="Generate a new random private key" name="generate_private_key" type="object" colspan="2" attrs="{'invisible': [('jwt_algorithm', 'in', [False, 'HS256', 'HS384', 'HS512'])]}"/>
<field name="jwt_private_key"/>
<field name="jwt_public_key"/>
</group>
</page>
</xpath>
</field>
</record>
</odoo>

2
requirements.txt

@ -7,3 +7,5 @@ pysftp
pyotp
fs==0.5.4
oauthlib
pyjwt
cryptography
Loading…
Cancel
Save