Browse Source

[IMP] use new API

Conflicts:
	account_bank_statement_import/account_bank_statement_import.py
pull/19/head
Laurent Mignon (ACSONE) 10 years ago
parent
commit
ec26ad79b9
  1. 2
      account_bank_statement_import/__init__.py
  2. 2
      account_bank_statement_import/__openerp__.py
  3. 293
      account_bank_statement_import/account_bank_statement_import.py
  4. 3
      account_bank_statement_import/tests/test_import_bank_statement.py
  5. 2
      account_bank_statement_import_ofx/__init__.py
  6. 48
      account_bank_statement_import_ofx/account_bank_statement_import_ofx.py
  7. 27
      account_bank_statement_import_ofx/tests/test_import_bank_statement.py
  8. 1
      account_bank_statement_import_qif/__init__.py
  9. 75
      account_bank_statement_import_qif/account_bank_statement_import_qif.py
  10. 2
      account_bank_statement_import_qif/tests/__init__.py
  11. 35
      account_bank_statement_import_qif/tests/test_import_bank_statement.py

2
account_bank_statement_import/__init__.py

@ -1,3 +1,3 @@
# -*- encoding: utf-8 -*-
import account_bank_statement_import
from . import account_bank_statement_import

2
account_bank_statement_import/__openerp__.py

@ -1,6 +1,4 @@
# -*- encoding: utf-8 -*-
# noqa: This is a backport from Odoo. OCA has no control over style here.
# flake8: noqa
{
'name': 'Account Bank Statement Import',
'category': 'Accounting & Finance',

293
account_bank_statement_import/account_bank_statement_import.py

@ -1,11 +1,7 @@
# -*- coding: utf-8 -*-
# noqa: This is a backport from Odoo. OCA has no control over style here.
# flake8: noqa
import base64
from openerp import SUPERUSER_ID
from openerp.osv import fields, osv
from openerp import api, models, fields
from openerp.tools.translate import _
from openerp.exceptions import Warning
@ -13,54 +9,40 @@ import logging
_logger = logging.getLogger(__name__)
class account_bank_statement_line(osv.osv):
class account_bank_statement_line(models.Model):
_inherit = "account.bank.statement.line"
_columns = {
# Ensure transactions can be imported only once (if the import format provides unique transaction ids)
'unique_import_id': fields.char('Import ID', readonly=True, copy=False),
}
# Ensure transactions can be imported only once (if the import format
# provides unique transaction ids)
unique_import_id = fields.Char('Import ID', readonly=True, copy=False)
_sql_constraints = [
('unique_import_id', 'unique (unique_import_id)', 'A bank account transactions can be imported only once !')
('unique_import_id',
'unique (unique_import_id)',
'A bank account transactions can be imported only once !')
]
class account_bank_statement_import(osv.TransientModel):
class account_bank_statement_import(models.TransientModel):
_name = 'account.bank.statement.import'
_description = 'Import Bank Statement'
_columns = {
'data_file': fields.binary('Bank Statement File', required=True, help='Get you bank statements in electronic format from your bank and select them here.'),
}
def import_file(self, cr, uid, ids, context=None):
""" Process the file chosen in the wizard, create bank statement(s) and go to reconciliation. """
context = dict(context or {})
#set the active_id in the context, so that any extension module could
#reuse the fields chosen in the wizard if needed (see .QIF for example)
context.update({'active_id': ids[0]})
data_file = self.browse(cr, uid, ids[0], context=context).data_file
# The appropriate implementation module returns the required data
currency_code, account_number, stmts_vals = self._parse_file(cr, uid, base64.b64decode(data_file), context=context)
# Check raw data
self._check_parsed_data(cr, uid, stmts_vals, context=context)
# Try to find the bank account and currency in odoo
currency_id, bank_account_id = self._find_additional_data(cr, uid, currency_code, account_number, context=context)
# Find or create the bank journal
journal_id = self._get_journal(cr, uid, currency_id, bank_account_id, account_number, context=context)
# Create the bank account if not already existing
if not bank_account_id and account_number:
self._create_bank_account(cr, uid, account_number, journal_id=journal_id, partner_id=uid, context=context)
# Prepare statement data to be used for bank statements creation
stmts_vals = self._complete_stmts_vals(cr, uid, stmts_vals, journal_id, account_number, context=context)
# Create the bank statements
statement_ids, notifications = self._create_bank_statements(cr, uid, stmts_vals, context=context)
# Finally dispatch to reconciliation interface
model, action_id = self.pool.get('ir.model.data').get_object_reference(cr, uid, 'account', 'action_bank_reconcile_bank_statements')
action = self.pool[model].browse(cr, uid, action_id, context=context)
data_file = fields.Binary(
'Bank Statement File', required=True,
help='Get you bank statements in electronic format from your bank '
'and select them here.')
@api.multi
def import_file(self):
""" Process the file chosen in the wizard, create bank statement(s) and
go to reconciliation. """
self.ensure_one()
data_file = base64.b64decode(self.data_file)
statement_ids, notifications = self.with_context(
active_id=self.id)._import_file(data_file)
# dispatch to reconciliation interface
action = self.env.ref(
'account.action_bank_reconcile_bank_statements')
return {
'name': action.name,
'tag': action.tag,
@ -71,33 +53,67 @@ class account_bank_statement_import(osv.TransientModel):
'type': 'ir.actions.client',
}
def _parse_file(self, cr, uid, data_file, context=None):
""" Each module adding a file support must extends this method. It processes the file if it can, returns super otherwise, resulting in a chain of responsability.
This method parses the given file and returns the data required by the bank statement import process, as specified below.
@api.model
def _import_file(self, data_file):
""" Create bank statement(s) from file
"""
# The appropriate implementation module returns the required data
currency_code, account_number, stmts_vals = self._parse_file(data_file)
# Check raw data
self._check_parsed_data(stmts_vals)
# Try to find the bank account and currency in odoo
currency_id, bank_account_id = self._find_additional_data(
currency_code, account_number)
# Find or create the bank journal
journal_id = self._get_journal(
currency_id, bank_account_id, account_number)
# Create the bank account if not already existing
if not bank_account_id and account_number:
self._create_bank_account(
account_number, journal_id=journal_id)
# Prepare statement data to be used for bank statements creation
stmts_vals = self._complete_stmts_vals(
stmts_vals, journal_id, account_number)
# Create the bank statements
return self._create_bank_statements(stmts_vals)
@api.model
def _parse_file(self, data_file):
""" Each module adding a file support must extends this method. It
rocesses the file if it can, returns super otherwise, resulting in a
chain of responsability.
This method parses the given file and returns the data required by
the bank statement import process, as specified below.
rtype: triplet (if a value can't be retrieved, use None)
- currency code: string (e.g: 'EUR')
The ISO 4217 currency code, case insensitive
- account number: string (e.g: 'BE1234567890')
The number of the bank account which the statement belongs to
- bank statements data: list of dict containing (optional items marked by o) :
The number of the bank account which the statement belongs
to
- bank statements data: list of dict containing (optional
items marked by o) :
- 'name': string (e.g: '000000123')
- 'date': date (e.g: 2013-06-26)
-o 'balance_start': float (e.g: 8368.56)
-o 'balance_end_real': float (e.g: 8888.88)
- 'transactions': list of dict containing :
- 'name': string (e.g: 'KBC-INVESTERINGSKREDIET 787-5562831-01')
- 'name': string
(e.g: 'KBC-INVESTERINGSKREDIET 787-5562831-01')
- 'date': date
- 'amount': float
- 'unique_import_id': string
-o 'account_number': string
Will be used to find/create the res.partner.bank in odoo
Will be used to find/create the res.partner.bank
in odoo
-o 'note': string
-o 'partner_name': string
-o 'ref': string
"""
raise Warning(_('Could not make sense of the given file.\nDid you install the module to support this type of file ?'))
raise Warning(_('Could not make sense of the given file.\nDid you '
'install the module to support this type of file ?'))
def _check_parsed_data(self, cr, uid, stmts_vals, context=None):
@api.model
def _check_parsed_data(self, stmts_vals):
""" Basic and structural verifications """
if len(stmts_vals) == 0:
raise Warning(_('This file doesn\'t contain any statement.'))
@ -110,82 +126,78 @@ class account_bank_statement_import(osv.TransientModel):
if no_st_line:
raise Warning(_('This file doesn\'t contain any transaction.'))
def _find_additional_data(self, cr, uid, currency_code, account_number, context=None):
@api.model
def _find_additional_data(self, currency_code, account_number):
""" Get the res.currency ID and the res.partner.bank ID """
currency_id = False # So if no currency_code is provided, we'll use the company currency
# if no currency_code is provided, we'll use the company currency
currency_id = False
if currency_code:
currency_ids = self.pool.get('res.currency').search(cr, uid, [('name', '=ilike', currency_code)], context=context)
company_currency_id = self.pool.get('res.users').browse(cr, uid, uid, context=context).company_id.currency_id.id
currency_ids = self.env['res.currency'].search(
[('name', '=ilike', currency_code)])
company_currency_id = self.env.user.company_id.currency_id
if currency_ids:
if currency_ids[0] != company_currency_id:
currency_id = currency_ids[0]
currency_id = currency_ids[0].id
bank_account_id = None
if account_number and len(account_number) > 4:
account_number = account_number.replace(' ', '').replace('-', '')
cr.execute("select id from res_partner_bank where replace(replace(acc_number,' ',''),'-','') = %s", (account_number,))
bank_account_ids = [id[0] for id in cr.fetchall()]
bank_account_ids = self.pool.get('res.partner.bank').search(cr, uid, [('id', 'in', bank_account_ids)], context=context)
cr = self.env.cr
cr.execute(
"select id from res_partner_bank "
"where replace(replace(acc_number,' ',''),'-','') = %s",
(account_number,))
bank_account_ids = [val[0] for val in cr.fetchall()]
bank_account_ids = self.env['res.partner.bank'].search(
[('id', 'in', bank_account_ids)], limit=1)
if bank_account_ids:
bank_account_id = bank_account_ids[0]
bank_account_id = bank_account_ids[0].id
return currency_id, bank_account_id
def _get_journal(self, cr, uid, currency_id, bank_account_id, account_number, context=None):
@api.model
def _get_journal(self, currency_id, bank_account_id, account_number):
""" Find or create the journal """
bank_pool = self.pool.get('res.partner.bank')
bank_model = self.env['res.partner.bank']
# Find the journal from context or bank account
journal_id = context.get('journal_id')
journal_id = self.env.context.get('journal_id')
if bank_account_id:
bank_account = bank_pool.browse(cr, uid, bank_account_id, context=context)
bank_account = bank_model.browse(bank_account_id)
if journal_id:
if bank_account.journal_id.id and bank_account.journal_id.id != journal_id:
raise Warning(_('The account of this statement is linked to another journal.'))
if (bank_account.journal_id.id and
bank_account.journal_id.id != journal_id):
raise Warning(
_('The account of this statement is linked to '
'another journal.'))
if not bank_account.journal_id.id:
bank_pool.write(cr, uid, [bank_account_id], {'journal_id': journal_id}, context=context)
bank_model.write({'journal_id': journal_id})
else:
if bank_account.journal_id.id:
journal_id = bank_account.journal_id.id
# If importing into an existing journal, its currency must be the same as the bank statement
# If importing into an existing journal, its currency must be the same
# as the bank statement
if journal_id:
journal_currency_id = self.pool.get('account.journal').browse(cr, uid, journal_id, context=context).currency.id
journal_currency_id = self.env['account.journal'].browse(
journal_id).currency.id
if currency_id and currency_id != journal_currency_id:
raise Warning(_('The currency of the bank statement is not the same as the currency of the journal !'))
# If there is no journal, create one (and its account)
# I think it's too dangerous, so I disable that code by default -- Alexis de Lattre
# -- Totally disabled, Ronald Portier
# if context.get('allow_auto_create_journal') and not journal_id and account_number:
# journal_id = self._create_journal(cr, uid, currency_id, account_number, context=context)
# if bank_account_id:
# bank_pool.write(cr, uid, [bank_account_id], {'journal_id': journal_id}, context=context)
raise Warning(_('The currency of the bank statement is not '
'the same as the currency of the journal !'))
# If we couldn't find/create a journal, everything is lost
if not journal_id:
raise Warning(_('Cannot find in which journal import this statement. Please manually select a journal.'))
raise Warning(_('Cannot find in which journal import this '
'statement. Please manually select a journal.'))
return journal_id
def _create_journal(self, cr, uid, currency_id, account_number, context=None):
""" Create a journal and its account """
wmca_pool = self.pool.get('wizard.multi.charts.accounts')
company = self.pool.get('res.users').browse(cr, uid, uid, context=context).company_id
vals_account = {'currency_id': currency_id, 'acc_name': account_number, 'account_type': 'bank', 'currency_id': currency_id}
vals_account = wmca_pool._prepare_bank_account(cr, uid, company, vals_account, context=context)
account_id = self.pool.get('account.account').create(cr, uid, vals_account, context=context)
vals_journal = {'currency_id': currency_id, 'acc_name': _('Bank') + ' ' + account_number, 'account_type': 'bank'}
vals_journal = wmca_pool._prepare_bank_journal(cr, uid, company, vals_journal, account_id, context=context)
return self.pool.get('account.journal').create(cr, uid, vals_journal, context=context)
def _create_bank_account(self, cr, uid, account_number, journal_id=False, partner_id=False, context=None):
@api.model
@api.returns('res.partner.bank')
def _create_bank_account(self, account_number, journal_id=False):
try:
type_model, type_id = self.pool.get('ir.model.data').get_object_reference(cr, uid, 'base', 'bank_normal')
type_id = self.pool.get('res.partner.bank.type').browse(cr, uid, type_id, context=context)
bank_code = type_id.code
bank_type = self.env.ref('base.bank_normal')
bank_code = bank_type.code
except ValueError:
bank_code = 'bank'
account_number = account_number.replace(' ', '').replace('-', '')
@ -193,52 +205,63 @@ class account_bank_statement_import(osv.TransientModel):
'acc_number': account_number,
'state': bank_code,
}
# Odoo users bank accounts (which we import statement from) have company_id and journal_id set
# while 'counterpart' bank accounts (from which statement transactions originate) don't.
# Warning : if company_id is set, the method post_write of class bank will create a journal
# Odoo users bank accounts (which we import statement from) have
# company_id and journal_id set while 'counterpart' bank accounts
# (from which statement transactions originate) don't.
# Warning : if company_id is set, the method post_write of class
# bank will create a journal
if journal_id:
company_id = self.pool['account.journal'].browse(
cr, uid, journal_id, context=context).company_id.id
vals = self.pool['res.partner.bank'].onchange_company_id(
cr, uid, None, company_id, context=None)
company_id = self.env['account.journal'].browse(
journal_id).company_id.id
vals = self.env['res.partner.bank'].onchange_company_id(company_id)
vals_acc.update(vals.get('value', {}))
vals_acc['journal_id'] = journal_id
vals_acc['company_id'] = company_id
return self.pool.get('res.partner.bank').create(cr, uid, vals_acc, context=context)
return self.env['res.partner.bank'].create(vals_acc)
def _complete_stmts_vals(self, cr, uid, stmts_vals, journal_id, account_number, context=None):
@api.model
def _complete_stmts_vals(self, stmts_vals, journal_id, account_number):
for st_vals in stmts_vals:
st_vals['journal_id'] = journal_id
for line_vals in st_vals['transactions']:
unique_import_id = line_vals.get('unique_import_id', False)
if unique_import_id:
line_vals['unique_import_id'] = (account_number and account_number + '-' or '') + unique_import_id
if not 'bank_account_id' in line_vals or not line_vals['bank_account_id']:
# Find the partner and his bank account or create the bank account. The partner selected during the
# reconciliation process will be linked to the bank when the statement is closed.
line_vals['unique_import_id'] = (
account_number and account_number + '-' or '') + \
unique_import_id
if not line_vals.get('bank_account_id'):
# Find the partner and his bank account or create the bank
# account. The partner selected during the reconciliation
# process will be linked to the bank when the statement is
# closed.
partner_id = False
bank_account_id = False
identifying_string = line_vals.get('account_number', False)
identifying_string = line_vals.get('account_number')
if identifying_string:
identifying_string = identifying_string.replace(' ', '').replace('-', '')
ids = self.pool.get('res.partner.bank').search(cr, uid, [('acc_number', '=', identifying_string)], context=context)
if ids:
bank_account_id = ids[0]
partner_id = self.pool.get('res.partner.bank').browse(cr, uid, bank_account_id, context=context).partner_id.id
bank_model = self.env['res.partner.bank']
banks = bank_model.search(
[('acc_number', '=', identifying_string)], limit=1)
if banks:
partner_id = banks[0].partner_id.id
else:
bank_account_id = self._create_bank_account(cr, uid, identifying_string, context=context)
bank_account_id = self._create_bank_account(
identifying_string).id
line_vals['partner_id'] = partner_id
line_vals['bank_account_id'] = bank_account_id
return stmts_vals
def _create_bank_statements(self, cr, uid, stmts_vals, context=None):
""" Create new bank statements from imported values, filtering out already imported transactions, and returns data used by the reconciliation widget """
bs_obj = self.pool.get('account.bank.statement')
bsl_obj = self.pool.get('account.bank.statement.line')
@api.model
def _create_bank_statements(self, stmts_vals):
""" Create new bank statements from imported values, filtering out
already imported transactions, and returns data used by the
reconciliation widget
"""
bs_model = self.env['account.bank.statement']
bsl_model = self.env['account.bank.statement.line']
# Filter out already imported transactions and create statements
statement_ids = []
@ -246,20 +269,25 @@ class account_bank_statement_import(osv.TransientModel):
for st_vals in stmts_vals:
filtered_st_lines = []
for line_vals in st_vals['transactions']:
if not 'unique_import_id' in line_vals \
if 'unique_import_id' not in line_vals \
or not line_vals['unique_import_id'] \
or not bool(bsl_obj.search(cr, SUPERUSER_ID, [('unique_import_id', '=', line_vals['unique_import_id'])], limit=1, context=context)):
or not bool(bsl_model.sudo().search(
[('unique_import_id', '=',
line_vals['unique_import_id'])],
limit=1)):
filtered_st_lines.append(line_vals)
else:
ignored_statement_lines_import_ids.append(line_vals['unique_import_id'])
ignored_statement_lines_import_ids.append(
line_vals['unique_import_id'])
if len(filtered_st_lines) > 0:
# Remove values that won't be used to create records
st_vals.pop('transactions', None)
for line_vals in filtered_st_lines:
line_vals.pop('account_number', None)
# Create the satement
st_vals['line_ids'] = [[0, False, line] for line in filtered_st_lines]
statement_ids.append(bs_obj.create(cr, uid, st_vals, context=context))
st_vals['line_ids'] = [[0, False, line] for line in
filtered_st_lines]
statement_ids.append(bs_model.create(st_vals).id)
if len(statement_ids) == 0:
raise Warning(_('You have already imported that file.'))
@ -269,13 +297,18 @@ class account_bank_statement_import(osv.TransientModel):
if num_ignored > 0:
notifications += [{
'type': 'warning',
'message': _("%d transactions had already been imported and were ignored.") % num_ignored if num_ignored > 1 else _("1 transaction had already been imported and was ignored."),
'message': _("%d transactions had already been imported and "
"were ignored.") % num_ignored
if num_ignored > 1
else _("1 transaction had already been imported and "
"was ignored."),
'details': {
'name': _('Already imported items'),
'model': 'account.bank.statement.line',
'ids': bsl_obj.search(cr, uid, [('unique_import_id', 'in', ignored_statement_lines_import_ids)], context=context)
'ids': bsl_model.search(
[('unique_import_id', 'in',
ignored_statement_lines_import_ids)]).ids
}
}]
return statement_ids, notifications

3
account_bank_statement_import/tests/test_import_bank_statement.py

@ -67,9 +67,8 @@ class TestAccountBankStatemetImport(TransactionCase):
expected_id = journal.company_id.partner_id.id
st_import = self.statement_import_model.sudo(self.other_user_id_a.id)
bank_id = st_import._create_bank_account(
bank = st_import._create_bank_account(
'001251882303', journal_id=self.journal_id)
bank = self.env['res.partner.bank'].browse(bank_id)
self.assertEqual(bank.partner_id.id,
expected_id)

2
account_bank_statement_import_ofx/__init__.py

@ -1,3 +1,3 @@
# -*- encoding: utf-8 -*-
import account_bank_statement_import_ofx
from . import account_bank_statement_import_ofx

48
account_bank_statement_import_ofx/account_bank_statement_import_ofx.py

@ -1,11 +1,9 @@
# -*- coding: utf-8 -*-
# noqa: This is a backport from Odoo. OCA has no control over style here.
# flake8: noqa
import logging
import StringIO
from openerp.osv import osv
from openerp import api, models
from openerp.tools.translate import _
from openerp.exceptions import Warning
@ -17,37 +15,46 @@ except ImportError:
_logger.warn("ofxparse not found, OFX parsing disabled.")
ofxparser = None
class account_bank_statement_import(osv.TransientModel):
class account_bank_statement_import(models.TransientModel):
_inherit = 'account.bank.statement.import'
def _check_ofx(self, cr, uid, file, context=None):
@api.model
def _check_ofx(self, data_file):
if ofxparser is None:
return False
try:
ofx = ofxparser.parse(file)
ofx = ofxparser.parse(StringIO.StringIO(data_file))
except:
return False
return ofx
def _parse_file(self, cr, uid, data_file, context=None):
ofx = self._check_ofx(cr, uid, StringIO.StringIO(data_file), context=context)
@api.model
def _parse_file(self, data_file):
ofx = self._check_ofx(data_file)
if not ofx:
return super(account_bank_statement_import, self)._parse_file(cr, uid, data_file, context=context)
return super(account_bank_statement_import, self)._parse_file(
data_file)
transactions = []
total_amt = 0.00
try:
for transaction in ofx.account.statement.transactions:
# Since ofxparse doesn't provide account numbers, we'll have to find res.partner and res.partner.bank here
# (normal behavious is to provide 'account_number', which the generic module uses to find partner/bank)
# Since ofxparse doesn't provide account numbers, we'll have
# to find res.partner and res.partner.bank here
# (normal behavious is to provide 'account_number', which the
# generic module uses to find partner/bank)
bank_account_id = partner_id = False
ids = self.pool.get('res.partner.bank').search(cr, uid, [('owner_name', '=', transaction.payee)], context=context)
if ids:
bank_account_id = bank_account_id = ids[0]
partner_id = self.pool.get('res.partner.bank').browse(cr, uid, bank_account_id, context=context).partner_id.id
banks = self.env['res.partner.bank'].search(
[('owner_name', '=', transaction.payee)], limit=1)
if banks:
bank_account = banks[0]
bank_account_id = bank_account.id
partner_id = bank_account.partner_id.id
vals_line = {
'date': transaction.date,
'name': transaction.payee + (transaction.memo and ': ' + transaction.memo or ''),
'name': transaction.payee + (
transaction.memo and ': ' + transaction.memo or ''),
'ref': transaction.id,
'amount': transaction.amount,
'unique_import_id': transaction.id,
@ -57,12 +64,15 @@ class account_bank_statement_import(osv.TransientModel):
total_amt += float(transaction.amount)
transactions.append(vals_line)
except Exception, e:
raise Warning(_("The following problem occurred during import. The file might not be valid.\n\n %s" % e.message))
raise Warning(_("The following problem occurred during import. "
"The file might not be valid.\n\n %s" % e.message))
vals_bank_statement = {
'name': ofx.account.routing_number,
'transactions': transactions,
'balance_start': ofx.account.statement.balance,
'balance_end_real': float(ofx.account.statement.balance) + total_amt,
'balance_end_real':
float(ofx.account.statement.balance) + total_amt,
}
return ofx.account.statement.currency, ofx.account.number, [vals_bank_statement]
return ofx.account.statement.currency, ofx.account.number, [
vals_bank_statement]

27
account_bank_statement_import_ofx/tests/test_import_bank_statement.py

@ -4,29 +4,32 @@
from openerp.tests.common import TransactionCase
from openerp.modules.module import get_module_resource
class TestOfxFile(TransactionCase):
"""Tests for import bank statement ofx file format (account.bank.statement.import)
"""Tests for import bank statement ofx file format
(account.bank.statement.import)
"""
def setUp(self):
super(TestOfxFile, self).setUp()
self.statement_import_model = self.registry('account.bank.statement.import')
self.bank_statement_model = self.registry('account.bank.statement')
self.statement_import_model = self.env['account.bank.statement.import']
self.bank_statement_model = self.env['account.bank.statement']
def test_ofx_file_import(self):
try:
from ofxparse import OfxParser as ofxparser
except ImportError:
#the Python library isn't installed on the server, the OFX import is unavailable and the test cannot be run
# the Python library isn't installed on the server, the OFX import
# is unavailable and the test cannot be run
return True
cr, uid = self.cr, self.uid
ofx_file_path = get_module_resource('account_bank_statement_import_ofx', 'test_ofx_file', 'test_ofx.ofx')
ofx_file_path = get_module_resource(
'account_bank_statement_import_ofx',
'test_ofx_file', 'test_ofx.ofx')
ofx_file = open(ofx_file_path, 'rb').read().encode('base64')
bank_statement_id = self.statement_import_model.create(cr, uid, dict(
data_file=ofx_file,
))
self.statement_import_model.import_file(cr, uid, [bank_statement_id])
statement_id = self.bank_statement_model.search(cr, uid, [('name', '=', '000000123')])[0]
bank_st_record = self.bank_statement_model.browse(cr, uid, statement_id)
bank_statement = self.statement_import_model.create(
dict(data_file=ofx_file))
bank_statement.import_file()
bank_st_record = self.bank_statement_model.search(
[('name', '=', '000000123')])[0]
self.assertEquals(bank_st_record.balance_start, 2156.56)
self.assertEquals(bank_st_record.balance_end_real, 1796.56)

1
account_bank_statement_import_qif/__init__.py

@ -1,3 +1,2 @@
# -*- coding: utf-8 -*-
from . import account_bank_statement_import_qif

75
account_bank_statement_import_qif/account_bank_statement_import_qif.py

@ -6,41 +6,49 @@ import dateutil.parser
import StringIO
from openerp.tools.translate import _
from openerp.osv import osv, fields
from openerp import api, models, fields
from openerp.exceptions import Warning
class account_bank_statement_import(osv.TransientModel):
_inherit = "account.bank.statement.import"
_columns = {
'journal_id': fields.many2one('account.journal', string='Journal', help='Accounting journal related to the bank statement you\'re importing. It has be be manually chosen for statement formats which doesn\'t allow automatic journal detection (QIF for example).'),
'hide_journal_field': fields.boolean('Hide the journal field in the view'),
}
class account_bank_statement_import(models.TransientModel):
_inherit = "account.bank.statement.import"
def _get_hide_journal_field(self, cr, uid, context=None):
return context and 'journal_id' in context or False
@api.model
def _get_hide_journal_field(self):
return self.env.context.get('journal_id') and True
_defaults = {
'hide_journal_field': _get_hide_journal_field,
}
journal_id = fields.Many2one(
'account.journal', string='Journal',
help='Accounting journal related to the bank statement you\'re '
'importing. It has be be manually chosen for statement formats which '
'doesn\'t allow automatic journal detection (QIF for example).')
hide_journal_field = fields.Boolean(
'Hide the journal field in the view', default=_get_hide_journal_field)
def _get_journal(self, cr, uid, currency_id, bank_account_id, account_number, context=None):
""" As .QIF format does not allow us to detect the journal, we need to let the user choose it.
We set it in context before to call super so it's the same as calling the widget from a journal """
if context is None:
context = {}
if context.get('active_id'):
record = self.browse(cr, uid, context.get('active_id'), context=context)
@api.model
def _get_journal(self, currency_id, bank_account_id, account_number):
""" As .QIF format does not allow us to detect the journal, we need to
let the user choose it.
We set it in context before to call super so it's the same as
calling the widget from a journal """
record = self
active_id = self.env.context.get('active_id')
if active_id:
record = self.browse(active_id)
if record.journal_id:
context['journal_id'] = record.journal_id.id
return super(account_bank_statement_import, self)._get_journal(cr, uid, currency_id, bank_account_id, account_number, context=context)
record = record.with_context(journal_id=record.journal_id.id)
return super(account_bank_statement_import, record)._get_journal(
currency_id, bank_account_id, account_number)
def _check_qif(self, cr, uid, data_file, context=None):
@api.model
def _check_qif(self, data_file):
return data_file.strip().startswith('!Type:')
def _parse_file(self, cr, uid, data_file, context=None):
if not self._check_qif(cr, uid, data_file, context=context):
return super(account_bank_statement_import, self)._parse_file(cr, uid, data_file, context=context)
@api.model
def _parse_file(self, data_file):
if not self._check_qif(data_file):
return super(account_bank_statement_import, self)._parse_file(
data_file)
try:
file_data = ""
@ -64,7 +72,8 @@ class account_bank_statement_import(osv.TransientModel):
if not line:
continue
if line[0] == 'D': # date of transaction
vals_line['date'] = dateutil.parser.parse(line[1:], fuzzy=True).date()
vals_line['date'] = dateutil.parser.parse(
line[1:], fuzzy=True).date()
elif line[0] == 'T': # Total amount
total += float(line[1:].replace(',', ''))
vals_line['amount'] = float(line[1:].replace(',', ''))
@ -74,10 +83,12 @@ class account_bank_statement_import(osv.TransientModel):
vals_line['name'] = 'name' in vals_line and line[1:] + ': ' + vals_line['name'] or line[1:]
# Since QIF doesn't provide account numbers, we'll have to find res.partner and res.partner.bank here
# (normal behavious is to provide 'account_number', which the generic module uses to find partner/bank)
ids = self.pool.get('res.partner.bank').search(cr, uid, [('owner_name', '=', line[1:])], context=context)
if ids:
vals_line['bank_account_id'] = bank_account_id = ids[0]
vals_line['partner_id'] = self.pool.get('res.partner.bank').browse(cr, uid, bank_account_id, context=context).partner_id.id
banks = self.env['res.partner.bank'].search(
[('owner_name', '=', line[1:])], limit=1)
if banks:
bank_account = banks[0]
vals_line['bank_account_id'] = bank_account.id
vals_line['partner_id'] = bank_account.partner_id.id
elif line[0] == 'M': # Memo
vals_line['name'] = 'name' in vals_line and vals_line['name'] + ': ' + line[1:] or line[1:]
elif line[0] == '^': # end of item
@ -88,11 +99,11 @@ class account_bank_statement_import(osv.TransientModel):
else:
pass
else:
raise Warning(_('This file is either not a bank statement or is not correctly formed.'))
raise Warning(_('This file is either not a bank statement or is '
'not correctly formed.'))
vals_bank_statement.update({
'balance_end_real': total,
'transactions': transactions
})
return None, None, [vals_bank_statement]

2
account_bank_statement_import_qif/tests/__init__.py

@ -1,4 +1,2 @@
# -*- coding: utf-8 -*-
# noqa: This is a backport from Odoo. OCA has no control over style here.
# flake8: noqa
from . import test_import_bank_statement

35
account_bank_statement_import_qif/tests/test_import_bank_statement.py

@ -1,32 +1,29 @@
# -*- coding: utf-8 -*-
# noqa: This is a backport from Odoo. OCA has no control over style here.
# flake8: noqa
from openerp.tests.common import TransactionCase
from openerp.modules.module import get_module_resource
class TestQifFile(TransactionCase):
"""Tests for import bank statement qif file format (account.bank.statement.import)
"""Tests for import bank statement qif file format
(account.bank.statement.import)
"""
def setUp(self):
super(TestQifFile, self).setUp()
self.statement_import_model = self.registry('account.bank.statement.import')
self.bank_statement_model = self.registry('account.bank.statement')
self.bank_statement_line_model = self.registry('account.bank.statement.line')
self.statement_import_model = self.env['account.bank.statement.import']
self.statement_line_model = self.env['account.bank.statement.line']
def test_qif_file_import(self):
from openerp.tools import float_compare
cr, uid = self.cr, self.uid
qif_file_path = get_module_resource('account_bank_statement_import_qif', 'test_qif_file', 'test_qif.qif')
qif_file_path = get_module_resource(
'account_bank_statement_import_qif',
'test_qif_file', 'test_qif.qif')
qif_file = open(qif_file_path, 'rb').read().encode('base64')
bank_statement_id = self.statement_import_model.create(cr, uid, dict(
data_file=qif_file,
))
context = {
'journal_id': self.registry('ir.model.data').get_object_reference(cr, uid, 'account', 'bank_journal')[1]
}
self.statement_import_model.import_file(cr, uid, [bank_statement_id], context=context)
line_id = self.bank_statement_line_model.search(cr, uid, [('name', '=', 'YOUR LOCAL SUPERMARKET')])[0]
statement_id = self.bank_statement_line_model.browse(cr, uid, line_id).statement_id.id
bank_st_record = self.bank_statement_model.browse(cr, uid, statement_id)
assert float_compare(bank_st_record.balance_end_real, -1896.09, 2) == 0
bank_statement_improt = self.statement_import_model.with_context(
journal_id=self.ref('account.bank_journal')).create(
dict(data_file=qif_file))
bank_statement_improt.import_file()
bank_statement = self.statement_line_model.search(
[('name', '=', 'YOUR LOCAL SUPERMARKET')], limit=1)[0].statement_id
assert float_compare(bank_statement.balance_end_real, -1896.09, 2) == 0
Loading…
Cancel
Save