Browse Source

[IMP] *_online_ponto: increase test coverage.

12.0
Ronald Portier (Therp BV) 2 years ago
parent
commit
9d1a2f3995
No known key found for this signature in database GPG Key ID: A181F8124D7101D3
  1. 4
      account_bank_statement_import_online_ponto/models/online_bank_statement_provider_ponto.py
  2. 46
      account_bank_statement_import_online_ponto/models/ponto_interface.py
  3. 25
      account_bank_statement_import_online_ponto/tests/test_account_statement_import_online_ponto.py
  4. 26
      account_bank_statement_import_online_ponto/tests/test_ponto_interface.py

4
account_bank_statement_import_online_ponto/models/online_bank_statement_provider_ponto.py

@ -99,9 +99,7 @@ class OnlineBankStatementProviderPonto(models.Model):
sequence
)
new_transactions.append(vals_line)
if new_transactions:
return new_transactions, {}
return
return new_transactions, {}
def _ponto_get_transaction_vals(self, transaction, sequence):
"""Translate information from Ponto to statement line vals."""

46
account_bank_statement_import_online_ponto/models/ponto_interface.py

@ -105,37 +105,45 @@ class PontoInterface(models.AbstractModel):
if response.status_code == 400:
# Probably synchronization recently done already.
_logger.debug(
_("Syncronization request rejected: %s"),
_("Synchronization request rejected: %s"),
response.text
)
return
data = self._get_response_data(response)
sync_id = data.get("attributes", {}).get("resourceId", False)
# Check synchronisation
if not sync_id:
return
raise UserError(
_("Ponto : no resourceId in synchronization data %s") % data
)
# Poll synchronization during 400 seconds for completion.
url = PONTO_ENDPOINT + "/synchronizations/" + sync_id
number = 0
while number <= 10:
while number < 10:
number += 1
response = requests.get(url, headers=self._get_request_headers(access_data))
if response.status_code == 200:
data = json.loads(response.text)
status = data.get("status", {})
if status in ("success", "error"):
if status == "error":
_logger.debug(
_("Syncronization was succesfully completed")
)
else:
_logger.debug(
_("Syncronization had an error: %s"),
response.text
)
return
if self._synchronization_done(access_data, url):
break
time.sleep(40)
def _synchronization_done(self, access_data, url):
"""Check wether requested synchronization done."""
response = requests.get(url, headers=self._get_request_headers(access_data))
if response.status_code != 200:
return False
data = json.loads(response.text)
status = data.get("status", {})
if status not in ("success", "error"):
return False
if status == "error":
_logger.debug(
_("Synchronization was succesfully completed")
)
else:
_logger.debug(
_("Synchronization had an error: %s"),
response.text
)
return True
def _get_transactions(self, access_data, last_identifier):
"""Get transactions from ponto, using last_identifier as pointer.

25
account_bank_statement_import_online_ponto/tests/test_account_statement_import_online_ponto.py

@ -229,17 +229,28 @@ class TestBankAccountStatementImportOnlinePonto(common.TransactionCase):
def test_ponto_buffer_purge(self):
"""Create some old buffer records and test purging them."""
buffer_model = self.env["ponto.buffer"]
buffer_model.sudo()._store_transactions(self.provider, THREE_TRANSACTIONS)
# As all transactions have a different date, they will be in separate buffers.
buffers = buffer_model.search([("provider_id", "=", self.provider.id)])
self.assertEqual(len(buffers), 3)
# Non ponto providers should not affect buffers.
self._expect_purge_result(service="dummy", retain_days=15, expected_length=3)
# If retain date not filled, buffers should not be purged.
self._expect_purge_result(expected_length=3)
# If retain date filled, buffers should be purged.
self._expect_purge_result(retain_days=15)
def _expect_purge_result(self, service="ponto", retain_days=0, expected_length=0):
"""Check result for purge in different scenario's."""
buffer_model = self.env["ponto.buffer"]
self.provider.write(
{
"ponto_buffer_retain_days": 31,
"active": True,
"ponto_buffer_retain_days": retain_days,
"service": service,
}
)
buffer_model = self.env["ponto.buffer"]
buffer_model.sudo()._store_transactions(self.provider, THREE_TRANSACTIONS)
buffers = buffer_model.search([("provider_id", "=", self.provider.id)])
# As all transactions have a different date, they will be in separate buffers.
self.assertEqual(len(buffers), 3)
self.provider._ponto_buffer_purge()
buffers = buffer_model.search([("provider_id", "=", self.provider.id)])
self.assertFalse(bool(buffers))
self.assertEqual(len(buffers), expected_length)

26
account_bank_statement_import_online_ponto/tests/test_ponto_interface.py

@ -91,6 +91,32 @@ class TestPontoInterface(common.TransactionCase):
interface_model = self.env["ponto.interface"]
interface_model._ponto_synchronisation(access_data)
@patch("requests.get")
def test_synchronization_done(self, requests_get):
"""Test getting account data for Ponto access."""
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.text = json.dumps({"status": "success"})
requests_get.return_value = mock_response
# Succesfull sync.
self._check_synchronization_done(True)
# Error in sync.
mock_response.text = json.dumps({"status": "error"})
self._check_synchronization_done(True)
# Unexpected error in sync.
mock_response.status_code = 404
self._check_synchronization_done(False)
def _check_synchronization_done(self, expected_result):
"""Check result for synchronization with current mock."""
interface_model = self.env["ponto.interface"]
access_data = self._get_access_dict()
synchronization_done = interface_model._synchronization_done(
access_data,
"https//does.not.matter.com/synchronization"
)
self.assertEqual(synchronization_done, expected_result)
@patch("requests.get")
def test_get_transactions(self, requests_get):
"""Test getting transactions from Ponto."""

Loading…
Cancel
Save