You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
288 lines
11 KiB
288 lines
11 KiB
# Copyright 2014-2017 Akretion (http://www.akretion.com).
|
|
# @author Alexis de Lattre <alexis.delattre@akretion.com>
|
|
# @author Sébastien BEAU <sebastien.beau@akretion.com>
|
|
# Copyright 2019 Eficent Business and IT Consulting Services, S.L.
|
|
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
|
|
|
|
import xlrd
|
|
import logging
|
|
import datetime as dtm
|
|
from datetime import datetime
|
|
from odoo import _, api, fields, models
|
|
from odoo.exceptions import UserError
|
|
import re
|
|
from io import StringIO
|
|
_logger = logging.getLogger(__name__)
|
|
|
|
try:
|
|
import csv
|
|
except (ImportError, IOError) as err:
|
|
_logger.debug(err)
|
|
|
|
|
|
class AccountBankStatementImport(models.TransientModel):
|
|
_inherit = 'account.bank.statement.import'
|
|
|
|
txt_map_id = fields.Many2one(
|
|
comodel_name='account.bank.statement.import.map',
|
|
string='Txt map',
|
|
readonly=True,
|
|
)
|
|
|
|
@api.model
|
|
def _get_txt_encoding(self):
|
|
if self.txt_map_id.file_encoding:
|
|
return self.txt_map_id.file_encoding
|
|
return 'utf-8-sig'
|
|
|
|
@api.model
|
|
def _get_txt_str_data(self, data_file):
|
|
if not isinstance(data_file, str):
|
|
data_file = data_file.decode(self._get_txt_encoding())
|
|
return data_file.strip()
|
|
|
|
@api.model
|
|
def _txt_convert_amount(self, amount_str):
|
|
if not amount_str:
|
|
return 0.0
|
|
if self.txt_map_id:
|
|
thousands, decimal = self.txt_map_id._get_separators()
|
|
else:
|
|
thousands, decimal = ',', '.'
|
|
valstr = re.sub(r'[^\d%s%s.-]' % (thousands, decimal), '', amount_str)
|
|
valstrdot = valstr.replace(thousands, '')
|
|
valstrdot = valstrdot.replace(decimal, '.')
|
|
return float(valstrdot)
|
|
|
|
@api.model
|
|
def _check_xls(self, data_file):
|
|
# Try if it is an Excel file
|
|
headers = self.mapped('txt_map_id.map_line_ids.name')
|
|
try:
|
|
file_headers = []
|
|
book = xlrd.open_workbook(file_contents=data_file)
|
|
xl_sheet = book.sheet_by_index(0)
|
|
row = xl_sheet.row(0) # 1st row
|
|
for idx, cell_obj in enumerate(row):
|
|
cell_type_str = xlrd.sheet.ctype_text.get(cell_obj.ctype, False)
|
|
if cell_type_str:
|
|
file_headers.append(cell_obj.value)
|
|
else:
|
|
return False
|
|
if any(item not in file_headers for item in headers):
|
|
raise UserError(
|
|
_("Headers of file to import and Txt map lines does not "
|
|
"match."))
|
|
except xlrd.XLRDError as e:
|
|
return False
|
|
except Exception as e:
|
|
return False
|
|
return True
|
|
|
|
@api.model
|
|
def _check_txt(self, data_file):
|
|
data_file = self._get_txt_str_data(data_file)
|
|
if not self.txt_map_id:
|
|
return False
|
|
headers = self.mapped('txt_map_id.map_line_ids.name')
|
|
file_headers = data_file.split('\n', 1)[0]
|
|
if any(item not in file_headers for item in headers):
|
|
raise UserError(
|
|
_("Headers of file to import and Txt map lines does not "
|
|
"match."))
|
|
return True
|
|
|
|
def _get_currency_fields(self):
|
|
return ['amount', 'amount_currency']
|
|
|
|
def _convert_txt_line_to_dict(self, idx, line):
|
|
rline = dict()
|
|
for item in range(len(line)):
|
|
txt_map = self.mapped('txt_map_id.map_line_ids')[item]
|
|
value = line[item]
|
|
if not txt_map.field_to_assign:
|
|
continue
|
|
if txt_map.date_format:
|
|
try:
|
|
value = fields.Date.to_string(
|
|
datetime.strptime(value, txt_map.date_format))
|
|
except Exception:
|
|
raise UserError(
|
|
_("Date format of map file and Txt date does "
|
|
"not match."))
|
|
rline[txt_map.field_to_assign] = value
|
|
for field in self._get_currency_fields():
|
|
_logger.debug('Trying to convert %s to float' % rline[field])
|
|
try:
|
|
rline[field] = self._txt_convert_amount(rline[field])
|
|
except Exception:
|
|
raise UserError(
|
|
_("Value '%s' for the field '%s' on line %d, "
|
|
"cannot be converted to float")
|
|
% (rline[field], field, idx))
|
|
return rline
|
|
|
|
def _parse_txt_file(self, data_file):
|
|
data_file = self._get_txt_str_data(data_file)
|
|
f = StringIO(data_file)
|
|
f.seek(0)
|
|
raw_lines = []
|
|
if not self.txt_map_id.quotechar:
|
|
reader = csv.reader(f,
|
|
delimiter=self.txt_map_id.delimiter or False)
|
|
else:
|
|
reader = csv.reader(f,
|
|
quotechar=self.txt_map_id.quotechar,
|
|
delimiter=self.txt_map_id.delimiter or False)
|
|
next(reader) # Drop header
|
|
for idx, line in enumerate(reader):
|
|
_logger.debug("Line %d: %s" % (idx, line))
|
|
raw_lines.append(self._convert_txt_line_to_dict(idx, line))
|
|
return raw_lines
|
|
|
|
def _convert_xls_line_to_dict(self, row_idx, xl_sheet):
|
|
rline = dict()
|
|
for col_idx in range(0, xl_sheet.ncols): # Iterate through columns
|
|
txt_map = self.mapped('txt_map_id.map_line_ids')[col_idx]
|
|
cell_obj = xl_sheet.cell(row_idx, col_idx) # Get cell
|
|
ctype = xl_sheet.cell(row_idx, col_idx).ctype
|
|
value = cell_obj.value
|
|
if not txt_map.field_to_assign:
|
|
continue
|
|
if ctype == xlrd.XL_CELL_DATE:
|
|
ms_date_number = xl_sheet.cell(row_idx, col_idx).value
|
|
try:
|
|
year, month, day, hour, minute, \
|
|
second = xlrd.xldate_as_tuple(
|
|
ms_date_number, 0)
|
|
except xlrd.XLDateError as e:
|
|
raise UserError(
|
|
_('An error was found translating a date '
|
|
'field from the file: %s') % e)
|
|
value = dtm.date(year, month, day)
|
|
value = value.strftime('%Y-%m-%d')
|
|
rline[txt_map.field_to_assign] = value
|
|
|
|
return rline
|
|
|
|
def _parse_xls_file(self, data_file):
|
|
try:
|
|
raw_lines = []
|
|
book = xlrd.open_workbook(file_contents=data_file)
|
|
xl_sheet = book.sheet_by_index(0)
|
|
for row_idx in range(1, xl_sheet.nrows):
|
|
_logger.debug("Line %d" % row_idx)
|
|
raw_lines.append(self._convert_xls_line_to_dict(
|
|
row_idx, xl_sheet))
|
|
except xlrd.XLRDError:
|
|
return False
|
|
except Exception as e:
|
|
return False
|
|
return raw_lines
|
|
|
|
def _post_process_statement_line(self, raw_lines):
|
|
""" Enter your additional logic here. """
|
|
return raw_lines
|
|
|
|
def _get_journal(self):
|
|
journal_id = self.env.context.get('journal_id')
|
|
if not journal_id:
|
|
raise UserError(_('You must run this wizard from the journal'))
|
|
return self.env['account.journal'].browse(journal_id)
|
|
|
|
def _get_currency_id(self, fline):
|
|
journal = self._get_journal()
|
|
line_currency_name = fline.get('currency', False)
|
|
currency = journal.currency_id or journal.company_id.currency_id
|
|
if line_currency_name and line_currency_name != currency.name:
|
|
currency = self.env['res.currency'].search(
|
|
[('name', '=', fline['currency'])], limit=1)
|
|
return currency.id
|
|
return False
|
|
|
|
@api.model
|
|
def _get_partner_id(self, fline):
|
|
partner_name = fline.get('partner_name', False)
|
|
if partner_name:
|
|
partner = self.env['res.partner'].search([
|
|
('name', '=ilike', partner_name)])
|
|
if partner and len(partner) == 1:
|
|
return partner.commercial_partner_id.id
|
|
return None
|
|
|
|
def _prepare_txt_statement_line(self, fline):
|
|
currency_id = self._get_currency_id(fline)
|
|
return {
|
|
'date': fline.get('date', False),
|
|
'name': fline.get('name', ''),
|
|
'ref': fline.get('ref', False),
|
|
'note': fline.get('Notes', False),
|
|
'amount': fline.get('amount', 0.0),
|
|
'currency_id': self._get_currency_id(fline),
|
|
'amount_currency': currency_id and fline.get(
|
|
'amount_currency', 0.0) or 0.0,
|
|
'partner_id': self._get_partner_id(fline),
|
|
'account_number': fline.get('account_number', False),
|
|
}
|
|
|
|
def _prepare_txt_statement(self, lines):
|
|
balance_end_real = 0.0
|
|
for line in lines:
|
|
if 'amount' in line and line['amount']:
|
|
balance_end_real += line['amount']
|
|
|
|
return {
|
|
'name':
|
|
_('%s Import %s > %s')
|
|
% (self.txt_map_id.name,
|
|
lines[0]['date'], lines[-1]['date']),
|
|
'date': lines[-1]['date'],
|
|
'balance_start': 0.0,
|
|
'balance_end_real': balance_end_real,
|
|
}
|
|
|
|
@api.model
|
|
def _parse_file(self, data_file):
|
|
""" Import a file in Txt CSV format """
|
|
is_txt = False
|
|
is_xls = self._check_xls(data_file)
|
|
if not is_xls:
|
|
is_txt = self._check_txt(data_file)
|
|
if not is_txt and not is_xls:
|
|
return super(AccountBankStatementImport, self)._parse_file(
|
|
data_file)
|
|
if is_txt:
|
|
raw_lines = self._parse_txt_file(data_file)
|
|
else:
|
|
raw_lines = self._parse_xls_file(data_file)
|
|
final_lines = self._post_process_statement_line(raw_lines)
|
|
vals_bank_statement = self._prepare_txt_statement(final_lines)
|
|
transactions = []
|
|
for fline in final_lines:
|
|
vals_line = self._prepare_txt_statement_line(fline)
|
|
_logger.debug("vals_line = %s" % vals_line)
|
|
transactions.append(vals_line)
|
|
vals_bank_statement['transactions'] = transactions
|
|
return None, None, [vals_bank_statement]
|
|
|
|
@api.model
|
|
def _complete_txt_statement_line(self, line):
|
|
""" Enter additional logic here. """
|
|
return None
|
|
|
|
@api.model
|
|
def _complete_stmts_vals(self, stmts_vals, journal_id, account_number):
|
|
stmts_vals = super(AccountBankStatementImport, self). \
|
|
_complete_stmts_vals(stmts_vals, journal_id, account_number)
|
|
for line in stmts_vals[0]['transactions']:
|
|
vals = self._complete_txt_statement_line(line)
|
|
if vals:
|
|
line.update(vals)
|
|
return stmts_vals
|
|
|
|
@api.model
|
|
def default_get(self, fields):
|
|
res = super(AccountBankStatementImport, self).default_get(fields)
|
|
journal = self._get_journal()
|
|
res['txt_map_id'] = journal.statement_import_txt_map_id.id
|
|
return res
|