OCA reporting engine fork for dev and update.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

173 lines
6.4 KiB

  1. # -*- coding: utf-8 -*-
  2. # Copyright 2015 Tecnativa - Antonio Espinosa
  3. # Copyright 2017 Tecnativa - Pedro M. Baeza
  4. # License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html).
  5. import base64
  6. from contextlib import closing
  7. import os
  8. import subprocess
  9. import tempfile
  10. import time
  11. from odoo import models, api, _
  12. from odoo.exceptions import UserError, AccessError
  13. from odoo.tools.safe_eval import safe_eval
  14. import logging
  15. _logger = logging.getLogger(__name__)
  16. def _normalize_filepath(path):
  17. path = path or ''
  18. path = path.strip()
  19. if not os.path.isabs(path):
  20. me = os.path.dirname(__file__)
  21. path = '{}/../static/certificate/'.format(me) + path
  22. path = os.path.normpath(path)
  23. return path if os.path.exists(path) else False
  24. class Report(models.Model):
  25. _inherit = 'report'
  26. def _certificate_get(self, report, docids):
  27. """Obtain the proper certificate for the report and the conditions."""
  28. if report.report_type != 'qweb-pdf':
  29. return False
  30. certificates = self.env['report.certificate'].search([
  31. ('company_id', '=', self.env.user.company_id.id),
  32. ('model_id', '=', report.model),
  33. ])
  34. if not certificates:
  35. return False
  36. for cert in certificates:
  37. # Check allow only one document
  38. if cert.allow_only_one and len(self) > 1:
  39. _logger.debug(
  40. "Certificate '%s' allows only one document, "
  41. "but printing %d documents",
  42. cert.name, len(docids))
  43. continue
  44. # Check domain
  45. if cert.domain:
  46. domain = [('id', 'in', tuple(docids))]
  47. domain = domain + safe_eval(cert.domain)
  48. docs = self.env[cert.model_id.model].search(domain)
  49. if not docs:
  50. _logger.debug(
  51. "Certificate '%s' domain not satisfied", cert.name)
  52. continue
  53. # Certificate match!
  54. return cert
  55. return False
  56. def _attach_filename_get(self, docids, certificate):
  57. if len(docids) != 1:
  58. return False
  59. doc = self.env[certificate.model_id.model].browse(docids[0])
  60. return safe_eval(certificate.attachment, {
  61. 'object': doc,
  62. 'time': time
  63. })
  64. def _attach_signed_read(self, docids, certificate):
  65. if len(docids) != 1:
  66. return False
  67. filename = self._attach_filename_get(docids, certificate)
  68. if not filename:
  69. return False
  70. attachment = self.env['ir.attachment'].search([
  71. ('datas_fname', '=', filename),
  72. ('res_model', '=', certificate.model_id.model),
  73. ('res_id', '=', docids[0]),
  74. ], limit=1)
  75. if attachment:
  76. return base64.decodestring(attachment.datas)
  77. return False
  78. def _attach_signed_write(self, docids, certificate, signed):
  79. if len(docids) != 1:
  80. return False
  81. filename = self._attach_filename_get(docids, certificate)
  82. if not filename:
  83. return False
  84. try:
  85. attachment = self.env['ir.attachment'].create({
  86. 'name': filename,
  87. 'datas': base64.encodestring(signed),
  88. 'datas_fname': filename,
  89. 'res_model': certificate.model_id.model,
  90. 'res_id': docids[0],
  91. })
  92. except AccessError:
  93. raise UserError(
  94. _('Saving signed report (PDF): '
  95. 'You do not have enough access rights to save attachments'))
  96. return attachment
  97. def _signer_bin(self, opts):
  98. me = os.path.dirname(__file__)
  99. java_bin = 'java -jar'
  100. jar = '{}/../static/jar/jPdfSign.jar'.format(me)
  101. return '%s %s %s' % (java_bin, jar, opts)
  102. def pdf_sign(self, pdf, certificate):
  103. pdfsigned = pdf + '.signed.pdf'
  104. p12 = _normalize_filepath(certificate.path)
  105. passwd = _normalize_filepath(certificate.password_file)
  106. if not (p12 and passwd):
  107. raise UserError(
  108. _('Signing report (PDF): '
  109. 'Certificate or password file not found'))
  110. signer_opts = '"%s" "%s" "%s" "%s"' % (p12, pdf, pdfsigned, passwd)
  111. signer = self._signer_bin(signer_opts)
  112. process = subprocess.Popen(
  113. signer, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
  114. out, err = process.communicate()
  115. if process.returncode:
  116. raise UserError(
  117. _('Signing report (PDF): jPdfSign failed (error code: %s). '
  118. 'Message: %s. Output: %s') %
  119. (process.returncode, err, out))
  120. return pdfsigned
  121. @api.model
  122. def get_pdf(self, docids, report_name, html=None, data=None):
  123. report = self._get_report_from_name(report_name)
  124. certificate = self._certificate_get(report, docids)
  125. if certificate and certificate.attachment:
  126. signed_content = self._attach_signed_read(docids, certificate)
  127. if signed_content:
  128. _logger.debug(
  129. "The signed PDF document '%s/%s' was loaded from the "
  130. "database", report_name, docids,
  131. )
  132. return signed_content
  133. content = super(Report, self).get_pdf(
  134. docids, report_name, html=html, data=data,
  135. )
  136. if certificate:
  137. # Creating temporary origin PDF
  138. pdf_fd, pdf = tempfile.mkstemp(
  139. suffix='.pdf', prefix='report.tmp.')
  140. with closing(os.fdopen(pdf_fd, 'w')) as pf:
  141. pf.write(content)
  142. _logger.debug(
  143. "Signing PDF document '%s' for IDs %s with certificate '%s'",
  144. report_name, docids, certificate.name,
  145. )
  146. signed = self.pdf_sign(pdf, certificate)
  147. # Read signed PDF
  148. if os.path.exists(signed):
  149. with open(signed, 'rb') as pf:
  150. content = pf.read()
  151. # Manual cleanup of the temporary files
  152. for fname in (pdf, signed):
  153. try:
  154. os.unlink(fname)
  155. except (OSError, IOError):
  156. _logger.error('Error when trying to remove file %s', fname)
  157. if certificate.attachment:
  158. self._attach_signed_write(docids, certificate, content)
  159. return content