You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

229 lines
8.9 KiB

  1. # Copyright 2020 Florent de Labarre
  2. # License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl).
  3. import requests
  4. import json
  5. import base64
  6. import time
  7. import pytz
  8. import re
  9. from datetime import datetime
  10. from odoo import api, fields, models, _
  11. from odoo.exceptions import UserError
  12. from dateutil.relativedelta import relativedelta
  13. from odoo.addons.base.models.res_bank import sanitize_account_number
# Base URL of the MyPonto REST API; the request paths used below
# ('/oauth2/token', '/accounts', '/synchronizations', ...) are appended to it.
PONTO_ENDPOINT = 'https://api.myponto.com'
  15. class OnlineBankStatementProviderPonto(models.Model):
  16. _inherit = 'online.bank.statement.provider'
  17. ponto_token = fields.Char(readonly=True)
  18. ponto_token_expiration = fields.Datetime(readonly=True)
  19. ponto_last_identifier = fields.Char(readonly=True)
  20. def ponto_reset_last_identifier(self):
  21. self.write({'ponto_last_identifier': False})
  22. @api.model
  23. def _get_available_services(self):
  24. return super()._get_available_services() + [
  25. ('ponto', 'MyPonto.com'),
  26. ]
  27. def _obtain_statement_data(self, date_since, date_until):
  28. self.ensure_one()
  29. if self.service != 'ponto':
  30. return super()._obtain_statement_data(
  31. date_since,
  32. date_until,
  33. )
  34. return self._ponto_obtain_statement_data(date_since, date_until)
  35. def _get_statement_date(self, date_since, date_until):
  36. self.ensure_one()
  37. if self.service != 'ponto':
  38. return super()._get_statement_date(
  39. date_since,
  40. date_until,
  41. )
  42. return date_since.astimezone(pytz.timezone('Europe/Paris')).date()
  43. #########
  44. # ponto #
  45. #########
  46. def _ponto_header_token(self):
  47. self.ensure_one()
  48. if self.username and self.password:
  49. login = '%s:%s' % (self.username, self.password)
  50. login = base64.b64encode(login.encode('UTF-8')).decode('UTF-8')
  51. return {'Content-Type': 'application/x-www-form-urlencoded',
  52. 'Accept': 'application/json',
  53. 'Authorization': 'Basic %s' % login, }
  54. raise UserError(_('Please fill login and key.'))
  55. def _ponto_header(self):
  56. self.ensure_one()
  57. if not self.ponto_token \
  58. or not self.ponto_token_expiration \
  59. or self.ponto_token_expiration <= fields.Datetime.now():
  60. url = PONTO_ENDPOINT + '/oauth2/token'
  61. response = requests.post(url, verify=False,
  62. params={'grant_type': 'client_credentials'},
  63. headers=self._ponto_header_token())
  64. if response.status_code == 200:
  65. data = json.loads(response.text)
  66. access_token = data.get('access_token', False)
  67. if not access_token:
  68. raise UserError(_('Ponto : no token'))
  69. else:
  70. self.sudo().ponto_token = access_token
  71. expiration_date = fields.Datetime.now() + relativedelta(
  72. seconds=data.get('expires_in', False))
  73. self.sudo().ponto_token_expiration = expiration_date
  74. else:
  75. raise UserError(_('%s \n\n %s') % (response.status_code, response.text))
  76. return {'Accept': 'application/json',
  77. 'Authorization': 'Bearer %s' % self.ponto_token, }
  78. def _ponto_get_account_ids(self):
  79. url = PONTO_ENDPOINT + '/accounts'
  80. response = requests.get(url, verify=False, params={'limit': 100},
  81. headers=self._ponto_header())
  82. if response.status_code == 200:
  83. data = json.loads(response.text)
  84. res = {}
  85. for account in data.get('data', []):
  86. iban = sanitize_account_number(
  87. account.get('attributes', {}).get('reference', ''))
  88. res[iban] = account.get('id')
  89. return res
  90. raise UserError(_('%s \n\n %s') % (response.status_code, response.text))
  91. def _ponto_synchronisation(self, account_id):
  92. url = PONTO_ENDPOINT + '/synchronizations'
  93. data = {'data': {
  94. 'type': 'synchronization',
  95. 'attributes': {
  96. 'resourceType': 'account',
  97. 'resourceId': account_id,
  98. 'subtype': 'accountTransactions'
  99. }
  100. }}
  101. response = requests.post(url, verify=False,
  102. headers=self._ponto_header(),
  103. json=data)
  104. if response.status_code in (200, 201, 400):
  105. data = json.loads(response.text)
  106. sync_id = data.get('attributes', {}).get('resourceId', False)
  107. else:
  108. raise UserError(_('Error during Create Synchronisation %s \n\n %s') % (
  109. response.status_code, response.text))
  110. # Check synchronisation
  111. if not sync_id:
  112. return
  113. url = PONTO_ENDPOINT + '/synchronizations/' + sync_id
  114. number = 0
  115. while number == 100:
  116. number += 1
  117. response = requests.get(url, verify=False, headers=self._ponto_header())
  118. if response.status_code == 200:
  119. data = json.loads(response.text)
  120. status = data.get('status', {})
  121. if status in ('success', 'error'):
  122. return
  123. time.sleep(4)
  124. def _ponto_get_transaction(self, account_id, date_since, date_until):
  125. page_url = PONTO_ENDPOINT + '/accounts/' + account_id + '/transactions'
  126. params = {'limit': 100}
  127. page_next = True
  128. last_identifier = self.ponto_last_identifier
  129. if last_identifier:
  130. params['before'] = last_identifier
  131. page_next = False
  132. transaction_lines = []
  133. latest_identifier = False
  134. while page_url:
  135. response = requests.get(page_url, verify=False, params=params,
  136. headers=self._ponto_header())
  137. if response.status_code == 200:
  138. if params.get('before'):
  139. params.pop('before')
  140. data = json.loads(response.text)
  141. links = data.get('links', {})
  142. if page_next:
  143. page_url = links.get('next', False)
  144. else:
  145. page_url = links.get('prev', False)
  146. transactions = data.get('data', [])
  147. if transactions:
  148. current_transactions = []
  149. for transaction in transactions:
  150. date = self._ponto_date_from_string(
  151. transaction.get('attributes', {}).get('executionDate'))
  152. if date_since <= date < date_until:
  153. current_transactions.append(transaction)
  154. if current_transactions:
  155. if not page_next or (page_next and not latest_identifier):
  156. latest_identifier = current_transactions[0].get('id')
  157. transaction_lines.extend(current_transactions)
  158. else:
  159. raise UserError(
  160. _('Error during get transaction.\n\n%s \n\n %s') % (
  161. response.status_code, response.text))
  162. if latest_identifier:
  163. self.ponto_last_identifier = latest_identifier
  164. return transaction_lines
  165. def _ponto_date_from_string(self, date_str):
  166. return datetime.strptime(date_str, '%Y-%m-%dT%H:%M:%S.%fZ')
  167. def _ponto_obtain_statement_data(self, date_since, date_until):
  168. self.ensure_one()
  169. account_ids = self._ponto_get_account_ids()
  170. journal = self.journal_id
  171. iban = self.account_number
  172. account_id = account_ids.get(iban)
  173. if not account_id:
  174. raise UserError(
  175. _('Ponto : wrong configuration, unknow account %s')
  176. % journal.bank_account_id.acc_number)
  177. self._ponto_synchronisation(account_id)
  178. transaction_lines = self._ponto_get_transaction(account_id,
  179. date_since,
  180. date_until)
  181. new_transactions = []
  182. sequence = 0
  183. for transaction in transaction_lines:
  184. sequence += 1
  185. attributes = transaction.get('attributes', {})
  186. ref = '%s %s' % (
  187. attributes.get('description'),
  188. attributes.get('counterpartName'))
  189. date = self._ponto_date_from_string(attributes.get('executionDate'))
  190. vals_line = {
  191. 'sequence': sequence,
  192. 'date': date,
  193. 'name': re.sub(' +', ' ', ref) or '/',
  194. 'ref': attributes.get('remittanceInformation', ''),
  195. 'unique_import_id': transaction['id'],
  196. 'amount': attributes['amount'],
  197. }
  198. new_transactions.append(vals_line)
  199. if new_transactions:
  200. return new_transactions, {}
  201. return