You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

234 lines
8.9 KiB

  1. # -*- coding: utf-8 -*-
  2. """Class to parse camt files."""
  3. # © 2013-2016 Therp BV <http://therp.nl>
  4. # License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html).
  5. import re
  6. from lxml import etree
  7. class CamtParser(object):
  8. """Parser for camt bank statement import files."""
  9. def parse_amount(self, ns, node):
  10. """Parse element that contains Amount and CreditDebitIndicator."""
  11. if node is None:
  12. return 0.0
  13. sign = 1
  14. amount = 0.0
  15. sign_node = node.xpath('ns:CdtDbtInd', namespaces={'ns': ns})
  16. if sign_node and sign_node[0].text == 'DBIT':
  17. sign = -1
  18. amount_node = node.xpath('ns:Amt', namespaces={'ns': ns})
  19. if amount_node:
  20. amount = sign * float(amount_node[0].text)
  21. return amount
  22. def add_value_from_node(
  23. self, ns, node, xpath_str, obj, attr_name, join_str=None):
  24. """Add value to object from first or all nodes found with xpath.
  25. If xpath_str is a list (or iterable), it will be seen as a series
  26. of search path's in order of preference. The first item that results
  27. in a found node will be used to set a value."""
  28. if not isinstance(xpath_str, (list, tuple)):
  29. xpath_str = [xpath_str]
  30. for search_str in xpath_str:
  31. found_node = node.xpath(search_str, namespaces={'ns': ns})
  32. if found_node:
  33. if join_str is None:
  34. attr_value = found_node[0].text
  35. else:
  36. attr_value = join_str.join([x.text for x in found_node])
  37. obj[attr_name] = attr_value
  38. break
  39. def parse_transaction_details(self, ns, node, transaction):
  40. """Parse transaction details (message, party, account...)."""
  41. # message
  42. self.add_value_from_node(
  43. ns, node, [
  44. './ns:RmtInf/ns:Ustrd',
  45. './ns:AddtlNtryInf',
  46. './ns:Refs/ns:InstrId',
  47. ], transaction, 'note', join_str='\n')
  48. # name
  49. self.add_value_from_node(
  50. ns, node, [
  51. './ns:AddtlTxInf',
  52. ], transaction, 'name', join_str='\n')
  53. # eref
  54. self.add_value_from_node(
  55. ns, node, [
  56. './ns:RmtInf/ns:Strd/ns:CdtrRefInf/ns:Ref',
  57. './ns:Refs/ns:EndToEndId',
  58. ],
  59. transaction, 'ref'
  60. )
  61. # unique_import_id
  62. self.add_value_from_node(
  63. ns, node, [
  64. './ns:Refs/ns:AcctSvcrRef',
  65. ],
  66. transaction, 'unique_import_id'
  67. )
  68. # remote party values
  69. party_type = 'Dbtr'
  70. party_type_node = node.xpath(
  71. '../../ns:CdtDbtInd', namespaces={'ns': ns})
  72. if party_type_node and party_type_node[0].text != 'CRDT':
  73. party_type = 'Cdtr'
  74. party_node = node.xpath(
  75. './ns:RltdPties/ns:%s' % party_type, namespaces={'ns': ns})
  76. if party_node:
  77. self.add_value_from_node(
  78. ns, party_node[0], './ns:Nm', transaction, 'partner_name')
  79. # Get remote_account from iban or from domestic account:
  80. account_node = node.xpath(
  81. './ns:RltdPties/ns:%sAcct/ns:Id' % party_type,
  82. namespaces={'ns': ns}
  83. )
  84. if account_node:
  85. iban_node = account_node[0].xpath(
  86. './ns:IBAN', namespaces={'ns': ns})
  87. if iban_node:
  88. transaction['account_number'] = iban_node[0].text
  89. else:
  90. self.add_value_from_node(
  91. ns, account_node[0], './ns:Othr/ns:Id', transaction,
  92. 'account_number'
  93. )
  94. def parse_transaction(self, ns, node):
  95. """Parse transaction (entry) node."""
  96. transaction = {}
  97. self.add_value_from_node(
  98. ns, node, './ns:BookgDt/ns:Dt', transaction, 'date')
  99. transaction['amount'] = self.parse_amount(ns, node)
  100. details_node = node.xpath(
  101. './ns:NtryDtls/ns:TxDtls', namespaces={'ns': ns})
  102. if details_node:
  103. self.parse_transaction_details(ns, details_node[0], transaction)
  104. if not transaction.get('name'):
  105. self.add_value_from_node(
  106. ns, node, './ns:AddtlNtryInf', transaction, 'name')
  107. if not transaction.get('name'):
  108. transaction['name'] = '/'
  109. if not transaction.get('ref'):
  110. self.add_value_from_node(
  111. ns, node, [
  112. './ns:NtryDtls/ns:Btch/ns:PmtInfId',
  113. ],
  114. transaction, 'ref'
  115. )
  116. return transaction
  117. def get_balance_amounts(self, ns, node):
  118. """Return opening and closing balance.
  119. Depending on kind of balance and statement, the balance might be in a
  120. different kind of node:
  121. OPBD = OpeningBalance
  122. PRCD = PreviousClosingBalance
  123. ITBD = InterimBalance (first ITBD is start-, second is end-balance)
  124. CLBD = ClosingBalance
  125. """
  126. start_balance_node = None
  127. end_balance_node = None
  128. for node_name in ['OPBD', 'PRCD', 'CLBD', 'ITBD']:
  129. code_expr = (
  130. './ns:Bal/ns:Tp/ns:CdOrPrtry/ns:Cd[text()="%s"]/../../..' %
  131. node_name
  132. )
  133. balance_node = node.xpath(code_expr, namespaces={'ns': ns})
  134. if balance_node:
  135. if node_name in ['OPBD', 'PRCD']:
  136. start_balance_node = balance_node[0]
  137. elif node_name == 'CLBD':
  138. end_balance_node = balance_node[0]
  139. else:
  140. if not start_balance_node:
  141. start_balance_node = balance_node[0]
  142. if not end_balance_node:
  143. end_balance_node = balance_node[-1]
  144. return (
  145. self.parse_amount(ns, start_balance_node),
  146. self.parse_amount(ns, end_balance_node)
  147. )
  148. def parse_statement(self, ns, node):
  149. """Parse a single Stmt node."""
  150. result = {}
  151. self.add_value_from_node(
  152. ns, node, [
  153. './ns:Acct/ns:Id/ns:IBAN',
  154. './ns:Acct/ns:Id/ns:Othr/ns:Id',
  155. ], result, 'account_number'
  156. )
  157. self.add_value_from_node(
  158. ns, node, './ns:Id', result, 'name')
  159. self.add_value_from_node(
  160. ns, node, './ns:CreDtTm', result, 'date')
  161. result['date'] = result['date'].split('T')[0]
  162. self.add_value_from_node(
  163. ns, node, './ns:Acct/ns:Ccy', result, 'currency')
  164. result['balance_start'], result['balance_end_real'] = (
  165. self.get_balance_amounts(ns, node))
  166. transaction_nodes = node.xpath('./ns:Ntry', namespaces={'ns': ns})
  167. result['transactions'] = []
  168. for entry_node in transaction_nodes:
  169. transaction = self.parse_transaction(ns, entry_node)
  170. if transaction:
  171. result['transactions'].append(transaction)
  172. return result
  173. def check_version(self, ns, root):
  174. """Validate validity of camt file."""
  175. # Check wether it is camt at all:
  176. re_camt = re.compile(
  177. r'(^urn:iso:std:iso:20022:tech:xsd:camt.'
  178. r'|^ISO:camt.)'
  179. )
  180. if not re_camt.search(ns):
  181. raise ValueError('no camt: ' + ns)
  182. # Check wether version 052 or 053:
  183. re_camt_version = re.compile(
  184. r'(^urn:iso:std:iso:20022:tech:xsd:camt.053.'
  185. r'|^urn:iso:std:iso:20022:tech:xsd:camt.052.'
  186. r'|^ISO:camt.053.'
  187. r'|^ISO:camt.052.)'
  188. )
  189. if not re_camt_version.search(ns):
  190. raise ValueError('no camt 052 or 053: ' + ns)
  191. # Check GrpHdr element:
  192. root_0_0 = root[0][0].tag[len(ns) + 2:] # strip namespace
  193. if root_0_0 != 'GrpHdr':
  194. raise ValueError('expected GrpHdr, got: ' + root_0_0)
  195. def parse(self, data):
  196. """Parse a camt.052 or camt.053 file."""
  197. try:
  198. root = etree.fromstring(
  199. data, parser=etree.XMLParser(recover=True))
  200. except etree.XMLSyntaxError:
  201. # ABNAmro is known to mix up encodings
  202. root = etree.fromstring(
  203. data.decode('iso-8859-15').encode('utf-8'))
  204. if root is None:
  205. raise ValueError(
  206. 'Not a valid xml file, or not an xml file at all.')
  207. ns = root.tag[1:root.tag.index("}")]
  208. self.check_version(ns, root)
  209. statements = []
  210. currency = None
  211. account_number = None
  212. for node in root[0][1:]:
  213. statement = self.parse_statement(ns, node)
  214. if len(statement['transactions']):
  215. if 'currency' in statement:
  216. currency = statement.pop('currency')
  217. if 'account_number' in statement:
  218. account_number = statement.pop('account_number')
  219. statements.append(statement)
  220. return currency, account_number, statements