You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
232 lines
8.9 KiB
232 lines
8.9 KiB
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html).
|
|
|
|
import datetime
|
|
import logging
|
|
import os
|
|
|
|
import odoo
|
|
from odoo import api, fields, models, tools
|
|
from odoo.osv import expression
|
|
|
|
_logger = logging.getLogger(__name__)
|
|
|
|
|
|
try:
    # Mako-style templates are rendered through a jinja2 sandboxed
    # environment instead of mako itself.
    # Only a subset of the mako syntax is covered: arbitrary Python
    # statements are refused and expressions can only reach the "public"
    # attributes of objects (those whose name does not start with '_').
    # The limitation is intentional: it protects the server from incidental
    # or malicious execution of Python code.
    from jinja2.sandbox import SandboxedEnvironment

    mako_template_env = SandboxedEnvironment(
        variable_start_string="${",
        variable_end_string="}",
        line_statement_prefix="%",
        trim_blocks=True,  # no newline is emitted right after a block
    )
    # Builtins made available by name inside every rendered template.
    mako_template_env.globals.update(
        str=str,
        datetime=datetime,
        len=len,
        abs=abs,
        min=min,
        max=max,
        sum=sum,
        filter=filter,
        map=map,
        round=round,
    )
except ImportError:
    _logger.warning("jinja2 not available, templating features will not work!")
|
|
|
|
|
|
class AttachmentSynchronizeTask(models.Model):
    """Task describing how attachments are exchanged with a storage backend.

    A task is either an "import" task (files listed on the backend are turned
    into ``attachment.queue`` records) or an "export" task (pending
    ``attachment.queue`` records linked to the task are run).
    """

    _name = "attachment.synchronize.task"
    _description = "Attachment synchronize task"

    name = fields.Char(required=True)
    method_type = fields.Selection(
        [("import", "Import Task"), ("export", "Export Task")], required=True
    )
    pattern = fields.Char(
        string="Selection Pattern",
        help="Pattern used to select the files to be imported following the 'fnmatch' "
        "special characters (e.g. '*.txt' to catch all the text files).\n"
        "If empty, import all the files found in 'File Path'.",
    )
    filepath = fields.Char(
        string="File Path", help="Path to imported/exported files in the Backend"
    )
    backend_id = fields.Many2one("storage.backend", string="Backend")
    attachment_ids = fields.One2many("attachment.queue", "task_id", string="Attachment")
    move_path = fields.Char(
        string="Move Path", help="Imported File will be moved to this path"
    )
    new_name = fields.Char(
        string="New Name",
        help="Imported File will be renamed to this name.\n"
        "New Name can use 'mako' template where 'obj' is the original file's name.\n"
        "For instance : ${obj.name}-${obj.create_date}.csv",
    )
    after_import = fields.Selection(
        selection=[
            ("rename", "Rename"),
            ("move", "Move"),
            ("move_rename", "Move & Rename"),
            ("delete", "Delete"),
        ],
        help="Action after import a file",
    )
    file_type = fields.Selection(
        selection=[],
        string="File Type",
        help="Used to fill the 'File Type' field in the imported 'Attachments Queues'."
        "\nFurther operations will be realized on these Attachments Queues depending "
        "on their 'File Type' value.",
    )
    # NOTE(review): the "old" keyword is kept untouched; confirm whether
    # "oldname" (field rename hint) was the intended parameter.
    active = fields.Boolean("Enabled", default=True, old="enabled")
    avoid_duplicated_files = fields.Boolean(
        string="Avoid importing duplicated files",
        help="If checked, a file will not be imported if an Attachment Queue with the "
        "same name already exists.",
    )
    failure_emails = fields.Char(
        string="Failure Emails",
        help="Used to fill the 'Failure Emails' field in the 'Attachments Queues' "
        "related to this task.\nAn alert will be sent to these emails if any operation "
        "on these Attachment Queue's file type fails.",
    )
    count_attachment_failed = fields.Integer(compute="_compute_count_state")
    count_attachment_pending = fields.Integer(compute="_compute_count_state")
    count_attachment_done = fields.Integer(compute="_compute_count_state")

    def _compute_count_state(self):
        """Count the linked attachments in each of the failed/pending/done states."""
        for record in self:
            for state in ["failed", "pending", "done"]:
                matching = record.attachment_ids.filtered(
                    lambda r, state=state: r.state == state
                )
                record["count_attachment_{}".format(state)] = len(matching)

    def _prepare_attachment_vals(self, data, filename):
        """Build the values used to create the ``attachment.queue`` of a file.

        :param data: file content, as returned by the backend's
            ``_get_b64_data``
        :param filename: name of the imported file
        :return: dict of values for ``attachment.queue.create``
        """
        self.ensure_one()
        vals = {
            "name": filename,
            "datas": data,
            "datas_fname": filename,
            "task_id": self.id,
            "file_type": self.file_type or False,
        }
        return vals

    @api.model
    def _template_render(self, template, record):
        """Render ``template`` in the sandboxed environment with ``record`` as ``obj``.

        Failures are logged and never propagated: an empty string is returned
        when the template cannot be loaded or rendered, and also when the
        rendering result is the literal text ``False``.

        :param template: template source (converted with ``tools.ustr``)
        :param record: object exposed to the template under the name ``obj``
        :return: the rendered text, or an empty string
        """
        try:
            compiled = mako_template_env.from_string(tools.ustr(template))
        except Exception:
            _logger.exception("Failed to load template '%s'", template)
            # Nothing can be rendered without a compiled template; stop here
            # instead of failing a second time on ``render``.
            return u""

        variables = {"obj": record}
        try:
            render_result = compiled.render(variables)
        except Exception:
            _logger.exception(
                "Failed to render template '%s' using values '%s'",
                template,
                variables,
            )
            render_result = u""
        if render_result == u"False":
            render_result = u""
        return render_result

    @api.model
    def run_task_import_scheduler(self, domain=None):
        """Run ``run_import`` on every import task matching ``domain``.

        :param domain: optional search domain; it is combined (AND) with the
            restriction ``method_type = 'import'``
        """
        if domain is None:
            domain = []
        domain = expression.AND([domain, [("method_type", "=", "import")]])
        for task in self.search(domain):
            task.run_import()

    def run(self):
        """Dispatch each task to its ``run_<method_type>`` method.

        :raises NotImplementedError: when no ``run_<method_type>`` method
            exists for a task
        """
        for record in self:
            method = "run_{}".format(record.method_type)
            if not hasattr(record, method):
                # ``NotImplemented`` is a comparison sentinel, not an
                # exception: raising it would produce a TypeError.
                raise NotImplementedError(
                    "No method '{}' available to run this task".format(method)
                )
            getattr(record, method)()

    def run_import(self):
        """Import the backend files selected by this task.

        The files are listed with the backend's ``_list`` using ``filepath``
        and ``pattern``; when ``avoid_duplicated_files`` is set, the list is
        first reduced with ``_file_to_import``. Each file is handled with its
        own database cursor: an ``attachment.queue`` is created, the
        ``after_import`` action is applied on the backend, then the cursor is
        committed. On any error the cursor is rolled back and the exception is
        re-raised, which stops the import of the remaining files.
        """
        self.ensure_one()
        attach_obj = self.env["attachment.queue"]
        backend = self.backend_id
        filepath = self.filepath or ""
        filenames = backend._list(relative_path=filepath, pattern=self.pattern)
        if self.avoid_duplicated_files:
            filenames = self._file_to_import(filenames)
        total_import = 0
        for file_name in filenames:
            with api.Environment.manage():
                with odoo.registry(self.env.cr.dbname).cursor() as new_cr:
                    new_env = api.Environment(new_cr, self.env.uid, self.env.context)
                    try:
                        full_absolute_path = os.path.join(filepath, file_name)
                        data = backend._get_b64_data(full_absolute_path)
                        attach_vals = self._prepare_attachment_vals(data, file_name)
                        attachment = attach_obj.with_env(new_env).create(attach_vals)
                        # Destination of the file on the backend, if the
                        # post-import action writes one.
                        new_full_path = False
                        if self.after_import == "rename":
                            new_name = self._template_render(self.new_name, attachment)
                            new_full_path = os.path.join(filepath, new_name)
                        elif self.after_import == "move":
                            new_full_path = os.path.join(self.move_path, file_name)
                        elif self.after_import == "move_rename":
                            new_name = self._template_render(self.new_name, attachment)
                            new_full_path = os.path.join(self.move_path, new_name)
                        if new_full_path:
                            backend._add_b64_data(new_full_path, data)
                        # Every post-import action removes the original file;
                        # rename/move have written the new copy just above.
                        if self.after_import in (
                            "delete",
                            "rename",
                            "move",
                            "move_rename",
                        ):
                            backend._delete(full_absolute_path)
                        total_import += 1
                    except Exception:
                        new_env.cr.rollback()
                        # Bare raise keeps the original traceback intact.
                        raise
                    else:
                        new_env.cr.commit()
        _logger.info("Run import complete! Imported %s files", total_import)

    def _file_to_import(self, filenames):
        """Drop the file names already used by an existing ``attachment.queue``.

        :param filenames: iterable of candidate file names
        :return: list of the names with no ``attachment.queue`` of the same
            name, without duplicates, in their original order
        """
        imported = set(
            self.env["attachment.queue"]
            .search([("name", "in", filenames)])
            .mapped("name")
        )
        # dict.fromkeys de-duplicates while keeping the listing order, so the
        # import order is deterministic.
        return [name for name in dict.fromkeys(filenames) if name not in imported]

    def run_export(self):
        """Run the pending attachments linked to each task."""
        for task in self:
            task.attachment_ids.filtered(lambda a: a.state == "pending").run()

    def button_duplicate_record(self):
        """Duplicate the task from a UI button."""
        # due to orm limitation method call from ui should not have params
        # so we need to define this method to be able to copy
        # if we do not do this the context will be injected in default params
        # in V14 maybe we can call copy directly
        self.copy()

    def copy(self, default=None):
        """Duplicate the task; the copy is inactive unless ``default`` says otherwise.

        :param default: optional dict of values overriding the copied ones;
            it is not modified
        :return: the result of the parent ``copy``
        """
        # Work on a private copy so the caller's dict is never mutated.
        default = dict(default or {})
        default.setdefault("active", False)
        return super().copy(default=default)
|