banking-addons-team team mailing list archive
banking-addons-team team
Mailing list archive
Message #00971
[Merge] lp:~therp-nl/banking-addons/6.1-add_camt_import into lp:banking-addons/6.1
Stefan Rijnhart (Therp) has proposed merging lp:~therp-nl/banking-addons/6.1-add_camt_import into lp:banking-addons/6.1.
Commit message:
[ADD] Support for SEPA CAMT.053 bank statements
Requested reviews:
Banking Addons Core Editors (banking-addons-team)
For more details, see:
Same as
Your team Banking Addons Core Editors is requested to review the proposed merge of lp:~therp-nl/banking-addons/6.1-add_camt_import into lp:banking-addons/6.1.
=== modified file 'account_banking/parsers/models.py'
--- account_banking/parsers/models.py 2012-01-17 08:48:10 +0000
+++ account_banking/parsers/models.py 2013-10-11 12:39:01 +0000
@@ -19,6 +19,7 @@
+import re
from tools.translate import _
class mem_bank_statement(object):
@@ -34,7 +35,7 @@
# Lock attributes to enable parsers to trigger non-conformity faults
__slots__ = [
'start_balance','end_balance', 'date', 'local_account',
- 'local_currency', 'id', 'statements'
+ 'local_currency', 'id', 'transactions'
def __init__(self, *args, **kwargs):
super(mem_bank_statement, self).__init__(*args, **kwargs)
@@ -353,6 +354,33 @@
name = "%s-%d" % (base, suffix)
return name
+ def get_unique_account_identifier(self, cr, account):
+ """
+ Get an identifier for a local bank account, based on the last
+ characters of the account number with minimum length 3.
+ The identifier should be unique amongst the company accounts
+ Presumably, the bank account is one of the company accounts
+ itself but importing bank statements for non-company accounts
+ is not prevented anywhere else in the system so the 'account'
+ param being a company account is not enforced here either.
+ """
+ def normalize(account_no):
+ return re.sub('\s', '', account_no)
+ account = normalize(account)
+ cr.execute(
+ """SELECT acc_number FROM res_partner_bank
+ WHERE company_id IS NOT NULL""")
+ accounts = [normalize(row[0]) for row in cr.fetchall()]
+ tail_length = 3
+ while tail_length <= len(account):
+ tail = account[-tail_length:]
+ if len([acc for acc in accounts if acc.endswith(tail)]) < 2:
+ return tail
+ tail_length += 1
+ return account
def parse(self, cr, data):
Parse data.
=== added directory 'account_banking_camt'
=== added file 'account_banking_camt/__init__.py'
--- account_banking_camt/__init__.py 1970-01-01 00:00:00 +0000
+++ account_banking_camt/__init__.py 2013-10-11 12:39:01 +0000
@@ -0,0 +1,1 @@
+import camt
=== added file 'account_banking_camt/__openerp__.py'
--- account_banking_camt/__openerp__.py 1970-01-01 00:00:00 +0000
+++ account_banking_camt/__openerp__.py 2013-10-11 12:39:01 +0000
@@ -0,0 +1,33 @@
+# Copyright (C) 2013 Therp BV (<>)
+# All Rights Reserved
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+{
+ 'name': 'CAMT Format Bank Statements Import',
+ 'version': '0.1',
+ 'license': 'AGPL-3',
+ 'author': 'Therp BV',
+ 'website': '',
+ 'category': 'Banking addons',
+ 'depends': ['account_banking'],
+ 'description': '''
+Module to import SEPA CAMT.053 Format bank statement files. Based
+on the Banking addons framework.
+ ''',
+ 'installable': True,
+}
=== added file 'account_banking_camt/camt.py'
--- account_banking_camt/camt.py 1970-01-01 00:00:00 +0000
+++ account_banking_camt/camt.py 2013-10-11 12:39:01 +0000
@@ -0,0 +1,278 @@
+# -*- coding: utf-8 -*-
+# Copyright (C) 2013 Therp BV (<>)
+# All Rights Reserved
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+from lxml import etree
+from openerp.osv.orm import except_orm
+from account_banking.parsers import models
+from account_banking.parsers.convert import str2date
+bt = models.mem_bank_transaction
+class transaction(models.mem_bank_transaction):
+ def __init__(self, values, *args, **kwargs):
+ super(transaction, self).__init__(*args, **kwargs)
+ for attr in values:
+ setattr(self, attr, values[attr])
+ def is_valid(self):
+ return not self.error_message
+class parser(models.parser):
+ code = 'CAMT'
+ country_code = 'NL'
+ name = 'Generic CAMT Format'
+ doc = '''\
+CAMT Format parser
+'''
+ def tag(self, node):
+ """
+ Return the tag of a node, stripped of its namespace
+ """
+ return node.tag[len(self.ns):]
+ def assert_tag(self, node, expected):
+ """
+ Get node's stripped tag and compare with expected
+ """
+ assert self.tag(node) == expected, (
+ "Expected tag '%s', got '%s' instead" %
+ (self.tag(node), expected))
+ def xpath(self, node, expr):
+ """
+ Wrap namespaces argument into call to Element.xpath():
+ self.xpath(node, './ns:Acct/ns:Id')
+ """
+ return node.xpath(expr, namespaces={'ns': self.ns[1:-1]})
+ def find(self, node, expr):
+ """
+ Like xpath(), but return the first result if any, or else None.
+ Test the result against None rather than for truthiness.
+ """
+ result = node.xpath(expr, namespaces={'ns': self.ns[1:-1]})
+ if result:
+ return result[0]
+ return None
+ def get_balance_type_node(self, node, balance_type):
+ """
+ :param node: BkToCstmrStmt/Stmt node
+ :param balance_type: one of 'OPBD', 'PRCD', 'ITBD', 'CLBD'
+ """
+ code_expr = './ns:Bal/ns:Tp/ns:CdOrPrtry/ns:Cd[text()="%s"]/../../..' % balance_type
+ return self.xpath(node, code_expr)
+ def parse_amount(self, node):
+ """
+ Parse an element that contains both Amount and CreditDebitIndicator
+ :return: signed amount
+ :returntype: float
+ """
+ sign = -1 if node.find(self.ns + 'CdtDbtInd').text == 'DBIT' else 1
+ return sign * float(node.find(self.ns + 'Amt').text)
+ def get_start_balance(self, node):
+ """
+ Find the (only) balance node with code OpeningBalance, or
+ the only one with code 'PreviousClosingBalance'
+ or the first balance node with code InterimBalance in
+ the case of preceding pagination.
+ :param node: BkToCstmrStmt/Stmt node
+ """
+ nodes = (
+ self.get_balance_type_node(node, 'OPBD') or
+ self.get_balance_type_node(node, 'PRCD') or
+ self.get_balance_type_node(node, 'ITBD'))
+ return self.parse_amount(nodes[0])
+ def get_end_balance(self, node):
+ """
+ Find the (only) balance node with code ClosingBalance, or
+ the second (and last) balance node with code InterimBalance in
+ the case of continued pagination.
+ :param node: BkToCstmrStmt/Stmt node
+ """
+ nodes = (
+ self.get_balance_type_node(node, 'CLBD') or
+ self.get_balance_type_node(node, 'ITBD'))
+ return self.parse_amount(nodes[-1])
+ def parse_Stmt(self, cr, node):
+ """
+ Parse a single Stmt node.
+ Be sure to craft a unique, but short enough statement identifier,
+ as it is used as the basis of the generated move lines' names
+ which overflow when using the full IBAN and CAMT statement id.
+ """
+ statement = models.mem_bank_statement()
+ statement.local_account = (
+ self.xpath(node, './ns:Acct/ns:Id/ns:IBAN')[0].text
+ if self.xpath(node, './ns:Acct/ns:Id/ns:IBAN')
+ else self.xpath(node, './ns:Acct/ns:Id/ns:Othr/ns:Id')[0].text)
+ identifier = node.find(self.ns + 'Id').text
+ if identifier.upper().startswith('CAMT053'):
+ identifier = identifier[7:]
+ statement.id = self.get_unique_statement_id(
+ cr, "%s-%s" % (
+ self.get_unique_account_identifier(
+ cr, statement.local_account),
+ identifier)
+ )
+ statement.local_currency = self.xpath(node, './ns:Acct/ns:Ccy')[0].text
+ statement.start_balance = self.get_start_balance(node)
+ statement.end_balance = self.get_end_balance(node)
+ number = 0
+ for Ntry in self.xpath(node, './ns:Ntry'):
+ transaction_detail = self.parse_Ntry(Ntry)
+ if number == 0:
+ # Take the statement date from the first transaction
+ statement.date = str2date(
+ transaction_detail['execution_date'], "%Y-%m-%d")
+ number += 1
+ transaction_detail['id'] = str(number).zfill(4)
+ statement.transactions.append(
+ transaction(transaction_detail))
+ return statement
+ def get_transfer_type(self, node):
+ """
+ Map entry descriptions to transfer types. To extend with
+ proper mapping from BkTxCd/Domn/Cd/Fmly/Cd to transfer types
+ if we can get our hands on real life samples.
+ For now, leave as a hook for bank specific overrides to map
+ proprietary codes from BkTxCd/Prtry/Cd.
+ :param node: Ntry node
+ """
+ return bt.ORDER
+ def parse_Ntry(self, node):
+ """
+ :param node: Ntry node
+ """
+ entry_details = {
+ 'execution_date': self.xpath(node, './ns:BookgDt/ns:Dt')[0].text,
+ 'effective_date': self.xpath(node, './ns:ValDt/ns:Dt')[0].text,
+ 'transfer_type': self.get_transfer_type(node),
+ 'transferred_amount': self.parse_amount(node)
+ }
+ TxDtls = self.xpath(node, './ns:NtryDtls/ns:TxDtls')
+ if len(TxDtls) == 1:
+ vals = self.parse_TxDtls(TxDtls[0], entry_details)
+ else:
+ vals = entry_details
+ return vals
+ def get_party_values(self, TxDtls):
+ """
+ Determine whether to use the debtor or creditor party node
+ and extract the available data from it
+ """
+ vals = {}
+ party_type = self.find(
+ TxDtls, '../../ns:CdtDbtInd').text == 'CRDT' and 'Dbtr' or 'Cdtr'
+ party_node = self.find(TxDtls, './ns:RltdPties/ns:%s' % party_type)
+ account_node = self.find(
+ TxDtls, './ns:RltdPties/ns:%sAcct/ns:Id' % party_type)
+ bic_node = self.find(
+ TxDtls,
+ './ns:RltdAgts/ns:%sAgt/ns:FinInstnId/ns:BIC' % party_type)
+ if party_node is not None:
+ name_node = self.find(party_node, './ns:Nm')
+ vals['remote_owner'] = (
+ name_node.text if name_node is not None else False)
+ country_node = self.find(party_node, './ns:PstlAdr/ns:Ctry')
+ vals['remote_owner_country'] = (
+ country_node.text if country_node is not None else False)
+ address_node = self.find(party_node, './ns:PstlAdr/ns:AdrLine')
+ if address_node is not None:
+ vals['remote_owner_address'] = [address_node.text]
+ if account_node is not None:
+ iban_node = self.find(account_node, './ns:IBAN')
+ if iban_node is not None:
+ vals['remote_account'] = iban_node.text
+ if bic_node is not None:
+ vals['remote_bank_bic'] = bic_node.text
+ else:
+ domestic_node = self.find(account_node, './ns:Othr/ns:Id')
+ vals['remote_account'] = (
+ domestic_node.text if domestic_node is not None else False)
+ return vals
+ def parse_TxDtls(self, TxDtls, entry_values):
+ """
+ Parse a single TxDtls node
+ """
+ vals = dict(entry_values)
+ unstructured = self.xpath(TxDtls, './ns:RmtInf/ns:Ustrd')
+ if unstructured:
+ vals['message'] = ' '.join([x.text for x in unstructured])
+ structured = self.find(
+ TxDtls, './ns:RmtInf/ns:Strd/ns:CdtrRefInf/ns:Ref')
+ if structured is None or not structured.text:
+ structured = self.find(TxDtls, './ns:Refs/ns:EndToEndId')
+ if structured is not None:
+ vals['reference'] = structured.text
+ else:
+ if vals.get('message'):
+ vals['reference'] = vals['message']
+ vals.update(self.get_party_values(TxDtls))
+ return vals
+ def check_version(self):
+ """
+ Sanity check the document's namespace
+ """
+ if not self.ns.startswith('{urn:iso:std:iso:20022:tech:xsd:camt.'):
+ raise except_orm(
+ "Error",
+ "This does not seem to be a CAMT format bank statement.")
+ if not self.ns.startswith('{urn:iso:std:iso:20022:tech:xsd:camt.053.'):
+ raise except_orm(
+ "Error",
+ "Only CAMT.053 is supported at the moment.")
+ return True
+ def parse(self, cr, data):
+ """
+ Parse a CAMT053 XML file
+ """
+ root = etree.fromstring(data)
+ self.ns = root.tag[:root.tag.index("}") + 1]
+ self.check_version()
+ self.assert_tag(root[0][0], 'GrpHdr')
+ statements = []
+ for node in root[0][1:]:
+ statements.append(self.parse_Stmt(cr, node))
+ return statements
Follow ups