OCA reporting engine fork for dev and update.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

191 lines
7.3 KiB

  1. # -*- coding: utf-8 -*-
  2. # © 2015 Antiun Ingenieria S.L. - Antonio Espinosa
  3. # License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html).
  4. import base64
  5. from contextlib import closing
  6. import os
  7. import subprocess
  8. import tempfile
  9. import time
  10. from openerp import models, api, _
  11. from openerp.exceptions import Warning, AccessError
  12. from openerp.tools.safe_eval import safe_eval
  13. import logging
  14. _logger = logging.getLogger(__name__)
  15. def _normalize_filepath(path):
  16. path = path or ''
  17. path = path.strip()
  18. if not os.path.isabs(path):
  19. me = os.path.dirname(__file__)
  20. path = '{}/../static/certificate/'.format(me) + path
  21. path = os.path.normpath(path)
  22. return path if os.path.exists(path) else False
  23. class Report(models.Model):
  24. _inherit = 'report'
  25. def _certificate_get(self, cr, uid, ids, report, context=None):
  26. if report.report_type != 'qweb-pdf':
  27. _logger.info(
  28. "Can only sign qweb-pdf reports, this one is '%s' type",
  29. report.report_type)
  30. return False
  31. m_cert = self.pool['report.certificate']
  32. company_id = self.pool['res.users']._get_company(cr, uid)
  33. certificate_ids = m_cert.search(cr, uid, [
  34. ('company_id', '=', company_id),
  35. ('model_id', '=', report.model)], context=context)
  36. if not certificate_ids:
  37. _logger.info(
  38. "No PDF certificate found for report '%s'",
  39. report.report_name)
  40. return False
  41. for cert in m_cert.browse(cr, uid, certificate_ids, context=context):
  42. # Check allow only one document
  43. if cert.allow_only_one and len(ids) > 1:
  44. _logger.info(
  45. "Certificate '%s' allows only one document, "
  46. "but printing %d documents",
  47. cert.name, len(ids))
  48. continue
  49. # Check domain
  50. if cert.domain:
  51. m_model = self.pool[cert.model_id.model]
  52. domain = [('id', 'in', tuple(ids))]
  53. domain = domain + safe_eval(cert.domain)
  54. doc_ids = m_model.search(cr, uid, domain, context=context)
  55. if not doc_ids:
  56. _logger.info(
  57. "Certificate '%s' domain not satisfied", cert.name)
  58. continue
  59. # Certificate match!
  60. return cert
  61. return False
  62. def _attach_filename_get(self, cr, uid, ids, certificate, context=None):
  63. if len(ids) != 1:
  64. return False
  65. obj = self.pool[certificate.model_id.model].browse(cr, uid, ids[0])
  66. filename = safe_eval(certificate.attachment, {
  67. 'object': obj,
  68. 'time': time
  69. })
  70. return filename
  71. def _attach_signed_read(self, cr, uid, ids, certificate, context=None):
  72. if len(ids) != 1:
  73. return False
  74. filename = self._attach_filename_get(
  75. cr, uid, ids, certificate, context=context)
  76. if not filename:
  77. return False
  78. signed = False
  79. m_attachment = self.pool['ir.attachment']
  80. attach_ids = m_attachment.search(cr, uid, [
  81. ('datas_fname', '=', filename),
  82. ('res_model', '=', certificate.model_id.model),
  83. ('res_id', '=', ids[0])
  84. ])
  85. if attach_ids:
  86. signed = m_attachment.browse(cr, uid, attach_ids[0]).datas
  87. signed = base64.decodestring(signed)
  88. return signed
  89. def _attach_signed_write(self, cr, uid, ids, certificate, signed,
  90. context=None):
  91. if len(ids) != 1:
  92. return False
  93. filename = self._attach_filename_get(
  94. cr, uid, ids, certificate, context=context)
  95. if not filename:
  96. return False
  97. m_attachment = self.pool['ir.attachment']
  98. try:
  99. attach_id = m_attachment.create(cr, uid, {
  100. 'name': filename,
  101. 'datas': base64.encodestring(signed),
  102. 'datas_fname': filename,
  103. 'res_model': certificate.model_id.model,
  104. 'res_id': ids[0],
  105. })
  106. except AccessError:
  107. raise Warning(
  108. _('Saving signed report (PDF):') + '\n',
  109. _('You do not have enought access rights to save attachments'))
  110. else:
  111. _logger.info(
  112. "The signed PDF document '%s' is now saved in the database",
  113. filename)
  114. return attach_id
  115. def _signer_bin(self, opts):
  116. me = os.path.dirname(__file__)
  117. return 'java -jar {}/../static/jar/jPdfSign.jar '.format(me) + opts
  118. def pdf_sign(self, pdf, certificate):
  119. pdfsigned = pdf + '.signed.pdf'
  120. p12 = _normalize_filepath(certificate.path)
  121. passwd = _normalize_filepath(certificate.password_file)
  122. if not (p12 and passwd):
  123. raise Warning(
  124. _('Signing report (PDF)'),
  125. _('Certificate or password file not found'))
  126. signer_opts = '"%s" "%s" "%s" "%s"' % (p12, pdf, pdfsigned, passwd)
  127. signer = self._signer_bin(signer_opts)
  128. process = subprocess.Popen(
  129. signer, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
  130. out, err = process.communicate()
  131. if process.returncode:
  132. raise Warning(
  133. _('Signing report (PDF)'),
  134. _('jPdfSign failed (error code: %s). '
  135. 'Message: %s') % (str(process.returncode), err))
  136. return pdfsigned
  137. @api.v7
  138. def get_pdf(self, cr, uid, ids, report_name, html=None, data=None,
  139. context=None):
  140. signed_content = False
  141. report = self._get_report_from_name(cr, uid, report_name)
  142. certificate = self._certificate_get(
  143. cr, uid, ids, report, context=context)
  144. if certificate and certificate.attachment:
  145. signed_content = self._attach_signed_read(
  146. cr, uid, ids, certificate, context=context)
  147. if signed_content:
  148. _logger.info("The signed PDF document '%s/%s' was loaded from "
  149. "the database", report_name, ids)
  150. return signed_content
  151. content = super(Report, self).get_pdf(
  152. cr, uid, ids, report_name, html=html, data=data,
  153. context=context)
  154. if certificate:
  155. # Creating temporary origin PDF
  156. pdf_fd, pdf = tempfile.mkstemp(
  157. suffix='.pdf', prefix='report.tmp.')
  158. with closing(os.fdopen(pdf_fd, 'w')) as pf:
  159. pf.write(content)
  160. _logger.info(
  161. "Signing PDF document '%s/%s' with certificate '%s'",
  162. report_name, ids, certificate.name)
  163. signed = self.pdf_sign(pdf, certificate)
  164. # Read signed PDF
  165. if os.path.exists(signed):
  166. with open(signed, 'rb') as pf:
  167. content = pf.read()
  168. # Manual cleanup of the temporary files
  169. for fname in (pdf, signed):
  170. try:
  171. os.unlink(fname)
  172. except (OSError, IOError):
  173. _logger.error('Error when trying to remove file %s', fname)
  174. if certificate.attachment:
  175. self._attach_signed_write(
  176. cr, uid, ids, certificate, content, context=context)
  177. return content