You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
191 lines
7.3 KiB
191 lines
7.3 KiB
# -*- coding: utf-8 -*-
|
|
# Copyright 2016 SYLEAM
|
|
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
|
|
|
|
import logging
|
|
from datetime import datetime, timedelta
|
|
from openerp import models, api, fields, exceptions, _
|
|
|
|
_logger = logging.getLogger(__name__)
|
|
|
|
try:
|
|
from oauthlib.oauth2.rfc6749.tokens import random_token_generator
|
|
except ImportError:
|
|
_logger.debug('Cannot `import oauthlib`.')
|
|
|
|
try:
|
|
import jwt
|
|
except ImportError:
|
|
_logger.debug('Cannot `import jwt`.')
|
|
|
|
try:
|
|
from cryptography.hazmat.backends import default_backend
|
|
from cryptography.hazmat.primitives.asymmetric.ec import \
|
|
EllipticCurvePrivateKey
|
|
from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey
|
|
from cryptography.hazmat.primitives.serialization import \
|
|
Encoding, PublicFormat, PrivateFormat, NoEncryption, \
|
|
load_pem_private_key
|
|
from cryptography.hazmat.primitives.asymmetric import rsa, ec
|
|
except ImportError:
|
|
_logger.debug('Cannot `import cryptography`.')
|
|
|
|
|
|
class OAuthProviderClient(models.Model):
|
|
_inherit = 'oauth.provider.client'
|
|
|
|
CRYPTOSYSTEMS = {
|
|
'ES': EllipticCurvePrivateKey,
|
|
'RS': RSAPrivateKey,
|
|
'PS': RSAPrivateKey,
|
|
}
|
|
|
|
token_type = fields.Selection(selection_add=[('jwt', 'JSON Web Token')])
|
|
jwt_scope_id = fields.Many2one(
|
|
comodel_name='oauth.provider.scope', string='Data Scope',
|
|
domain=[('model_id.model', '=', 'res.users')],
|
|
help='Scope executed to add some user\'s data in the token.')
|
|
jwt_algorithm = fields.Selection(selection=[
|
|
('HS256', 'HMAC using SHA-256 hash algorithm'),
|
|
('HS384', 'HMAC using SHA-384 hash algorithm'),
|
|
('HS512', 'HMAC using SHA-512 hash algorithm'),
|
|
('ES256', 'ECDSA signature algorithm using SHA-256 hash algorithm'),
|
|
('ES384', 'ECDSA signature algorithm using SHA-384 hash algorithm'),
|
|
('ES512', 'ECDSA signature algorithm using SHA-512 hash algorithm'),
|
|
('RS256', 'RSASSA-PKCS1-v1_5 signature algorithm using SHA-256 hash '
|
|
'algorithm'),
|
|
('RS384', 'RSASSA-PKCS1-v1_5 signature algorithm using SHA-384 hash '
|
|
'algorithm'),
|
|
('RS512', 'RSASSA-PKCS1-v1_5 signature algorithm using SHA-512 hash '
|
|
'algorithm'),
|
|
('PS256', 'RSASSA-PSS signature using SHA-256 and MGF1 padding with '
|
|
'SHA-256'),
|
|
('PS384', 'RSASSA-PSS signature using SHA-384 and MGF1 padding with '
|
|
'SHA-384'),
|
|
('PS512', 'RSASSA-PSS signature using SHA-512 and MGF1 padding with '
|
|
'SHA-512'),
|
|
], string='Algorithm', help='Algorithm used to sign the JSON Web Token.')
|
|
jwt_private_key = fields.Text(
|
|
string='Private Key',
|
|
help='Private key used for the JSON Web Token generation.')
|
|
jwt_public_key = fields.Text(
|
|
string='Public Key', compute='_compute_jwt_public_key',
|
|
help='Public key used for the JSON Web Token generation.')
|
|
|
|
@api.multi
|
|
def _load_private_key(self):
|
|
""" Load the client's private key into a cryptography's object instance
|
|
"""
|
|
return load_pem_private_key(
|
|
str(self.jwt_private_key),
|
|
password=None,
|
|
backend=default_backend(),
|
|
)
|
|
|
|
@api.multi
|
|
@api.constrains('jwt_algorithm', 'jwt_private_key')
|
|
def _check_jwt_private_key(self):
|
|
""" Check if the private key's type matches the selected algorithm
|
|
|
|
This check is only performed for asymetric algorithms
|
|
"""
|
|
for client in self:
|
|
algorithm_prefix = client.jwt_algorithm[:2]
|
|
if client.jwt_private_key and \
|
|
algorithm_prefix in self.CRYPTOSYSTEMS:
|
|
private_key = client._load_private_key()
|
|
|
|
if not isinstance(
|
|
private_key, self.CRYPTOSYSTEMS[algorithm_prefix]):
|
|
raise exceptions.ValidationError(
|
|
_('The private key doesn\'t fit the selected '
|
|
'algorithm!'))
|
|
|
|
@api.multi
|
|
def generate_private_key(self):
|
|
""" Generate a private key for ECDSA and RSA algorithm clients """
|
|
for client in self:
|
|
algorithm_prefix = client.jwt_algorithm[:2]
|
|
|
|
if algorithm_prefix == 'ES':
|
|
key = ec.generate_private_key(
|
|
curve=ec.SECT283R1,
|
|
backend=default_backend(),
|
|
)
|
|
elif algorithm_prefix in ('RS', 'PS'):
|
|
key = rsa.generate_private_key(
|
|
public_exponent=65537, key_size=2048,
|
|
backend=default_backend(),
|
|
)
|
|
else:
|
|
raise exceptions.UserError(
|
|
_('You can only generate private keys for asymetric '
|
|
'algorithms!'))
|
|
|
|
client.jwt_private_key = key.private_bytes(
|
|
encoding=Encoding.PEM,
|
|
format=PrivateFormat.TraditionalOpenSSL,
|
|
encryption_algorithm=NoEncryption(),
|
|
)
|
|
|
|
@api.multi
|
|
def _compute_jwt_public_key(self):
|
|
""" Compute the public key associated to the client's private key
|
|
|
|
This is only done for asymetric algorithms
|
|
"""
|
|
for client in self:
|
|
if client.jwt_private_key and \
|
|
client.jwt_algorithm[:2] in self.CRYPTOSYSTEMS:
|
|
private_key = client._load_private_key()
|
|
client.jwt_public_key = private_key.public_key().public_bytes(
|
|
Encoding.PEM, PublicFormat.SubjectPublicKeyInfo)
|
|
else:
|
|
client.jwt_public_key = False
|
|
|
|
@api.model
|
|
def _generate_jwt_payload(self, request):
|
|
""" Generate a payload containing data from the client """
|
|
utcnow = datetime.utcnow()
|
|
data = {
|
|
'exp': utcnow + timedelta(seconds=request.expires_in),
|
|
'nbf': utcnow,
|
|
'iss': 'Odoo',
|
|
'aud': request.client.identifier,
|
|
'iat': utcnow,
|
|
'user_id': request.client.generate_user_id(request.odoo_user),
|
|
}
|
|
if request.client.jwt_scope_id:
|
|
# Sudo as the token's user to execute the scope's filter with that
|
|
# user's rights
|
|
scope = request.client.jwt_scope_id.sudo(user=request.odoo_user)
|
|
scope_data = scope.get_data_for_model(
|
|
'res.users', res_id=request.odoo_user.id)
|
|
# Remove the user id in scope data
|
|
del scope_data['id']
|
|
data.update(scope_data)
|
|
|
|
return data
|
|
|
|
@api.multi
|
|
def get_oauth2_server(self, validator=None, **kwargs):
|
|
""" Add a custom JWT token generator in the server's arguments """
|
|
self.ensure_one()
|
|
|
|
def jwt_generator(request):
|
|
""" Generate a JSON Web Token using a custom payload from the client
|
|
"""
|
|
payload = self._generate_jwt_payload(request)
|
|
return jwt.encode(
|
|
payload,
|
|
request.client.jwt_private_key,
|
|
algorithm=request.client.jwt_algorithm,
|
|
)
|
|
|
|
# Add the custom generator only if none is already defined
|
|
if self.token_type == 'jwt' and 'token_generator' not in kwargs:
|
|
kwargs['token_generator'] = jwt_generator
|
|
kwargs['refresh_token_generator'] = random_token_generator
|
|
|
|
return super(OAuthProviderClient, self).get_oauth2_server(
|
|
validator=validator, **kwargs)
|