Sébastien BEAU
5 years ago
committed by
David Beal
13 changed files with 198 additions and 23 deletions
-
1attachment_synchronize/__manifest__.py
-
6attachment_synchronize/data/cron.xml
-
20attachment_synchronize/demo/attachment_synchronize_task_demo.xml
-
2attachment_synchronize/models/attachment.py
-
4attachment_synchronize/models/storage_backend.py
-
23attachment_synchronize/models/task.py
-
2attachment_synchronize/security/ir.model.access.csv
-
2attachment_synchronize/tests/__init__.py
-
38attachment_synchronize/tests/common.py
-
35attachment_synchronize/tests/test_export.py
-
80attachment_synchronize/tests/test_import.py
-
2attachment_synchronize/views/storage_backend_view.xml
-
4attachment_synchronize/views/task_view.xml
@ -1,16 +1,16 @@ |
|||
<?xml version="1.0" encoding="UTF-8"?> |
|||
<odoo noupdate="1"> |
|||
|
|||
<record model="ir.cron" id="cronjob_run_exchange_tasks"> |
|||
<record model="ir.cron" id="cronjob_run_attachment_synchronize_task_import"> |
|||
<field name='name'>Run attachment tasks import</field> |
|||
<field name='interval_number'>30</field> |
|||
<field name='interval_type'>minutes</field> |
|||
<field name="numbercall">-1</field> |
|||
<field name="active">False</field> |
|||
<field name="doall" eval="False" /> |
|||
<field name="model_id" ref="model_storage_backend_task"/> |
|||
<field name="model_id" ref="model_attachment_synchronize_task"/> |
|||
<field name="state">code</field> |
|||
<field name="code">model.run_task_scheduler([('method_type', '=', 'import')])</field> |
|||
<field name="code">model.run_task_import_scheduler()</field> |
|||
</record> |
|||
|
|||
</odoo> |
@ -0,0 +1,20 @@ |
|||
<?xml version="1.0" encoding="UTF-8"?> |
|||
<odoo> |
|||
<record id="import_from_filestore" model="attachment.synchronize.task"> |
|||
<field name="name">TEST Import</field> |
|||
<field name="backend_id" ref="storage_backend.default_storage_backend"/> |
|||
<field name="method_type">import</field> |
|||
<field name="after_import">delete</field> |
|||
<field name="filepath">test_import</field> |
|||
<field name="emails">foo@example.org,bar@example.org</field> |
|||
</record> |
|||
|
|||
<record id="export_to_filestore" model="attachment.synchronize.task"> |
|||
<field name="name">TEST Export</field> |
|||
<field name="backend_id" ref="storage_backend.default_storage_backend"/> |
|||
<field name="method_type">export</field> |
|||
<field name="filepath">test_export</field> |
|||
<field name="emails">foo@example.org,bar@example.org</field> |
|||
</record> |
|||
|
|||
</odoo> |
@ -1,2 +1,2 @@ |
|||
id,name,model_id:id,group_id:id,perm_read,perm_write,perm_create,perm_unlink |
|||
access_storage_backend_task_manager,storage.backend.task.manager,model_storage_backend_task,base.group_system,1,1,1,1 |
|||
access_attachment_synchronize_task_manager,attachment.synchronize.task.manager,model_attachment_synchronize_task,base.group_system,1,1,1,1 |
@ -0,0 +1,2 @@ |
|||
from . import test_import |
|||
from . import test_export |
@ -0,0 +1,38 @@ |
|||
# Copyright 2020 Akretion (http://www.akretion.com). |
|||
# @author Sébastien BEAU <sebastien.beau@akretion.com> |
|||
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl). |
|||
|
|||
import mock |
|||
import os |
|||
from odoo.addons.storage_backend.tests.common import Common |
|||
|
|||
|
|||
class SyncCommon(Common): |
|||
|
|||
def _clean_testing_directory(self): |
|||
for test_dir in [ |
|||
self.directory_input, self.directory_output, self.directory_archived]: |
|||
for filename in self.backend._list(test_dir): |
|||
self.backend._delete(os.path.join(test_dir, filename)) |
|||
|
|||
def _create_test_file(self): |
|||
self.backend._add_b64_data( |
|||
os.path.join(self.directory_input, "bar.txt"), |
|||
self.filedata, |
|||
mimetype=u"text/plain") |
|||
|
|||
def setUp(self): |
|||
super().setUp() |
|||
self.env.cr.commit = mock.Mock() |
|||
self.registry.enter_test_mode(self.env.cr) |
|||
self.directory_input = "test_import" |
|||
self.directory_output = "test_output" |
|||
self.directory_archived = "test_archived" |
|||
self._clean_testing_directory() |
|||
self._create_test_file() |
|||
self.task = self.env.ref("attachment_synchronize.import_from_filestore") |
|||
|
|||
def tearDown(self): |
|||
self.registry.leave_test_mode() |
|||
self._clean_testing_directory() |
|||
super().tearDown() |
@ -0,0 +1,35 @@ |
|||
# Copyright 2020 Akretion (http://www.akretion.com). |
|||
# @author Sébastien BEAU <sebastien.beau@akretion.com> |
|||
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl). |
|||
|
|||
import os |
|||
import mock |
|||
from .common import SyncCommon |
|||
|
|||
def raising_side_effect(*args, **kwargs): |
|||
raise Exception("Boom") |
|||
|
|||
|
|||
class TestExport(SyncCommon): |
|||
|
|||
def setUp(self): |
|||
super().setUp() |
|||
self.task = self.env.ref("attachment_synchronize.export_to_filestore") |
|||
self.attachment = self.env["attachment.queue"].create({ |
|||
"name": "foo.txt", |
|||
"datas_fname": "foo.txt", |
|||
"task_id": self.task.id, |
|||
"file_type": "export", |
|||
"datas": self.filedata, |
|||
}) |
|||
|
|||
def test_export(self): |
|||
self.attachment.run() |
|||
result = self.backend._list("test_export") |
|||
self.assertEqual(result, ["foo.txt"]) |
|||
|
|||
def test_failing_export(self): |
|||
with mock.patch.object(type(self.backend), "_add_b64_data", side_effect=raising_side_effect): |
|||
self.attachment.run() |
|||
self.assertEqual(self.attachment.state, "failed") |
|||
self.assertEqual(self.attachment.state_message, "Boom") |
@ -0,0 +1,80 @@ |
|||
# Copyright 2020 Akretion (http://www.akretion.com). |
|||
# @author Sébastien BEAU <sebastien.beau@akretion.com> |
|||
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl). |
|||
|
|||
from .common import SyncCommon |
|||
|
|||
|
|||
class TestImport(SyncCommon): |
|||
|
|||
@property |
|||
def archived_files(self): |
|||
return self.backend._list(self.directory_archived) |
|||
|
|||
@property |
|||
def input_files(self): |
|||
return self.backend._list(self.directory_input) |
|||
|
|||
def _check_attachment_created(self, count=1): |
|||
attachment = self.env["attachment.queue"].search([("name", "=", "bar.txt")]) |
|||
self.assertEqual(len(attachment), count) |
|||
|
|||
def test_import_with_rename(self): |
|||
self.task.write({"after_import": "rename", "new_name": "foo.txt"}) |
|||
self.task.run_import() |
|||
self._check_attachment_created() |
|||
self.assertEqual(self.input_files, ["foo.txt"]) |
|||
self.assertEqual(self.archived_files, []) |
|||
|
|||
def test_import_with_move(self): |
|||
self.task.write({"after_import": "move", "move_path": self.directory_archived}) |
|||
self.task.run_import() |
|||
self._check_attachment_created() |
|||
self.assertEqual(self.input_files, []) |
|||
self.assertEqual(self.archived_files, ["bar.txt"]) |
|||
|
|||
def test_import_with_move_and_rename(self): |
|||
self.task.write({ |
|||
"after_import": "move_rename", |
|||
"new_name": "foo.txt", |
|||
"move_path": self.directory_archived, |
|||
}) |
|||
self.task.run_import() |
|||
self._check_attachment_created() |
|||
self.assertEqual(self.input_files, []) |
|||
self.assertEqual(self.archived_files, ["foo.txt"]) |
|||
|
|||
def test_import_with_delete(self): |
|||
self.task.write({"after_import": "delete"}) |
|||
self.task.run_import() |
|||
self._check_attachment_created() |
|||
self.assertEqual(self.input_files, []) |
|||
self.assertEqual(self.archived_files, []) |
|||
|
|||
def test_import_twice(self): |
|||
self.task.write({"after_import": "delete"}) |
|||
self.task.run_import() |
|||
self._check_attachment_created(count=1) |
|||
|
|||
self._create_test_file() |
|||
self.task.run_import() |
|||
self._check_attachment_created(count=2) |
|||
|
|||
def test_import_twice_no_duplicate(self): |
|||
self.task.write({"after_import": "delete", "check_duplicated_files": True}) |
|||
self.task.run_import() |
|||
self._check_attachment_created(count=1) |
|||
|
|||
self._create_test_file() |
|||
self.task.run_import() |
|||
self._check_attachment_created(count=1) |
|||
|
|||
def test_running_cron(self): |
|||
self.task.write({"after_import": "delete"}) |
|||
self.env["attachment.synchronize.task"].run_task_import_scheduler() |
|||
self._check_attachment_created(count=1) |
|||
|
|||
def test_running_cron_disable_task(self): |
|||
self.task.write({"after_import": "delete", "enabled": False}) |
|||
self.env["attachment.synchronize.task"].run_task_import_scheduler() |
|||
self._check_attachment_created(count=0) |
Write
Preview
Loading…
Cancel
Save
Reference in new issue