Browse Source

Merge pull request #471 from fmdl/12.0-mig-hw_telium_payment_terminal_

[MIG] 12.0 mig hw telium payment terminal
pull/481/head
Alexis de Lattre 5 years ago
committed by GitHub
parent
commit
8eafc888f1
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 119
      hw_telium_payment_terminal/README.rst
  2. 1
      hw_telium_payment_terminal/__init__.py
  3. 16
      hw_telium_payment_terminal/__manifest__.py
  4. 1
      hw_telium_payment_terminal/controllers/__init__.py
  5. 320
      hw_telium_payment_terminal/controllers/main.py
  6. BIN
      hw_telium_payment_terminal/static/description/icon.png
  7. 198
      hw_telium_payment_terminal/test-scripts/telium-test.py

119
hw_telium_payment_terminal/README.rst

@ -0,0 +1,119 @@
================================
Hardware Telium Payment Terminal
================================
.. |badge1| image:: https://img.shields.io/badge/maturity-Beta-yellow.png
:target: https://odoo-community.org/page/development-status
:alt: Beta
.. |badge2| image:: https://img.shields.io/badge/licence-AGPL--3-blue.png
:target: http://www.gnu.org/licenses/agpl-3.0-standalone.html
:alt: License: AGPL-3
.. |badge3| image:: https://img.shields.io/badge/github-OCA%2Fpos-lightgray.png?logo=github
:target: https://github.com/OCA/pos/tree/12.0/hw_telium_payment_terminal
:alt: OCA/pos
.. |badge4| image:: https://img.shields.io/badge/weblate-Translate%20me-F47D42.png
:target: https://translation.odoo-community.org/projects/pos-12-0/pos-12-0-hw_telium_payment_terminal
:alt: Translate me on Weblate
|badge1| |badge2| |badge3| |badge4|
This module adds support for credit card reader and checks printers
using Telium protocol in the Point of Sale. This module is designed to
be installed on the *POSbox* (i.e. the proxy on which the USB devices
are connected) and not on the main Odoo server. On the main Odoo server,
you should install the module *pos_payment_terminal*.
This module has been developped during a POS code sprint at Akretion
France from July 7th to July 10th 2014.
**Table of contents**
.. contents::
:local:
Installation
============
Add this module in the PosBox in this folder :
/home/pi/odoo/addons
Reboot the PosBox
Configuration
=============
The configuration of the hardware is done in the configuration file of
the Odoo server of the POSbox. You can add the following entries in
the configuration file (optional).
* payment_terminal_device_name (default = /dev/ttyACM0)
* payment_terminal_device_rate (default = 9600)
The Telium protocol is used by Ingenico and Sagem payment terminals. It
is based on the Concert protocol, so it can probably work with payment
terminals from other brands. This module implements the protocol E+ (and
not the protocol E), so it requires a Telium Manager version 37783600
or superior.
Information : https://lists.launchpad.net/openerp-community/pdfcezlBjgtdJ.pdf
To get the version of the Telium Manager on an Ingenico
terminal press F > 0-TELIUM MANAGER > 2-Consultation > 4-Configuration
> 2-Software > 1-TERMINAL > On Display > Telium Manager and then read
the field *M20S*.
You will need to configure your payment terminal to accept commands
from the POS. On an Ingenico terminal press F > 0-TELIUM MANAGER >
5-Initialization > 1-Parameters > Cash Connection and then select *On*
and then *USB*. After that, you should reboot the terminal.
This module has been successfully tested with:
* Ingenico EFTSmart4S
* Ingenico EFTSmart2 2640 with Telim Manager version 37784503
* Ingenico iCT220
* Ingenico iCT250
* Ingenico i2200 cheque reader and writer
* Ingenico Desk/5000 (USB Mode)
This module requires the Python library *pycountry* version >= 16.11.08,
if you use a currency different of EUR.
To install it, run:
``sudo pip install pycountry``
Bug Tracker
===========
Bugs are tracked on `GitHub Issues <https://github.com/OCA/pos/issues>`_.
In case of trouble, please check there if your issue has already been reported.
If you spotted it first, help us smashing it by providing a detailed and welcomed
`feedback <https://github.com/OCA/pos/issues/new?body=module:%20hw_telium_payment_terminal%0Aversion:%2012.0%0A%0A**Steps%20to%20reproduce**%0A-%20...%0A%0A**Current%20behavior**%0A%0A**Expected%20behavior**>`_.
Do not contact contributors directly about support or help with technical issues.
Credits
=======
Authors
~~~~~~~
* Akretion
Contributors
~~~~~~~~~~~~
* Florent de Labarre
Maintainers
~~~~~~~~~~~
This module is maintained by the OCA.
.. image:: https://odoo-community.org/logo.png
:alt: Odoo Community Association
:target: https://odoo-community.org
OCA, or the Odoo Community Association, is a nonprofit organization whose
mission is to support the collaborative development of Odoo features and
promote its widespread use.
This module is part of the `OCA/pos <https://github.com/OCA/pos/tree/12.0/hw_telium_payment_terminal>`_ project on GitHub.
You are welcome to contribute. To learn how please visit https://odoo-community.org/page/Contribute.

1
hw_telium_payment_terminal/__init__.py

@ -0,0 +1 @@
from . import controllers

16
hw_telium_payment_terminal/__manifest__.py

@ -0,0 +1,16 @@
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html).
{
'name': 'Hardware Telium Payment Terminal',
'version': '12.0.1.0.0',
'category': 'Hardware Drivers',
'license': 'AGPL-3',
'summary': 'Adds support for Payment Terminals using Telium protocol',
'author': "Akretion,Odoo Community Association (OCA)",
'website': 'http://www.github.com/OCA/pos',
'depends': ['hw_proxy'],
'external_dependencies': {
'python': ['serial', 'pycountry'],
},
'data': [],
'installable': False,
}

1
hw_telium_payment_terminal/controllers/__init__.py

@ -0,0 +1 @@
from . import main

320
hw_telium_payment_terminal/controllers/main.py

@ -0,0 +1,320 @@
import logging
import simplejson
import time
import curses.ascii
from threading import Thread, Lock
from queue import Queue
from odoo import http
from odoo.tools.config import config
from odoo.addons.hw_proxy.controllers import main as hw_proxy
logger = logging.getLogger(__name__)
try:
from serial import Serial
except (ImportError, IOError) as err:
logger.debug(err)
try:
import pycountry
EUR_CY_NBR = False
except (ImportError, IOError) as err:
logger.debug(err)
logger.warning(
'Unable to import pycountry, only EUR currency is supported')
EUR_CY_NBR = 978
class TeliumPaymentTerminalDriver(Thread):
def __init__(self):
Thread.__init__(self)
self.queue = Queue()
self.lock = Lock()
self.status = {'status': 'connecting', 'messages': []}
self.device_name = config.get(
'telium_terminal_device_name', '/dev/ttyACM0')
self.device_rate = int(config.get(
'telium_terminal_device_rate', 9600))
self.serial = False
def get_status(self):
self.push_task('status')
return self.status
def set_status(self, status, message=None):
if status == self.status['status']:
if message is not None and message != self.status['messages'][-1]:
self.status['messages'].append(message)
else:
self.status['status'] = status
if message:
self.status['messages'] = [message]
else:
self.status['messages'] = []
if status == 'error' and message:
logger.error('Payment Terminal Error: ' + message)
elif status == 'disconnected' and message:
logger.warning('Disconnected Terminal: ' + message)
def lockedstart(self):
with self.lock:
if not self.is_alive():
self.daemon = True
self.start()
def push_task(self, task, data=None):
self.lockedstart()
self.queue.put((time.time(), task, data))
def serial_write(self, text):
assert isinstance(text, str), 'text must be a string'
raw = text.encode()
logger.debug("%s raw send to terminal" % raw)
logger.debug("%s send to terminal" % text)
self.serial.write(raw)
def serial_read(self, size=1):
raw = self.serial.read(size)
msg = raw.decode('ascii')
logger.debug("%s raw received from terminal" % raw)
logger.debug("%s received from terminal" % msg)
return msg
def initialize_msg(self):
max_attempt = 3
attempt_nr = 0
while attempt_nr < max_attempt:
attempt_nr += 1
self.send_one_byte_signal('ENQ')
if self.get_one_byte_answer('ACK'):
return True
else:
logger.warning("Terminal : SAME PLAYER TRY AGAIN")
self.send_one_byte_signal('EOT')
# Wait 1 sec between each attempt
time.sleep(1)
return False
def send_one_byte_signal(self, signal):
ascii_names = curses.ascii.controlnames
assert signal in ascii_names, 'Wrong signal'
char = ascii_names.index(signal)
self.serial_write(chr(char))
logger.debug('Signal %s sent to terminal' % signal)
def get_one_byte_answer(self, expected_signal):
assert isinstance(expected_signal, str), 'expected_signal must be a string'
ascii_names = curses.ascii.controlnames
one_byte_read = self.serial_read(1)
expected_char = ascii_names.index(expected_signal)
if one_byte_read == chr(expected_char):
return True
else:
return False
def _get_amount(self, payment_info_dict):
amount = payment_info_dict['amount']
cur_decimals = payment_info_dict['currency_decimals']
cur_fact = 10 ** cur_decimals
return ('%.0f' % (amount * cur_fact)).zfill(8)
def prepare_data_to_send(self, payment_info_dict):
if payment_info_dict['payment_mode'] == 'check':
payment_mode = 'C'
elif payment_info_dict['payment_mode'] == 'card':
payment_mode = '1'
else:
logger.error(
"The payment mode '%s' is not supported"
% payment_info_dict['payment_mode'])
return False
cur_iso_letter = payment_info_dict['currency_iso'].upper()
try:
if EUR_CY_NBR:
cur_numeric = str(EUR_CY_NBR)
else:
cur = pycountry.currencies.get(alpha_3=cur_iso_letter)
cur_numeric = str(cur.numeric)
except:
logger.error("Currency %s is not recognized" % cur_iso_letter)
return False
data = {
'pos_number': str(1).zfill(2),
'answer_flag': '0',
'transaction_type': '0',
'payment_mode': payment_mode,
'currency_numeric': cur_numeric.zfill(3),
'private': ' ' * 10,
'delay': 'A010',
'auto': 'B010',
'amount_msg': self._get_amount(payment_info_dict),
}
return data
def generate_lrc(self, real_msg_with_etx):
lrc = 0
for char in real_msg_with_etx:
lrc ^= ord(char)
return lrc
def send_message(self, data):
'''We use protocol E+'''
ascii_names = curses.ascii.controlnames
real_msg = (
data['pos_number'] +
data['amount_msg'] +
data['answer_flag'] +
data['payment_mode'] +
data['transaction_type'] +
data['currency_numeric'] +
data['private'] +
data['delay'] +
data['auto'])
logger.debug('Real message to send = %s' % real_msg)
assert len(real_msg) == 34, 'Wrong length for protocol E+'
real_msg_with_etx = real_msg + chr(ascii_names.index('ETX'))
lrc = self.generate_lrc(real_msg_with_etx)
message = chr(ascii_names.index('STX')) + real_msg_with_etx + chr(lrc)
self.serial_write(message)
logger.info('Message sent to terminal')
def compare_data_vs_answer(self, data, answer_data):
for field in ['pos_number', 'amount_msg', 'currency_numeric', 'private']:
if data[field] != answer_data[field]:
logger.warning(
"Field %s has value '%s' in data and value '%s' in answer"
% (field, data[field], answer_data[field]))
def parse_terminal_answer(self, real_msg, data):
answer_data = {
'pos_number': real_msg[0:2],
'transaction_result': real_msg[2],
'amount_msg': real_msg[3:11],
'payment_mode': real_msg[11],
'currency_numeric': real_msg[12:15],
'private': real_msg[15:26],
}
logger.debug('answer_data = %s' % answer_data)
self.compare_data_vs_answer(data, answer_data)
return answer_data
def get_answer_from_terminal(self, data):
ascii_names = curses.ascii.controlnames
full_msg_size = 1 + 2 + 1 + 8 + 1 + 3 + 10 + 1 + 1
msg = self.serial_read(size=full_msg_size)
logger.debug('%d bytes read from terminal' % full_msg_size)
assert len(msg) == full_msg_size, 'Answer has a wrong size'
if msg[0] != chr(ascii_names.index('STX')):
logger.error(
'The first byte of the answer from terminal should be STX')
if msg[-2] != chr(ascii_names.index('ETX')):
logger.error(
'The byte before final of the answer from terminal '
'should be ETX')
lrc = msg[-1]
computed_lrc = chr(self.generate_lrc(msg[1:-1]))
if computed_lrc != lrc:
logger.error(
'The LRC of the answer from terminal is wrong')
real_msg = msg[1:-2]
logger.debug('Real answer received = %s' % real_msg)
return self.parse_terminal_answer(real_msg, data)
def transaction_start(self, payment_info):
'''This function sends the data to the serial/usb port.
'''
payment_info_dict = simplejson.loads(payment_info)
assert isinstance(payment_info_dict, dict), \
'payment_info_dict should be a dict'
try:
logger.debug(
'Opening serial port %s for payment terminal with baudrate %d'
% (self.device_name, self.device_rate))
# IMPORTANT : don't modify timeout=3 seconds
# This parameter is very important ; the Telium spec say
# that we have to wait to up 3 seconds to get LRC
self.serial = Serial(
self.device_name, self.device_rate,
timeout=3)
logger.debug('serial.is_open = %s' % self.serial.isOpen())
if self.serial.isOpen():
self.set_status("connected",
"Connected to {} with baudrate {}".format(
self.device_name, self.device_rate))
else:
self.set_status("disconnected",
"Could not connect to {}"
.format(self.device_name))
if self.initialize_msg():
data = self.prepare_data_to_send(payment_info_dict)
if not data:
return
self.send_message(data)
if self.get_one_byte_answer('ACK'):
self.send_one_byte_signal('EOT')
self.status['in_transaction'] = True
logger.debug("Now expecting answer from Terminal")
# We wait the end of transaction
attempt_nr = 0
while attempt_nr < 600:
attempt_nr += 1
if self.get_one_byte_answer('ENQ'):
self.send_one_byte_signal('ACK')
answer = self.get_answer_from_terminal(data)
# '0' : accepted transaction
# '7' : refused transaction
if answer['transaction_result'] == '0' \
and self._get_amount(payment_info_dict) == answer['amount_msg']:
self.status['latest_transactions'] = {payment_info_dict['order_id']: {}}
logger.info("Transaction OK")
self.send_one_byte_signal('ACK')
if self.get_one_byte_answer('EOT'):
logger.debug("Answer received from Terminal")
break
time.sleep(0.5)
self.status['in_transaction'] = False
except Exception as e:
logger.error('Exception in serial connection: %s' % str(e))
self.set_status("error",
"Exception in serial connection to {}"
.format(self.device_name))
finally:
if self.serial:
logger.debug('Closing serial port for payment terminal')
self.serial.close()
def run(self):
while True:
try:
timestamp, task, data = self.queue.get(True)
if task == 'transaction_start':
self.transaction_start(data)
elif task == 'status':
pass
except Exception as e:
self.set_status('error', str(e))
driver = TeliumPaymentTerminalDriver()
hw_proxy.drivers['telium_payment_terminal'] = driver
class TeliumPaymentTerminalProxy(hw_proxy.Proxy):
@http.route(
'/hw_proxy/payment_terminal_transaction_start',
type='json', auth='none', cors='*')
def payment_terminal_transaction_start(self, payment_info):
logger.debug(
'Telium: Call payment_terminal_transaction_start with '
'payment_info=%s', payment_info)
driver.push_task('transaction_start', payment_info)

BIN
hw_telium_payment_terminal/static/description/icon.png

After

Width: 128  |  Height: 128  |  Size: 9.2 KiB

198
hw_telium_payment_terminal/test-scripts/telium-test.py

@ -0,0 +1,198 @@
#! /usr/bin/python
# -*- encoding: utf-8 -*-
from serial import Serial
import curses.ascii
import time
import pycountry
DEVICE = '/dev/ttyACM0'
DEVICE_RATE = 9600
PAYMENT_MODE = 'card' # 'card' or 'check'
CURRENCY_ISO = 'EUR'
AMOUNT = 12.42
def serial_write(serial, text):
assert isinstance(text, str), 'text must be a string'
serial.write(text)
def initialize_msg(serial):
max_attempt = 3
attempt_nr = 0
while attempt_nr < max_attempt:
attempt_nr += 1
send_one_byte_signal(serial, 'ENQ')
if get_one_byte_answer(serial, 'ACK'):
return True
else:
print("Terminal : SAME PLAYER TRY AGAIN")
send_one_byte_signal(serial, 'EOT')
# Wait 1 sec between each attempt
time.sleep(1)
return False
def send_one_byte_signal(serial, signal):
ascii_names = curses.ascii.controlnames
assert signal in ascii_names, 'Wrong signal'
char = ascii_names.index(signal)
serial_write(serial, chr(char))
print('Signal %s sent to terminal' % signal)
def get_one_byte_answer(serial, expected_signal):
ascii_names = curses.ascii.controlnames
one_byte_read = serial.read(1)
expected_char = ascii_names.index(expected_signal)
if one_byte_read == chr(expected_char):
print("%s received from terminal" % expected_signal)
return True
else:
return False
def prepare_data_to_send():
if PAYMENT_MODE == 'check':
payment_mode = 'C'
elif PAYMENT_MODE == 'card':
payment_mode = '1'
else:
print("The payment mode '%s' is not supported" % PAYMENT_MODE)
return False
cur_iso_letter = CURRENCY_ISO.upper()
try:
cur = pycountry.currencies.get(alpha_3=cur_iso_letter)
cur_numeric = str(cur.numeric)
except:
print("Currency %s is not recognized" % cur_iso_letter)
return False
data = {
'pos_number': str(1).zfill(2),
'answer_flag': '0',
'transaction_type': '0',
'payment_mode': payment_mode,
'currency_numeric': cur_numeric.zfill(3),
'private': ' ' * 10,
'delay': 'A011',
'auto': 'B010',
'amount_msg': ('%.0f' % (AMOUNT * 100)).zfill(8),
}
return data
def generate_lrc(real_msg_with_etx):
lrc = 0
for char in real_msg_with_etx:
lrc ^= ord(char)
return lrc
def send_message(serial, data):
'''We use protocol E+'''
ascii_names = curses.ascii.controlnames
real_msg = (
data['pos_number'] +
data['amount_msg'] +
data['answer_flag'] +
data['payment_mode'] +
data['transaction_type'] +
data['currency_numeric'] +
data['private'] +
data['delay'] +
data['auto'])
print('Real message to send = %s' % real_msg)
assert len(real_msg) == 34, 'Wrong length for protocol E+'
real_msg_with_etx = real_msg + chr(ascii_names.index('ETX'))
lrc = generate_lrc(real_msg_with_etx)
message = chr(ascii_names.index('STX')) + real_msg_with_etx + chr(lrc)
serial_write(serial, message)
print('Message sent to terminal')
def compare_data_vs_answer(data, answer_data):
for field in [
'pos_number', 'amount_msg',
'currency_numeric', 'private']:
if data[field] != answer_data[field]:
print(
"Field %s has value '%s' in data and value '%s' in answer"
% (field, data[field], answer_data[field]))
def parse_terminal_answer(real_msg, data):
answer_data = {
'pos_number': real_msg[0:2],
'transaction_result': real_msg[2],
'amount_msg': real_msg[3:11],
'payment_mode': real_msg[11],
'currency_numeric': real_msg[12:15],
'private': real_msg[15:26],
}
print('answer_data = %s' % answer_data)
compare_data_vs_answer(data, answer_data)
return answer_data
def get_answer_from_terminal(serial, data):
ascii_names = curses.ascii.controlnames
full_msg_size = 1+2+1+8+1+3+10+1+1
msg = serial.read(size=full_msg_size)
print('%d bytes read from terminal' % full_msg_size)
assert len(msg) == full_msg_size, 'Answer has a wrong size'
if msg[0] != chr(ascii_names.index('STX')):
print('The first byte of the answer from terminal should be STX')
if msg[-2] != chr(ascii_names.index('ETX')):
print('The byte before final of the answer '
'from terminal should be ETX')
lrc = msg[-1]
computed_lrc = chr(generate_lrc(msg[1:-1]))
if computed_lrc != lrc:
print('The LRC of the answer from terminal is wrong')
real_msg = msg[1:-2]
print('Real answer received = %s' % real_msg)
return parse_terminal_answer(real_msg, data)
def transaction_start():
'''This function sends the data to the serial/usb port.
'''
serial = False
try:
print(
'Opening serial port %s for payment terminal with '
'baudrate %d' % (DEVICE, DEVICE_RATE))
# IMPORTANT : don't modify timeout=3 seconds
# This parameter is very important ; the Telium spec say
# that we have to wait to up 3 seconds to get LRC
serial = Serial(
DEVICE, DEVICE_RATE, timeout=3)
print('serial.is_open = %s' % serial.isOpen())
if initialize_msg(serial):
data = prepare_data_to_send()
if not data:
return
send_message(serial, data)
if get_one_byte_answer(serial, 'ACK'):
send_one_byte_signal(serial, 'EOT')
print("Now expecting answer from Terminal")
if get_one_byte_answer(serial, 'ENQ'):
send_one_byte_signal(serial, 'ACK')
get_answer_from_terminal(serial, data)
send_one_byte_signal(serial, 'ACK')
if get_one_byte_answer(serial, 'EOT'):
print("Answer received from Terminal")
except Exception as e:
print('Exception in serial connection: %s' % str(e))
finally:
if serial:
print('Closing serial port for payment terminal')
serial.close()
if __name__ == '__main__':
transaction_start()
Loading…
Cancel
Save