Browse Source

[REF] start cleanning

12.0-mig-module_prototyper_last
Sébastien BEAU 4 years ago
committed by David Beal
parent
commit
c8d512dd20
  1. 3
      attachment_synchronize/__manifest__.py
  2. 95
      attachment_synchronize/demo/task_demo.xml
  3. 4
      attachment_synchronize/models/attachment.py
  4. 20
      attachment_synchronize/models/task.py
  5. 4
      attachment_synchronize/tests/__init__.py
  6. 32
      attachment_synchronize/tests/common.py
  7. 74
      attachment_synchronize/tests/mock_server.py
  8. 50
      attachment_synchronize/tests/test_filestore.py
  9. 86
      attachment_synchronize/tests/test_ftp.py
  10. 85
      attachment_synchronize/tests/test_sftp.py
  11. 12
      attachment_synchronize/views/attachment_view.xml

3
attachment_synchronize/__manifest__.py

@ -11,7 +11,7 @@
'license': 'AGPL-3',
'category': 'Generic Modules',
'depends': [
'base_attachment_queue',
'attachment_queue',
'storage_backend',
],
'data': [
@ -22,7 +22,6 @@
'security/ir.model.access.csv',
],
'demo': [
# 'demo/task_demo.xml',
],
'installable': True,
'application': False,

95
attachment_synchronize/demo/task_demo.xml

@ -1,95 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<odoo noupdate="1">
<record id="location_ftp" model="external.file.location">
<field name="name">TEST FTP</field>
<field name="protocol">ftp</field>
<field name="address">my-ftp-address</field>
<field name="login">my-ftp-user</field>
<field name="password">my-ftp-password</field>
<field name="port">21</field>
</record>
<record id="location_sftp" model="external.file.location">
<field name="name">TEST SFTP</field>
<field name="protocol">sftp</field>
<field name="address">my-sftp-address</field>
<field name="login">my-sftp-user</field>
<field name="password">my-sftp-password</field>
<field name="port">22</field>
</record>
<record id="location_filestore" model="external.file.location">
<field name="name">TEST File Store</field>
<field name="protocol">file_store</field>
<field name="filestore_rootpath">/</field>
</record>
<record id="ftp_import_task" model="external.file.task">
<field name="method_type">import</field>
<field name="location_id" eval="ref('external_file_location.location_ftp')"/>
<field name="filename">test-import-ftp.txt</field>
<field name="filepath">/home/user/test</field>
<field name="name">Import FTP Task</field>
</record>
<record id="ftp_export_task" model="external.file.task">
<field name="method_type">export</field>
<field name="location_id" eval="ref('external_file_location.location_ftp')"/>
<field name="filepath">/home/user/test</field>
<field name="name">Export FTP Task</field>
</record>
<record id="sftp_import_task" model="external.file.task">
<field name="method_type">import</field>
<field name="location_id" eval="ref('external_file_location.location_sftp')"/>
<field name="filename">test-import-sftp.txt</field>
<field name="filepath">/home/user/test</field>
<field name="name">Import SFTP Task</field>
</record>
<record id="sftp_export_task" model="external.file.task">
<field name="method_type">export</field>
<field name="location_id" eval="ref('external_file_location.location_sftp')"/>
<field name="filepath">/home/user/test</field>
<field name="name">Export SFTP Task</field>
</record>
<record id="filestore_import_task" model="external.file.task">
<field name="method_type">import</field>
<field name="location_id" eval="ref('external_file_location.location_filestore')"/>
<field name="filename">test-import-filestore.txt</field>
<field name="filepath">/home/user/test</field>
<field name="name">Import filestore Task</field>
</record>
<record id="filestore_export_task" model="external.file.task">
<field name="method_type">export</field>
<field name="location_id" eval="ref('external_file_location.location_filestore')"/>
<field name="filepath">/home/user/test</field>
<field name="name">Export filestore Task</field>
</record>
<record id="ir_attachment_export_file_sftp" model="ir.attachment.metadata">
<field name="name">Sftp text export file</field>
<field name="datas">dGVzdCBzZnRwIGZpbGUgZXhwb3J0</field>
<field name="datas_fname">sftp_test_export.txt</field>
<field name="task_id" eval="ref('external_file_location.sftp_export_task')"/>
<field name="file_type">export_external_location</field>
</record>
<record id="ir_attachment_export_file_ftp" model="ir.attachment.metadata">
<field name="name">ftp text export file</field>
<field name="datas">dGVzdCBmdHAgZmlsZSBleHBvcnQ=</field>
<field name="datas_fname">ftp_test_export.txt</field>
<field name="task_id" eval="ref('external_file_location.ftp_export_task')"/>
<field name="file_type">export_external_location</field>
</record>
<record id="ir_attachment_export_file_filestore" model="ir.attachment.metadata">
<field name="name">filestore text export file</field>
<field name="datas">dGVzdCBmaWxlc3RvcmUgZmlsZSBleHBvcnQ=</field>
<field name="datas_fname">filestore_test_export.txt</field>
<field name="task_id" eval="ref('external_file_location.filestore_export_task')"/>
<field name="file_type">export_external_location</field>
</record>
</odoo>

4
attachment_synchronize/models/attachment.py

@ -5,8 +5,8 @@ from odoo import models, fields, api
import os
class IrAttachmentMetadata(models.Model):
_inherit = 'ir.attachment.metadata'
class AttachmentQueue(models.Model):
_inherit = 'attachment.queue'
task_id = fields.Many2one('storage.backend.task', string='Task')
storage_backend_id = fields.Many2one(

20
attachment_synchronize/models/task.py

@ -41,7 +41,7 @@ except ImportError:
_logger.warning("jinja2 not available, templating features will not work!")
class StorageTask(models.Model):
class StorageBackendTask(models.Model):
_name = 'storage.backend.task'
_description = 'Storage Backend task'
@ -56,7 +56,7 @@ class StorageTask(models.Model):
filepath = fields.Char(help='Path to imported/exported file')
backend_id = fields.Many2one(
'storage.backend', string='Backend', required=True)
attachment_ids = fields.One2many('ir.attachment.metadata', 'task_id',
attachment_ids = fields.One2many('attachment.queue', 'task_id',
string='Attachment')
move_path = fields.Char(string='Move Path',
help='Imported File will be moved to this path')
@ -118,18 +118,20 @@ class StorageTask(models.Model):
return render_result
@api.model
def run_task_scheduler(self, domain=list()):
if ('method_type', '=', 'import') not in domain:
domain.append([('method_type', '=', 'import')])
domain.append([('enabled', '=', True)])
tasks = self.env['storage.backend'].search(domain)
for task in tasks:
def run_task_scheduler(self, domain=None):
if domain is None:
domain = []
domain = expression.AND(domain, [
('method_type', '=', 'import'),
('enabled', '=', True),
])
for task in self.search(domain):
task.run_import()
@api.multi
def run_import(self):
self.ensure_one()
attach_obj = self.env['ir.attachment.metadata']
attach_obj = self.env['attachment.queue']
backend = self.backend_id
filenames = backend._list(
relative_path=self.filepath, pattern=self.pattern)

4
attachment_synchronize/tests/__init__.py

@ -1,4 +0,0 @@
from . import mock_server
from . import test_ftp
from . import test_sftp
from . import test_filestore

32
attachment_synchronize/tests/common.py

@ -1,32 +0,0 @@
# coding: utf-8
# @ 2016 Florian da Costa @ Akretion
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html).
import openerp.tests.common as common
from openerp import api
from StringIO import StringIO
class ContextualStringIO(StringIO):
"""
snippet from http://bit.ly/1HfH6uW (stackoverflow)
"""
def __enter__(self):
return self
def __exit__(self, *args):
self.close()
return False
class TestConnection(common.TransactionCase):
def setUp(self):
super(TestConnection, self).setUp()
self.registry.enter_test_mode()
self.env = api.Environment(self.registry.test_cr, self.env.uid,
self.env.context)
def tearDown(self):
self.registry.leave_test_mode()
super(TestConnection, self).tearDown()

74
attachment_synchronize/tests/mock_server.py

@ -1,74 +0,0 @@
# coding: utf-8
# Copyright (C) 2014 initOS GmbH & Co. KG (<http://www.initos.com>).
# @ 2015 Valentin CHEMIERE @ Akretion
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html).
import mock
from contextlib import contextmanager
from collections import defaultdict
class MultiResponse(dict):
pass
class ConnMock(object):
def __init__(self, response):
self.response = response
self._calls = []
self.call_count = defaultdict(int)
def __getattribute__(self, method):
if method not in ('_calls', 'response', 'call_count'):
def callable(*args, **kwargs):
self._calls.append({
'method': method,
'args': args,
'kwargs': kwargs,
})
call = self.response[method]
if isinstance(call, MultiResponse):
call = call[self.call_count[method]]
self.call_count[method] += 1
return call
return callable
else:
return super(ConnMock, self).__getattribute__(method)
def __call__(self, *args, **kwargs):
return self
def __enter__(self, *args, **kwargs):
return self
def __exit__(self, *args, **kwargs):
pass
def __repr__(self, *args, **kwargs):
return self
def __getitem__(self, key):
return
@contextmanager
def server_mock_sftp(response):
with mock.patch('openerp.addons.external_file_location.tasks.sftp.'
'SftpTask', ConnMock(response)) as SFTPFS:
yield SFTPFS._calls
@contextmanager
def server_mock_ftp(response):
with mock.patch('openerp.addons.external_file_location.tasks.ftp.'
'FtpTask', ConnMock(response)) as FTPFS:
yield FTPFS._calls
@contextmanager
def server_mock_filestore(response):
with mock.patch('openerp.addons.external_file_location.tasks.filestore.'
'FileStoreTask', ConnMock(response)) as FTPFS:
yield FTPFS._calls

50
attachment_synchronize/tests/test_filestore.py

@ -1,50 +0,0 @@
# coding: utf-8
# @ 2015 Valentin CHEMIERE @ Akretion
# ©2016 @author Mourad EL HADJ MIMOUNE <mourad.elhadj.mimoune@akretion.com>
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html).
import logging
from base64 import b64decode
from .common import TestConnection, ContextualStringIO
from .mock_server import server_mock_filestore
_logger = logging.getLogger(__name__)
class TestfilestoreConnection(TestConnection):
def setUp(self):
super(TestfilestoreConnection, self).setUp()
self.test_file_filestore = ContextualStringIO()
self.test_file_filestore.write('import filestore')
self.test_file_filestore.seek(0)
def test_00_filestore_import(self):
self.task = self.env.ref(
'external_file_location.filestore_import_task')
with server_mock_filestore(
{'open': self.test_file_filestore,
'listdir': ['test-import-filestore.txt']}):
self.task.run_import()
search_file = self.env['ir.attachment.metadata'].search(
[('name', '=', 'test-import-filestore.txt')])
self.assertEqual(len(search_file), 1)
self.assertEqual(b64decode(search_file[0].datas), 'import filestore')
def test_01_filestore_export(self):
self.task = self.env.ref(
'external_file_location.filestore_export_task')
self.filestore_attachment = self.env.ref(
'external_file_location.ir_attachment_export_file_filestore')
with server_mock_filestore(
{'setcontents': ''}) as Fakefilestore:
self.task.run_export()
if Fakefilestore:
self.assertEqual('setcontents', Fakefilestore[-1]['method'])
self.assertEqual('done', self.filestore_attachment.state)
self.assertEqual(
'/home/user/test/filestore_test_export.txt',
Fakefilestore[-1]['args'][0])
self.assertEqual(
'test filestore file export',
Fakefilestore[-1]['kwargs']['data'])

86
attachment_synchronize/tests/test_ftp.py

@ -1,86 +0,0 @@
# coding: utf-8
# @ 2015 Valentin CHEMIERE @ Akretion
# ©2016 @author Mourad EL HADJ MIMOUNE <mourad.elhadj.mimoune@akretion.com>
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html).
import logging
from base64 import b64decode
import hashlib
from .common import TestConnection, ContextualStringIO
from .mock_server import server_mock_ftp
from .mock_server import MultiResponse
from openerp.exceptions import UserError
_logger = logging.getLogger(__name__)
class TestFtpConnection(TestConnection):
def setUp(self):
super(TestFtpConnection, self).setUp()
self.test_file_ftp = ContextualStringIO()
self.test_file_ftp.write('import ftp')
self.test_file_ftp.seek(0)
def test_00_ftp_import(self):
self.task = self.env.ref('external_file_location.ftp_import_task')
with server_mock_ftp(
{'open': self.test_file_ftp,
'listdir': ['test-import-ftp.txt']}):
self.task.run_import()
search_file = self.env['ir.attachment.metadata'].search(
[('name', '=', 'test-import-ftp.txt')])
self.assertEqual(len(search_file), 1)
self.assertEqual(b64decode(search_file[0].datas), 'import ftp')
def test_01_ftp_export(self):
self.task = self.env.ref('external_file_location.ftp_export_task')
self.ftp_attachment = self.env.ref(
'external_file_location.ir_attachment_export_file_ftp')
with server_mock_ftp(
{'setcontents': ''}) as FakeFTP:
self.task.run_export()
if FakeFTP:
self.assertEqual('setcontents', FakeFTP[-1]['method'])
self.assertEqual('done', self.ftp_attachment.state)
self.assertEqual(
'/home/user/test/ftp_test_export.txt',
FakeFTP[-1]['args'][0])
self.assertEqual(
'test ftp file export',
FakeFTP[-1]['kwargs']['data'])
def test_02_ftp_import_md5(self):
md5_file = ContextualStringIO()
md5_file.write(hashlib.md5('import ftp').hexdigest())
md5_file.seek(0)
task = self.env.ref('external_file_location.ftp_import_task')
task.md5_check = True
with server_mock_ftp(
{'open': MultiResponse({
1: md5_file,
0: self.test_file_ftp}),
'listdir': [task.filename]}) as Fakeftp:
task.run_import()
search_file = self.env['ir.attachment.metadata'].search(
(('name', '=', task.filename),))
self.assertEqual(len(search_file), 1)
self.assertEqual(b64decode(search_file[0].datas),
'import ftp')
self.assertEqual('open', Fakeftp[-1]['method'])
self.assertEqual(hashlib.md5('import ftp').hexdigest(),
search_file.external_hash)
def test_03_ftp_import_md5_corrupt_file(self):
md5_file = ContextualStringIO()
md5_file.write(hashlib.md5('import test ftp corrupted').hexdigest())
md5_file.seek(0)
task = self.env.ref('external_file_location.ftp_import_task')
task.md5_check = True
with server_mock_ftp(
{'open': MultiResponse({
1: md5_file,
0: self.test_file_ftp}),
'listdir': [task.filename]}):
with self.assertRaises(UserError):
task.run_import()

85
attachment_synchronize/tests/test_sftp.py

@ -1,85 +0,0 @@
# coding: utf-8
# @ 2015 Valentin CHEMIERE @ Akretion
# ©2016 @author Mourad EL HADJ MIMOUNE <mourad.elhadj.mimoune@akretion.com>
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html).
import logging
from base64 import b64decode
import hashlib
from .common import TestConnection, ContextualStringIO
from .mock_server import server_mock_sftp
from .mock_server import MultiResponse
from openerp.exceptions import UserError
_logger = logging.getLogger(__name__)
class TestSftpConnection(TestConnection):
def setUp(self):
super(TestSftpConnection, self).setUp()
self.test_file_sftp = ContextualStringIO()
self.test_file_sftp.write('import sftp')
self.test_file_sftp.seek(0)
def test_00_sftp_import(self):
task = self.env.ref('external_file_location.sftp_import_task')
with server_mock_sftp(
{'open': self.test_file_sftp,
'listdir': [task.filename]}):
task.run_import()
search_file = self.env['ir.attachment.metadata'].search(
[('name', '=', task.filename)])
self.assertEqual(len(search_file), 1)
self.assertEqual(b64decode(search_file[0].datas), 'import sftp')
def test_01_sftp_export(self):
self.task = self.env.ref('external_file_location.sftp_export_task')
self.sftp_attachment = self.env.ref(
'external_file_location.ir_attachment_export_file_sftp')
with server_mock_sftp(
{'setcontents': ''}) as FakeSFTP:
self.task.run_export()
if FakeSFTP:
self.assertEqual('setcontents', FakeSFTP[-1]['method'])
self.assertEqual(
'/home/user/test/sftp_test_export.txt',
FakeSFTP[-1]['args'][0])
self.assertEqual(
'test sftp file export',
FakeSFTP[-1]['kwargs']['data'])
def test_02_sftp_import_md5(self):
md5_file = ContextualStringIO()
md5_file.write(hashlib.md5('import sftp').hexdigest())
md5_file.seek(0)
task = self.env.ref('external_file_location.sftp_import_task')
task.md5_check = True
with server_mock_sftp(
{'open': MultiResponse({
1: md5_file,
0: self.test_file_sftp}),
'listdir': [task.filename]}) as FakeSFTP:
task.run_import()
search_file = self.env['ir.attachment.metadata'].search(
(('name', '=', task.filename),))
self.assertEqual(len(search_file), 1)
self.assertEqual(b64decode(search_file[0].datas),
'import sftp')
self.assertEqual('open', FakeSFTP[-1]['method'])
self.assertEqual(hashlib.md5('import sftp').hexdigest(),
search_file.external_hash)
def test_03_sftp_import_md5_corrupt_file(self):
md5_file = ContextualStringIO()
md5_file.write(hashlib.md5('import test sftp corrupted').hexdigest())
md5_file.seek(0)
task = self.env.ref('external_file_location.sftp_import_task')
task.md5_check = True
with server_mock_sftp(
{'open': MultiResponse({
1: md5_file,
0: self.test_file_sftp}),
'listdir': [task.filename]}):
with self.assertRaises(UserError):
task.run_import()

12
attachment_synchronize/views/attachment_view.xml

@ -1,9 +1,9 @@
<?xml version="1.0" encoding="UTF-8"?>
<odoo>
<record id="view_attachment_improved_form" model="ir.ui.view">
<field name="model">ir.attachment.metadata</field>
<field name="inherit_id" ref="base_attachment_queue.view_attachment_improved_form" />
<record id="view_attachment_queue_form" model="ir.ui.view">
<field name="model">attachment.queue</field>
<field name="inherit_id" ref="attachment_queue.view_attachment_queue_form" />
<field name="arch" type="xml">
<field name="url" position="after">
<field name="task_id" attrs="{'required': [('file_type', '=', 'export')]}"/>
@ -12,9 +12,9 @@
</field>
</record>
<record id="view_external_attachment_tree" model="ir.ui.view">
<field name="model">ir.attachment.metadata</field>
<field name="inherit_id" ref="base_attachment_queue.view_external_attachment_tree" />
<record id="view_attachment_queue_tree" model="ir.ui.view">
<field name="model">attachment.queue</field>
<field name="inherit_id" ref="attachment_queue.view_attachment_queue_tree" />
<field name="arch" type="xml">
<field name="file_type" position="after">
<field name="task_id"/>

Loading…
Cancel
Save