You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

168 lines
6.0 KiB

  1. # -*- coding: utf-8 -*-
  2. # © 2016 Therp BV <http://therp.nl>
  3. # © 2016 Antonio Espinosa <antonio.espinosa@tecnativa.com>
  4. # License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html).
  5. import os
  6. import logging
  7. import urllib2
  8. import urlparse
  9. import subprocess
  10. import tempfile
  11. from openerp import _, api, models, exceptions
  12. from openerp.tools import config
  13. DEFAULT_KEY_LENGTH = 4096
  14. _logger = logging.getLogger(__name__)
  15. def get_data_dir():
  16. return os.path.join(config.options.get('data_dir'), 'letsencrypt')
  17. def get_challenge_dir():
  18. return os.path.join(get_data_dir(), 'acme-challenge')
  19. class Letsencrypt(models.AbstractModel):
  20. _name = 'letsencrypt'
  21. _description = 'Abstract model providing functions for letsencrypt'
  22. @api.model
  23. def call_cmdline(self, cmdline, loglevel=logging.INFO,
  24. raise_on_result=True):
  25. process = subprocess.Popen(
  26. cmdline, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
  27. stdout, stderr = process.communicate()
  28. if stderr:
  29. _logger.log(loglevel, stderr)
  30. if stdout:
  31. _logger.log(loglevel, stdout)
  32. if process.returncode:
  33. raise exceptions.Warning(
  34. _('Error calling %s: %d') % (cmdline[0], process.returncode)
  35. )
  36. return process.returncode
  37. @api.model
  38. def generate_account_key(self):
  39. data_dir = get_data_dir()
  40. if not os.path.isdir(data_dir):
  41. os.makedirs(data_dir)
  42. account_key = os.path.join(data_dir, 'account.key')
  43. if not os.path.isfile(account_key):
  44. _logger.info('generating rsa account key')
  45. self.call_cmdline([
  46. 'openssl', 'genrsa', '-out', account_key,
  47. str(DEFAULT_KEY_LENGTH),
  48. ])
  49. assert os.path.isfile(account_key), 'failed to create rsa key'
  50. return account_key
  51. @api.model
  52. def generate_domain_key(self, domain):
  53. domain_key = os.path.join(get_data_dir(), '%s.key' % domain)
  54. if not os.path.isfile(domain_key):
  55. _logger.info('generating rsa domain key for %s', domain)
  56. self.call_cmdline([
  57. 'openssl', 'genrsa', '-out', domain_key,
  58. str(DEFAULT_KEY_LENGTH),
  59. ])
  60. return domain_key
  61. @api.model
  62. def validate_domain(self, domain):
  63. local_domains = [
  64. 'localhost', 'localhost.localdomain', 'localhost6',
  65. 'localhost6.localdomain6'
  66. ]
  67. def _ip_is_private(address):
  68. import IPy
  69. try:
  70. ip = IPy.IP(address)
  71. except:
  72. return False
  73. return ip.iptype() == 'PRIVATE'
  74. if domain in local_domains or _ip_is_private(domain):
  75. raise exceptions.Warning(
  76. _("Let's encrypt doesn't work with private addresses "
  77. "or local domains!"))
  78. @api.model
  79. def generate_csr(self, domain):
  80. domains = [domain]
  81. parameter_model = self.env['ir.config_parameter']
  82. altnames = parameter_model.search(
  83. [('key', 'like', 'letsencrypt.altname.')],
  84. order='key'
  85. )
  86. for altname in altnames:
  87. domains.append(altname.value)
  88. _logger.info('generating csr for %s', domain)
  89. if len(domains) > 1:
  90. _logger.info('with alternative subjects %s', ','.join(domains[1:]))
  91. config = parameter_model.get_param(
  92. 'letsencrypt.openssl.cnf', '/etc/ssl/openssl.cnf'
  93. )
  94. csr = os.path.join(get_data_dir(), '%s.csr' % domain)
  95. with tempfile.NamedTemporaryFile() as cfg:
  96. cfg.write(open(config).read())
  97. if len(domains) > 1:
  98. cfg.write(
  99. '\n[SAN]\nsubjectAltName=' +
  100. ','.join(map(lambda x: 'DNS:%s' % x, domains)) + '\n')
  101. cfg.file.flush()
  102. cmdline = [
  103. 'openssl', 'req', '-new',
  104. parameter_model.get_param(
  105. 'letsencrypt.openssl.digest', '-sha256'),
  106. '-key', self.generate_domain_key(domain),
  107. '-subj', '/CN=%s' % domain, '-config', cfg.name,
  108. '-out', csr,
  109. ]
  110. if len(domains) > 1:
  111. cmdline.extend([
  112. '-reqexts', 'SAN',
  113. ])
  114. self.call_cmdline(cmdline)
  115. return csr
  116. @api.model
  117. def cron(self):
  118. domain = urlparse.urlparse(
  119. self.env['ir.config_parameter'].get_param(
  120. 'web.base.url', 'localhost')).netloc
  121. self.validate_domain(domain)
  122. account_key = self.generate_account_key()
  123. csr = self.generate_csr(domain)
  124. acme_challenge = get_challenge_dir()
  125. if not os.path.isdir(acme_challenge):
  126. os.makedirs(acme_challenge)
  127. if self.env.context.get('letsencrypt_dry_run'):
  128. crt_text = 'I\'m a test text'
  129. else: # pragma: no cover
  130. from acme_tiny import get_crt, DEFAULT_CA
  131. crt_text = get_crt(
  132. account_key, csr, acme_challenge, log=_logger, CA=DEFAULT_CA)
  133. with open(os.path.join(get_data_dir(), '%s.crt' % domain), 'w')\
  134. as crt:
  135. crt.write(crt_text)
  136. chain_cert = urllib2.urlopen(
  137. self.env['ir.config_parameter'].get_param(
  138. 'letsencrypt.chain_certificate_address',
  139. 'https://letsencrypt.org/certs/'
  140. 'lets-encrypt-x3-cross-signed.pem')
  141. )
  142. crt.write(chain_cert.read())
  143. chain_cert.close()
  144. _logger.info('wrote %s', crt.name)
  145. reload_cmd = self.env['ir.config_parameter'].get_param(
  146. 'letsencrypt.reload_command', False)
  147. if reload_cmd:
  148. _logger.info('reloading webserver...')
  149. self.call_cmdline(['sh', '-c', reload_cmd])
  150. else:
  151. _logger.info('no command defined for reloading webserver, please '
  152. 'do it manually in order to apply new certificate')