Browse Source

[ENH] Streamline node parsing.

pull/153/head
Ronald Portier 8 years ago
parent
commit
49564f8323
  1. 110
      account_bank_statement_import_camt/models/parser.py

110
account_bank_statement_import_camt/models/parser.py

@ -15,12 +15,27 @@ from openerp.addons.account_bank_statement_import.parserlib import (
from openerp import models from openerp import models
_logger = logging.getLogger(__name__)
class CamtParser(models.AbstractModel): class CamtParser(models.AbstractModel):
_name = 'account.bank.statement.import.camt.parser'
"""Parser for camt bank statement import files.""" """Parser for camt bank statement import files."""
_name = 'account.bank.statement.import.camt.parser'
def parse_amount(self, ns, node):
def __init__(self):
"""Define and initialize attributes."""
super(CamtParser, self).__init__()
self.namespace = ''
def xpath(self, node, expr):
"""
Wrap namespaces argument into call to Element.xpath():
self.xpath(node, './ns:Acct/ns:Id')
"""
return node.xpath(expr, namespaces={'ns': self.namespace})
def parse_amount(self, node):
"""Parse element that contains Amount and CreditDebitIndicator.""" """Parse element that contains Amount and CreditDebitIndicator."""
if node is None: if node is None:
return 0.0 return 0.0
@ -41,7 +56,7 @@ class CamtParser(models.AbstractModel):
return amount return amount
def add_value_from_node( def add_value_from_node(
self, ns, node, xpath_str, obj, attr_name, join_str=None,
self, node, xpath_str, obj, attr_name, join_str=None,
default=None): default=None):
"""Add value to object from first or all nodes found with xpath. """Add value to object from first or all nodes found with xpath.
@ -51,7 +66,7 @@ class CamtParser(models.AbstractModel):
if not isinstance(xpath_str, (list, tuple)): if not isinstance(xpath_str, (list, tuple)):
xpath_str = [xpath_str] xpath_str = [xpath_str]
for search_str in xpath_str: for search_str in xpath_str:
found_node = node.xpath(search_str, namespaces={'ns': ns})
found_node = self.xpath(node, search_str)
if found_node: if found_node:
if join_str is None: if join_str is None:
attr_value = found_node[0].text attr_value = found_node[0].text
@ -67,7 +82,6 @@ class CamtParser(models.AbstractModel):
"""Parse TxDtls node.""" """Parse TxDtls node."""
# message # message
self.add_value_from_node( self.add_value_from_node(
ns,
node, node,
[ [
'./ns:RmtInf/ns:Ustrd', './ns:RmtInf/ns:Ustrd',
@ -83,7 +97,7 @@ class CamtParser(models.AbstractModel):
) )
# eref # eref
self.add_value_from_node( self.add_value_from_node(
ns, node, [
node, [
'./ns:RmtInf/ns:Strd/ns:CdtrRefInf/ns:Ref', './ns:RmtInf/ns:Strd/ns:CdtrRefInf/ns:Ref',
'./ns:Refs/ns:EndToEndId', './ns:Refs/ns:EndToEndId',
], ],
@ -94,62 +108,59 @@ class CamtParser(models.AbstractModel):
transaction['amount'] = amount transaction['amount'] = amount
# remote party values # remote party values
party_type = 'Dbtr' party_type = 'Dbtr'
party_type_node = node.xpath(
'../../ns:CdtDbtInd', namespaces={'ns': ns})
party_type_node = self.xpath(node, '../../ns:CdtDbtInd')
if party_type_node and party_type_node[0].text != 'CRDT': if party_type_node and party_type_node[0].text != 'CRDT':
party_type = 'Cdtr' party_type = 'Cdtr'
party_node = node.xpath(
'./ns:RltdPties/ns:%s' % party_type, namespaces={'ns': ns})
party_node = self.xpath(node, './ns:RltdPties/ns:%s' % party_type)
if party_node: if party_node:
self.add_value_from_node( self.add_value_from_node(
ns, party_node[0], './ns:Nm', transaction, 'remote_owner')
party_node[0], './ns:Nm', transaction, 'remote_owner'
)
self.add_value_from_node( self.add_value_from_node(
ns, party_node[0], './ns:PstlAdr/ns:Ctry', transaction,
party_node[0], './ns:PstlAdr/ns:Ctry', transaction,
'remote_owner_country' 'remote_owner_country'
) )
address_node = party_node[0].xpath(
'./ns:PstlAdr/ns:AdrLine', namespaces={'ns': ns})
address_node = self.xpath(
party_node[0], './ns:PstlAdr/ns:AdrLine'
)
if address_node: if address_node:
transaction.remote_owner_address = [address_node[0].text] transaction.remote_owner_address = [address_node[0].text]
# Get remote_account from iban or from domestic account: # Get remote_account from iban or from domestic account:
account_node = node.xpath(
'./ns:RltdPties/ns:%sAcct/ns:Id' % party_type,
namespaces={'ns': ns}
account_node = self.xpath(
node, './ns:RltdPties/ns:%sAcct/ns:Id' % party_type
) )
if account_node: if account_node:
iban_node = account_node[0].xpath(
'./ns:IBAN', namespaces={'ns': ns})
iban_node = self.xpath(account_node[0], './ns:IBAN')
if iban_node: if iban_node:
transaction.remote_account = iban_node[0].text transaction.remote_account = iban_node[0].text
bic_node = node.xpath(
'./ns:RltdAgts/ns:%sAgt/ns:FinInstnId/ns:BIC' % party_type,
namespaces={'ns': ns}
bic_node = self.xpath(
node,
'./ns:RltdAgts/ns:%sAgt/ns:FinInstnId/ns:BIC'
% party_type
) )
if bic_node: if bic_node:
transaction.remote_bank_bic = bic_node[0].text transaction.remote_bank_bic = bic_node[0].text
else: else:
self.add_value_from_node( self.add_value_from_node(
ns, account_node[0], './ns:Othr/ns:Id', transaction,
account_node[0], './ns:Othr/ns:Id', transaction,
'remote_account' 'remote_account'
) )
def parse_entry(self, ns, node, transaction):
"""Parse an Ntry node and yield transactions."""
def parse_transaction(self, node, transaction):
"""Parse transaction (entry) node."""
self.add_value_from_node( self.add_value_from_node(
ns, node, './ns:BkTxCd/ns:Prtry/ns:Cd', transaction,
node, './ns:BkTxCd/ns:Prtry/ns:Cd', transaction,
'transfer_type' 'transfer_type'
) )
self.add_value_from_node( self.add_value_from_node(
ns, node, './ns:BookgDt/ns:Dt', transaction, 'date')
node, './ns:BookgDt/ns:Dt', transaction, 'date')
self.add_value_from_node( self.add_value_from_node(
ns, node, './ns:BookgDt/ns:Dt', transaction, 'execution_date')
node, './ns:BookgDt/ns:Dt', transaction, 'execution_date')
self.add_value_from_node( self.add_value_from_node(
ns, node, './ns:ValDt/ns:Dt', transaction, 'value_date')
amount = self.parse_amount(ns, node)
if amount != 0.0:
transaction['amount'] = amount
node, './ns:ValDt/ns:Dt', transaction, 'value_date')
transaction.transferred_amount = self.parse_amount(node)
self.add_value_from_node( self.add_value_from_node(
ns, node, './ns:AddtlNtryInf', transaction, 'name')
node, './ns:AddtlNtryInf', transaction, 'name')
self.add_value_from_node( self.add_value_from_node(
ns, node, [ ns, node, [
'./ns:NtryDtls/ns:RmtInf/ns:Strd/ns:CdtrRefInf/ns:Ref', './ns:NtryDtls/ns:RmtInf/ns:Strd/ns:CdtrRefInf/ns:Ref',
@ -226,28 +237,27 @@ class CamtParser(models.AbstractModel):
balance = self.parse_amount(nodes[-1]) balance = self.parse_amount(nodes[-1])
return balance return balance
def parse_statement(self, ns, node):
def parse_statement(self, node):
"""Parse a single Stmt node.""" """Parse a single Stmt node."""
statement = BankStatement() statement = BankStatement()
self.add_value_from_node( self.add_value_from_node(
ns, node, [
node, [
'./ns:Acct/ns:Id/ns:IBAN', './ns:Acct/ns:Id/ns:IBAN',
'./ns:Acct/ns:Id/ns:Othr/ns:Id', './ns:Acct/ns:Id/ns:Othr/ns:Id',
], statement, 'local_account' ], statement, 'local_account'
) )
self.add_value_from_node(node, './ns:Id', statement, 'statement_id')
self.add_value_from_node( self.add_value_from_node(
ns, node, './ns:Id', statement, 'statement_id')
self.add_value_from_node(
ns, node, './ns:Acct/ns:Ccy', statement, 'local_currency')
node, './ns:Acct/ns:Ccy', statement, 'local_currency')
statement.start_balance = self.get_start_balance(node) statement.start_balance = self.get_start_balance(node)
statement.end_balance = self.get_end_balance(node) statement.end_balance = self.get_end_balance(node)
transaction_nodes = node.xpath('./ns:Ntry', namespaces={'ns': ns})
transaction_nodes = self.xpath(node, './ns:Ntry')
total_amount = 0 total_amount = 0
for entry_node in transaction_nodes: for entry_node in transaction_nodes:
transaction = statement.create_transaction() transaction = statement.create_transaction()
total_amount += transaction['transferred_amount']
transaction.data = etree.tostring(entry_node) transaction.data = etree.tostring(entry_node)
self.parse_transaction(ns, entry_node, transaction)
self.parse_transaction(entry_node, transaction)
total_amount += transaction.transferred_amount
if statement['transactions']: if statement['transactions']:
execution_date = statement['transactions'][0].execution_date[:10] execution_date = statement['transactions'][0].execution_date[:10]
statement.date = datetime.strptime(execution_date, "%Y-%m-%d") statement.date = datetime.strptime(execution_date, "%Y-%m-%d")
@ -266,15 +276,15 @@ class CamtParser(models.AbstractModel):
) )
return statement return statement
def check_version(self, ns, root):
def check_version(self, root):
"""Validate validity of camt file.""" """Validate validity of camt file."""
# Check wether it is camt at all: # Check wether it is camt at all:
re_camt = re.compile( re_camt = re.compile(
r'(^urn:iso:std:iso:20022:tech:xsd:camt.' r'(^urn:iso:std:iso:20022:tech:xsd:camt.'
r'|^ISO:camt.)' r'|^ISO:camt.)'
) )
if not re_camt.search(ns):
raise ValueError('no camt: ' + ns)
if not re_camt.search(self.namespace):
raise ValueError('no camt: ' + self.namespace)
# Check wether version 052 or 053: # Check wether version 052 or 053:
re_camt_version = re.compile( re_camt_version = re.compile(
r'(^urn:iso:std:iso:20022:tech:xsd:camt.053.' r'(^urn:iso:std:iso:20022:tech:xsd:camt.053.'
@ -282,10 +292,10 @@ class CamtParser(models.AbstractModel):
r'|^ISO:camt.053.' r'|^ISO:camt.053.'
r'|^ISO:camt.052.)' r'|^ISO:camt.052.)'
) )
if not re_camt_version.search(ns):
raise ValueError('no camt 052 or 053: ' + ns)
if not re_camt_version.search(self.namespace):
raise ValueError('no camt 052 or 053: ' + self.namespace)
# Check GrpHdr element: # Check GrpHdr element:
root_0_0 = root[0][0].tag[len(ns) + 2:] # strip namespace
root_0_0 = root[0][0].tag[len(self.namespace) + 2:] # strip namespace
if root_0_0 != 'GrpHdr': if root_0_0 != 'GrpHdr':
raise ValueError('expected GrpHdr, got: ' + root_0_0) raise ValueError('expected GrpHdr, got: ' + root_0_0)
@ -301,11 +311,11 @@ class CamtParser(models.AbstractModel):
if root is None: if root is None:
raise ValueError( raise ValueError(
'Not a valid xml file, or not an xml file at all.') 'Not a valid xml file, or not an xml file at all.')
ns = root.tag[1:root.tag.index("}")]
self.check_version(ns, root)
self.namespace = root.tag[1:root.tag.index("}")]
self.check_version(root)
statements = [] statements = []
for node in root[0][1:]: for node in root[0][1:]:
statement = self.parse_statement(ns, node)
statement = self.parse_statement(node)
if len(statement['transactions']): if len(statement['transactions']):
statements.append(statement) statements.append(statement)
return statements return statements
Loading…
Cancel
Save