Browse Source

[REF] run black

12.0-mig-module_prototyper_last
Sébastien BEAU 5 years ago
committed by David Beal
parent
commit
1a7f8bece8
  1. 37
      attachment_synchronize/__manifest__.py
  2. 21
      attachment_synchronize/models/attachment.py
  3. 4
      attachment_synchronize/models/storage_backend.py
  4. 170
      attachment_synchronize/models/task.py
  5. 13
      attachment_synchronize/tests/common.py
  6. 15
      attachment_synchronize/tests/test_export.py
  7. 19
      attachment_synchronize/tests/test_import.py

37
attachment_synchronize/__manifest__.py

@ -4,26 +4,21 @@
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html). # License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html).
{ {
'name': 'Attachment Synchronize',
'version': '12.0.1.0.0',
'author': 'Akretion,Odoo Community Association (OCA)',
'website': 'https://github.com/oca/server-tools',
'license': 'AGPL-3',
'category': 'Generic Modules',
'depends': [
'attachment_queue',
'storage_backend',
"name": "Attachment Synchronize",
"version": "12.0.1.0.0",
"author": "Akretion,Odoo Community Association (OCA)",
"website": "https://github.com/oca/server-tools",
"license": "AGPL-3",
"category": "Generic Modules",
"depends": ["attachment_queue", "storage_backend"],
"data": [
"views/attachment_view.xml",
"views/task_view.xml",
"views/storage_backend_view.xml",
"data/cron.xml",
"security/ir.model.access.csv",
], ],
'data': [
'views/attachment_view.xml',
'views/task_view.xml',
'views/storage_backend_view.xml',
'data/cron.xml',
'security/ir.model.access.csv',
],
'demo': [
"demo/attachment_synchronize_task_demo.xml",
],
'installable': True,
'application': False,
"demo": ["demo/attachment_synchronize_task_demo.xml"],
"installable": True,
"application": False,
} }

21
attachment_synchronize/models/attachment.py

@ -1,26 +1,27 @@
# @ 2016 Florian DA COSTA @ Akretion # @ 2016 Florian DA COSTA @ Akretion
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html). # License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html).
from odoo import models, fields, api
import os import os
from odoo import models, fields
class AttachmentQueue(models.Model): class AttachmentQueue(models.Model):
_inherit = 'attachment.queue'
_inherit = "attachment.queue"
task_id = fields.Many2one('attachment.synchronize.task', string='Task')
task_id = fields.Many2one("attachment.synchronize.task", string="Task")
storage_backend_id = fields.Many2one( storage_backend_id = fields.Many2one(
'storage.backend', string='Storage Backend',
related='task_id.backend_id', store=True)
"storage.backend",
string="Storage Backend",
related="task_id.backend_id",
store=True,
)
file_type = fields.Selection( file_type = fields.Selection(
selection_add=[
('export',
'Export File (External location)')
])
selection_add=[("export", "Export File (External location)")]
)
def _run(self): def _run(self):
super()._run() super()._run()
if self.file_type == 'export':
if self.file_type == "export":
path = os.path.join(self.task_id.filepath, self.datas_fname) path = os.path.join(self.task_id.filepath, self.datas_fname)
self.storage_backend_id._add_b64_data(path, self.datas) self.storage_backend_id._add_b64_data(path, self.datas)

4
attachment_synchronize/models/storage_backend.py

@ -7,5 +7,5 @@ class StorageBackend(models.Model):
_inherit = "storage.backend" _inherit = "storage.backend"
synchronize_task_ids = fields.One2many( synchronize_task_ids = fields.One2many(
"attachment.synchronize.task", "backend_id",
string="Tasks")
"attachment.synchronize.task", "backend_id", string="Tasks"
)

170
attachment_synchronize/models/task.py

@ -20,82 +20,96 @@ try:
# This is done on purpose: it prevents incidental or malicious execution of # This is done on purpose: it prevents incidental or malicious execution of
# Python code that may break the security of the server. # Python code that may break the security of the server.
from jinja2.sandbox import SandboxedEnvironment from jinja2.sandbox import SandboxedEnvironment
mako_template_env = SandboxedEnvironment( mako_template_env = SandboxedEnvironment(
variable_start_string="${", variable_start_string="${",
variable_end_string="}", variable_end_string="}",
line_statement_prefix="%", line_statement_prefix="%",
trim_blocks=True, # do not output newline after blocks trim_blocks=True, # do not output newline after blocks
) )
mako_template_env.globals.update({
'str': str,
'datetime': datetime,
'len': len,
'abs': abs,
'min': min,
'max': max,
'sum': sum,
'filter': filter,
'map': map,
'round': round,
})
mako_template_env.globals.update(
{
"str": str,
"datetime": datetime,
"len": len,
"abs": abs,
"min": min,
"max": max,
"sum": sum,
"filter": filter,
"map": map,
"round": round,
}
)
except ImportError: except ImportError:
_logger.warning("jinja2 not available, templating features will not work!") _logger.warning("jinja2 not available, templating features will not work!")
class AttachmentSynchronizeTask(models.Model): class AttachmentSynchronizeTask(models.Model):
_name = 'attachment.synchronize.task'
_description = 'Attachment synchronize task'
_name = "attachment.synchronize.task"
_description = "Attachment synchronize task"
name = fields.Char(required=True) name = fields.Char(required=True)
method_type = fields.Selection( method_type = fields.Selection(
[('import', 'Import'), ('export', 'Export')],
required=True)
pattern = fields.Char(help='File name which is imported.'
'The system will check if the remote file at '
'least contains the pattern in its name. '
'Leave it empty to import all files')
filepath = fields.Char(help='Path to imported/exported file')
[("import", "Import"), ("export", "Export")], required=True
)
pattern = fields.Char(
help="File name which is imported."
"The system will check if the remote file at "
"least contains the pattern in its name. "
"Leave it empty to import all files"
)
filepath = fields.Char(help="Path to imported/exported file")
backend_id = fields.Many2one( backend_id = fields.Many2one(
'storage.backend', string='Backend', required=True)
attachment_ids = fields.One2many('attachment.queue', 'task_id',
string='Attachment')
move_path = fields.Char(string='Move Path',
help='Imported File will be moved to this path')
new_name = fields.Char(string='New Name',
help='Imported File will be renamed to this name\n'
'Name can use mako template where obj is an '
'ir_attachement. template exemple : '
' ${obj.name}-${obj.create_date}.csv')
"storage.backend", string="Backend", required=True
)
attachment_ids = fields.One2many(
"attachment.queue", "task_id", string="Attachment"
)
move_path = fields.Char(
string="Move Path", help="Imported File will be moved to this path"
)
new_name = fields.Char(
string="New Name",
help="Imported File will be renamed to this name\n"
"Name can use mako template where obj is an "
"ir_attachement. template exemple : "
" ${obj.name}-${obj.create_date}.csv",
)
after_import = fields.Selection( after_import = fields.Selection(
selection=[ selection=[
('rename', 'Rename'),
('move', 'Move'),
('move_rename', 'Move & Rename'),
('delete', 'Delete'),
], help='Action after import a file')
("rename", "Rename"),
("move", "Move"),
("move_rename", "Move & Rename"),
("delete", "Delete"),
],
help="Action after import a file",
)
file_type = fields.Selection( file_type = fields.Selection(
selection=[], selection=[],
string="File Type", string="File Type",
help="The file type determines an import method to be used " help="The file type determines an import method to be used "
"to parse and transform data before their import in ERP")
enabled = fields.Boolean('Enabled', default=True)
"to parse and transform data before their import in ERP",
)
enabled = fields.Boolean("Enabled", default=True)
check_duplicated_files = fields.Boolean( check_duplicated_files = fields.Boolean(
string='Check duplicated files',
help='If checked, will avoid duplication file import')
string="Check duplicated files",
help="If checked, will avoid duplication file import",
)
emails = fields.Char( emails = fields.Char(
string="Emails", string="Emails",
help="list of email which should be notified in case of failure " help="list of email which should be notified in case of failure "
"when excuting the files linked to this task")
"when excuting the files linked to this task",
)
@api.multi
def _prepare_attachment_vals(self, datas, filename): def _prepare_attachment_vals(self, datas, filename):
self.ensure_one() self.ensure_one()
vals = { vals = {
'name': filename,
'datas': datas,
'datas_fname': filename,
'task_id': self.id,
'file_type': self.file_type or False,
"name": filename,
"datas": datas,
"datas_fname": filename,
"task_id": self.id,
"file_type": self.file_type or False,
} }
return vals return vals
@ -106,13 +120,14 @@ class AttachmentSynchronizeTask(models.Model):
except Exception: except Exception:
_logger.exception("Failed to load template %r", template) _logger.exception("Failed to load template %r", template)
variables = {'obj': record}
variables = {"obj": record}
try: try:
render_result = template.render(variables) render_result = template.render(variables)
except Exception: except Exception:
_logger.exception( _logger.exception(
"Failed to render template %r using values %r" %
(template, variables))
"Failed to render template %r using values %r"
% (template, variables)
)
render_result = u"" render_result = u""
if render_result == u"False": if render_result == u"False":
render_result = u"" render_result = u""
@ -122,17 +137,15 @@ class AttachmentSynchronizeTask(models.Model):
def run_task_import_scheduler(self, domain=None): def run_task_import_scheduler(self, domain=None):
if domain is None: if domain is None:
domain = [] domain = []
domain = expression.AND([domain, [
('method_type', '=', 'import'),
('enabled', '=', True),
]])
domain = expression.AND(
[domain, [("method_type", "=", "import"), ("enabled", "=", True)]]
)
for task in self.search(domain): for task in self.search(domain):
task.run_import() task.run_import()
@api.multi
def run_import(self): def run_import(self):
self.ensure_one() self.ensure_one()
attach_obj = self.env['attachment.queue']
attach_obj = self.env["attachment.queue"]
backend = self.backend_id backend = self.backend_id
filepath = self.filepath or "" filepath = self.filepath or ""
filenames = backend._list(relative_path=filepath, pattern=self.pattern) filenames = backend._list(relative_path=filepath, pattern=self.pattern)
@ -141,34 +154,43 @@ class AttachmentSynchronizeTask(models.Model):
total_import = 0 total_import = 0
for file_name in filenames: for file_name in filenames:
with api.Environment.manage(): with api.Environment.manage():
with odoo.registry(
self.env.cr.dbname).cursor() as new_cr:
new_env = api.Environment(new_cr, self.env.uid,
self.env.context)
with odoo.registry(self.env.cr.dbname).cursor() as new_cr:
new_env = api.Environment(
new_cr, self.env.uid, self.env.context
)
try: try:
full_absolute_path = os.path.join(filepath, file_name) full_absolute_path = os.path.join(filepath, file_name)
datas = backend._get_b64_data(full_absolute_path) datas = backend._get_b64_data(full_absolute_path)
attach_vals = self._prepare_attachment_vals( attach_vals = self._prepare_attachment_vals(
datas, file_name)
datas, file_name
)
attachment = attach_obj.with_env(new_env).create( attachment = attach_obj.with_env(new_env).create(
attach_vals)
attach_vals
)
new_full_path = False new_full_path = False
if self.after_import == 'rename':
if self.after_import == "rename":
new_name = self._template_render( new_name = self._template_render(
self.new_name, attachment)
self.new_name, attachment
)
new_full_path = os.path.join(filepath, new_name) new_full_path = os.path.join(filepath, new_name)
elif self.after_import == 'move':
elif self.after_import == "move":
new_full_path = os.path.join( new_full_path = os.path.join(
self.move_path, file_name)
elif self.after_import == 'move_rename':
self.move_path, file_name
)
elif self.after_import == "move_rename":
new_name = self._template_render( new_name = self._template_render(
self.new_name, attachment)
self.new_name, attachment
)
new_full_path = os.path.join( new_full_path = os.path.join(
self.move_path, new_name)
self.move_path, new_name
)
if new_full_path: if new_full_path:
backend._add_b64_data(new_full_path, datas) backend._add_b64_data(new_full_path, datas)
if self.after_import in ( if self.after_import in (
'delete', 'rename', 'move', 'move_rename'
"delete",
"rename",
"move",
"move_rename",
): ):
backend._delete(full_absolute_path) backend._delete(full_absolute_path)
total_import += 1 total_import += 1
@ -177,8 +199,12 @@ class AttachmentSynchronizeTask(models.Model):
raise e raise e
else: else:
new_env.cr.commit() new_env.cr.commit()
_logger.info('Run import complete! Imported {0} files'.format(total_import))
_logger.info(
"Run import complete! Imported {0} files".format(total_import)
)
def _file_to_import(self, filenames): def _file_to_import(self, filenames):
imported = self.attachment_ids.filtered(lambda r: r.name in filenames).mapped('name')
imported = self.attachment_ids.filtered(
lambda r: r.name in filenames
).mapped("name")
return list(set(filenames) - set(imported)) return list(set(filenames) - set(imported))

13
attachment_synchronize/tests/common.py

@ -8,10 +8,12 @@ from odoo.addons.storage_backend.tests.common import Common
class SyncCommon(Common): class SyncCommon(Common):
def _clean_testing_directory(self): def _clean_testing_directory(self):
for test_dir in [ for test_dir in [
self.directory_input, self.directory_output, self.directory_archived]:
self.directory_input,
self.directory_output,
self.directory_archived,
]:
for filename in self.backend._list(test_dir): for filename in self.backend._list(test_dir):
self.backend._delete(os.path.join(test_dir, filename)) self.backend._delete(os.path.join(test_dir, filename))
@ -19,7 +21,8 @@ class SyncCommon(Common):
self.backend._add_b64_data( self.backend._add_b64_data(
os.path.join(self.directory_input, "bar.txt"), os.path.join(self.directory_input, "bar.txt"),
self.filedata, self.filedata,
mimetype=u"text/plain")
mimetype=u"text/plain",
)
def setUp(self): def setUp(self):
super().setUp() super().setUp()
@ -30,7 +33,9 @@ class SyncCommon(Common):
self.directory_archived = "test_archived" self.directory_archived = "test_archived"
self._clean_testing_directory() self._clean_testing_directory()
self._create_test_file() self._create_test_file()
self.task = self.env.ref("attachment_synchronize.import_from_filestore")
self.task = self.env.ref(
"attachment_synchronize.import_from_filestore"
)
def tearDown(self): def tearDown(self):
self.registry.leave_test_mode() self.registry.leave_test_mode()

15
attachment_synchronize/tests/test_export.py

@ -2,26 +2,27 @@
# @author Sébastien BEAU <sebastien.beau@akretion.com> # @author Sébastien BEAU <sebastien.beau@akretion.com>
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl). # License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
import os
import mock import mock
from .common import SyncCommon from .common import SyncCommon
def raising_side_effect(*args, **kwargs): def raising_side_effect(*args, **kwargs):
raise Exception("Boom") raise Exception("Boom")
class TestExport(SyncCommon): class TestExport(SyncCommon):
def setUp(self): def setUp(self):
super().setUp() super().setUp()
self.task = self.env.ref("attachment_synchronize.export_to_filestore") self.task = self.env.ref("attachment_synchronize.export_to_filestore")
self.attachment = self.env["attachment.queue"].create({
self.attachment = self.env["attachment.queue"].create(
{
"name": "foo.txt", "name": "foo.txt",
"datas_fname": "foo.txt", "datas_fname": "foo.txt",
"task_id": self.task.id, "task_id": self.task.id,
"file_type": "export", "file_type": "export",
"datas": self.filedata, "datas": self.filedata,
})
}
)
def test_export(self): def test_export(self):
self.attachment.run() self.attachment.run()
@ -29,7 +30,11 @@ class TestExport(SyncCommon):
self.assertEqual(result, ["foo.txt"]) self.assertEqual(result, ["foo.txt"])
def test_failing_export(self): def test_failing_export(self):
with mock.patch.object(type(self.backend), "_add_b64_data", side_effect=raising_side_effect):
with mock.patch.object(
type(self.backend),
"_add_b64_data",
side_effect=raising_side_effect,
):
self.attachment.run() self.attachment.run()
self.assertEqual(self.attachment.state, "failed") self.assertEqual(self.attachment.state, "failed")
self.assertEqual(self.attachment.state_message, "Boom") self.assertEqual(self.attachment.state_message, "Boom")

19
attachment_synchronize/tests/test_import.py

@ -6,7 +6,6 @@ from .common import SyncCommon
class TestImport(SyncCommon): class TestImport(SyncCommon):
@property @property
def archived_files(self): def archived_files(self):
return self.backend._list(self.directory_archived) return self.backend._list(self.directory_archived)
@ -16,7 +15,9 @@ class TestImport(SyncCommon):
return self.backend._list(self.directory_input) return self.backend._list(self.directory_input)
def _check_attachment_created(self, count=1): def _check_attachment_created(self, count=1):
attachment = self.env["attachment.queue"].search([("name", "=", "bar.txt")])
attachment = self.env["attachment.queue"].search(
[("name", "=", "bar.txt")]
)
self.assertEqual(len(attachment), count) self.assertEqual(len(attachment), count)
def test_import_with_rename(self): def test_import_with_rename(self):
@ -27,18 +28,22 @@ class TestImport(SyncCommon):
self.assertEqual(self.archived_files, []) self.assertEqual(self.archived_files, [])
def test_import_with_move(self): def test_import_with_move(self):
self.task.write({"after_import": "move", "move_path": self.directory_archived})
self.task.write(
{"after_import": "move", "move_path": self.directory_archived}
)
self.task.run_import() self.task.run_import()
self._check_attachment_created() self._check_attachment_created()
self.assertEqual(self.input_files, []) self.assertEqual(self.input_files, [])
self.assertEqual(self.archived_files, ["bar.txt"]) self.assertEqual(self.archived_files, ["bar.txt"])
def test_import_with_move_and_rename(self): def test_import_with_move_and_rename(self):
self.task.write({
self.task.write(
{
"after_import": "move_rename", "after_import": "move_rename",
"new_name": "foo.txt", "new_name": "foo.txt",
"move_path": self.directory_archived, "move_path": self.directory_archived,
})
}
)
self.task.run_import() self.task.run_import()
self._check_attachment_created() self._check_attachment_created()
self.assertEqual(self.input_files, []) self.assertEqual(self.input_files, [])
@ -61,7 +66,9 @@ class TestImport(SyncCommon):
self._check_attachment_created(count=2) self._check_attachment_created(count=2)
def test_import_twice_no_duplicate(self): def test_import_twice_no_duplicate(self):
self.task.write({"after_import": "delete", "check_duplicated_files": True})
self.task.write(
{"after_import": "delete", "check_duplicated_files": True}
)
self.task.run_import() self.task.run_import()
self._check_attachment_created(count=1) self._check_attachment_created(count=1)

Loading…
Cancel
Save