You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 

168 lines
6.0 KiB

# © 2016 Therp BV <http://therp.nl>
# © 2016 Antonio Espinosa <antonio.espinosa@tecnativa.com>
# © 2018 Ignacio Ibeas <ignacio@acysos.com>
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html).
import os
import logging
import urllib.request
import urllib.parse
import subprocess
import tempfile
from odoo import _, api, models, exceptions
from odoo.tools import config
# RSA key size, in bits, passed to `openssl genrsa` for both the account key
# and the per-domain key generated in this module.
DEFAULT_KEY_LENGTH = 4096
# Module-level logger; also used as the log level target for the output of
# external commands run by this module.
_logger = logging.getLogger(__name__)
def get_data_dir():
    """Return the path of the ``letsencrypt`` folder below the configured
    ``data_dir`` option.

    The directory is not created here; callers do that when needed.
    """
    base_dir = config.options.get('data_dir')
    return os.path.join(base_dir, 'letsencrypt')
def get_challenge_dir():
    """Return the path of the ``acme-challenge`` folder inside the
    letsencrypt data directory.
    """
    letsencrypt_dir = get_data_dir()
    return os.path.join(letsencrypt_dir, 'acme-challenge')
class Letsencrypt(models.AbstractModel):
    """Helpers to request a Let's Encrypt certificate for this instance.

    Keys, the CSR and the resulting certificate are stored as files in the
    directory returned by ``get_data_dir()``; the external ``openssl``
    binary is used for key and CSR generation.
    """

    _name = 'letsencrypt'
    _description = 'Abstract model providing functions for letsencrypt'

    @api.model
    def call_cmdline(self, cmdline, loglevel=logging.INFO,
                     raise_on_result=True):
        """Run an external command and log whatever it prints.

        :param cmdline: argument list handed to ``subprocess.Popen``
            (no shell is involved).
        :param loglevel: logging level used for captured stdout/stderr.
        :param raise_on_result: if true (the default), a non-zero exit
            status raises; if false, the status is only returned.
        :return: the exit status of the process.
        :raises exceptions.Warning: on a non-zero exit status when
            ``raise_on_result`` is true.
        """
        process = subprocess.Popen(
            cmdline, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        stdout, stderr = process.communicate()
        if stderr:
            _logger.log(loglevel, stderr)
        if stdout:
            _logger.log(loglevel, stdout)
        # Honour the flag: previously the parameter was accepted but ignored
        # and every failure raised unconditionally.
        if process.returncode and raise_on_result:
            raise exceptions.Warning(
                _('Error calling %s: %d') % (cmdline[0], process.returncode)
            )
        return process.returncode

    @api.model
    def generate_account_key(self):
        """Return the path of the account key, generating it if missing.

        The data directory is created when it does not exist yet.

        :return: path of ``account.key`` inside the data directory.
        """
        data_dir = get_data_dir()
        # exist_ok avoids the check-then-create race of isdir() + makedirs().
        os.makedirs(data_dir, exist_ok=True)
        account_key = os.path.join(data_dir, 'account.key')
        if not os.path.isfile(account_key):
            _logger.info('generating rsa account key')
            self.call_cmdline([
                'openssl', 'genrsa', '-out', account_key,
                str(DEFAULT_KEY_LENGTH),
            ])
        assert os.path.isfile(account_key), 'failed to create rsa key'
        return account_key

    @api.model
    def generate_domain_key(self, domain):
        """Return the path of the key for ``domain``, generating it if
        missing.

        :param domain: domain name; used to build the file name
            ``<domain>.key``.
        :return: path of the key file inside the data directory.
        """
        domain_key = os.path.join(get_data_dir(), '%s.key' % domain)
        if not os.path.isfile(domain_key):
            _logger.info('generating rsa domain key for %s', domain)
            self.call_cmdline([
                'openssl', 'genrsa', '-out', domain_key,
                str(DEFAULT_KEY_LENGTH),
            ])
        return domain_key

    @api.model
    def validate_domain(self, domain):
        """Reject local host names and private IP addresses.

        :param domain: host name or IP address to check.
        :raises exceptions.Warning: if ``domain`` is one of the known local
            names or an address IPy classifies as ``PRIVATE``.
        """
        local_domains = [
            'localhost', 'localhost.localdomain', 'localhost6',
            'localhost6.localdomain6'
        ]

        def _ip_is_private(address):
            # Imported lazily so the dependency is only needed when a
            # domain is actually validated.
            import IPy
            try:
                ip = IPy.IP(address)
            except Exception:
                # Anything IPy cannot parse (e.g. a regular host name) is
                # treated as "not a private address".
                return False
            return ip.iptype() == 'PRIVATE'

        if domain in local_domains or _ip_is_private(domain):
            raise exceptions.Warning(
                _("Let's encrypt doesn't work with private addresses "
                  "or local domains!"))

    @api.model
    def generate_csr(self, domain):
        """Create a certificate signing request for ``domain``.

        Values of all ``ir.config_parameter`` records whose key matches
        ``letsencrypt.altname.`` are added as subject alternative names.

        :param domain: main domain, used as CN and for the file name
            ``<domain>.csr``.
        :return: path of the generated CSR file.
        """
        parameter_model = self.env['ir.config_parameter']
        altnames = parameter_model.search(
            [('key', 'like', 'letsencrypt.altname.')],
            order='key'
        )
        domains = [domain] + [altname.value for altname in altnames]
        has_altnames = len(domains) > 1
        _logger.info('generating csr for %s', domain)
        if has_altnames:
            _logger.info(
                'with alternative subjects %s', ','.join(domains[1:]))
        # Named openssl_cnf (not "config") so the module-level
        # odoo.tools.config import is not shadowed.
        openssl_cnf = parameter_model.get_param(
            'letsencrypt.openssl.cnf', '/etc/ssl/openssl.cnf'
        )
        csr = os.path.join(get_data_dir(), '%s.csr' % domain)
        # The temporary file is removed when the with-block exits, so openssl
        # has to run while it is still open.
        with tempfile.NamedTemporaryFile(mode='wt') as cfg:
            # Copy the base configuration; the context manager closes the
            # source file instead of leaking the handle.
            with open(openssl_cnf) as base_cnf:
                cfg.write(base_cnf.read())
            if has_altnames:
                cfg.write(
                    '\n[SAN]\nsubjectAltName=' +
                    ','.join(['DNS:%s' % x for x in domains]) + '\n')
            # Make sure openssl sees the full content on disk.
            cfg.flush()
            cmdline = [
                'openssl', 'req', '-new',
                parameter_model.get_param(
                    'letsencrypt.openssl.digest', '-sha256'),
                '-key', self.generate_domain_key(domain),
                '-subj', '/CN=%s' % domain, '-config', cfg.name,
                '-out', csr,
            ]
            if has_altnames:
                cmdline.extend([
                    '-reqexts', 'SAN',
                ])
            self.call_cmdline(cmdline)
        return csr

    @api.model
    def cron(self):
        """Request a certificate for the ``web.base.url`` domain and write
        it, followed by the chain certificate, to ``<domain>.crt``.

        With ``letsencrypt_dry_run`` in the context, a placeholder text is
        written instead of calling ``acme_tiny``. Afterwards the command
        stored in ``letsencrypt.reload_command`` is run, if any.
        """
        parameter_model = self.env['ir.config_parameter']
        # NOTE(review): netloc keeps any ':port' suffix of web.base.url --
        # confirm whether the bare host name was intended here.
        domain = urllib.parse.urlparse(
            parameter_model.get_param('web.base.url', 'localhost')).netloc
        self.validate_domain(domain)
        account_key = self.generate_account_key()
        csr = self.generate_csr(domain)
        acme_challenge = get_challenge_dir()
        os.makedirs(acme_challenge, exist_ok=True)
        if self.env.context.get('letsencrypt_dry_run'):
            crt_text = 'I\'m a test text'
        else:  # pragma: no cover
            from acme_tiny import get_crt, DEFAULT_CA
            crt_text = get_crt(
                account_key, csr, acme_challenge, log=_logger, CA=DEFAULT_CA)
        chain_url = parameter_model.get_param(
            'letsencrypt.chain_certificate_address',
            'https://letsencrypt.org/certs/'
            'lets-encrypt-x3-cross-signed.pem')
        crt_path = os.path.join(get_data_dir(), '%s.crt' % domain)
        with open(crt_path, 'w') as crt:
            crt.write(crt_text)
            # The response is closed even if reading or writing fails.
            with urllib.request.urlopen(chain_url) as chain_cert:
                # read() returns bytes; str() on them would write the
                # "b'...'" repr and corrupt the file, so decode instead.
                crt.write(chain_cert.read().decode('utf-8'))
            _logger.info('wrote %s', crt.name)
        reload_cmd = parameter_model.sudo().get_param(
            'letsencrypt.reload_command', False)
        if reload_cmd:
            _logger.info('reloading webserver...')
            self.call_cmdline(['sh', '-c', reload_cmd])
        else:
            _logger.info('no command defined for reloading webserver, please '
                         'do it manually in order to apply new certificate')