Browse Source

[9.0] Remove hw_customer_display and hw_telium_payment_terminal (#230)

* Remove hw_customer_display and hw_telium_payment_terminal

POSbox image runs Odoo v8, so we should maintain hw_* modules only in the 8.0 branch for the moment.

* Update requirements.txt following removal of hw_* modules
pull/83/merge
Alexis de Lattre 7 years ago
committed by Sylvain LE GAL
parent
commit
6de9013eb9
  1. 24
      hw_customer_display/__init__.py
  2. 86
      hw_customer_display/__openerp__.py
  3. 24
      hw_customer_display/controllers/__init__.py
  4. 180
      hw_customer_display/controllers/main.py
  5. 68
      hw_customer_display/test-scripts/customer-display-test.py
  6. 24
      hw_telium_payment_terminal/__init__.py
  7. 84
      hw_telium_payment_terminal/__openerp__.py
  8. 24
      hw_telium_payment_terminal/controllers/__init__.py
  9. 284
      hw_telium_payment_terminal/controllers/main.py
  10. 218
      hw_telium_payment_terminal/test-scripts/telium-test.py
  11. 3
      requirements.txt

24
hw_customer_display/__init__.py

@ -1,24 +0,0 @@
# -*- encoding: utf-8 -*-
##############################################################################
#
# Hardware Customer Display module for Odoo
# Copyright (C) 2014 Akretion (http://www.akretion.com)
# @author Alexis de Lattre <alexis.delattre@akretion.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
##############################################################################
from . import controllers

86
hw_customer_display/__openerp__.py

@ -1,86 +0,0 @@
# -*- encoding: utf-8 -*-
##############################################################################
#
# Hardware Customer Display module for Odoo
# Copyright (C) 2014 Akretion (http://www.akretion.com)
# @author Alexis de Lattre <alexis.delattre@akretion.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
##############################################################################
{
'name': 'Hardware Customer Display',
'version': '8.0.0.1.0',
'category': 'Hardware Drivers',
'license': 'AGPL-3',
'summary': 'Adds support for Customer Display in the Point of Sale',
'description': """
Hardware Customer Display
=========================
This module adds support for Customer Display in the Point of Sale.
This module is designed to be installed on the *POSbox* (i.e. the
proxy on which the USB devices are connected) and not on the main
Odoo server. On the main Odoo server, you should install the module
*pos_customer_display*.
The configuration of the hardware is done in the configuration file of
the Odoo server of the POSbox. You should add the following entries in
the configuration file:
* customer_display_device_name (default = /dev/ttyUSB0)
* customer_display_device_rate (default = 9600)
* customer_display_device_timeout (default = 2 seconds)
The number of cols of the Customer Display (usually 20) should be
configured on the main Odoo server, in the menu Point of Sale >
Configuration > Point of Sales. The number of rows is supposed to be 2.
It should support most serial and USB-serial LCD displays out-of-the-box
or with inheritance of a few functions.
It has been tested with:
* Bixolon BCD-1100 (Datasheet :
http://www.bixolon.com/html/en/product/product_detail.xhtml?prod_id=61)
* Bixolon BCD-1000
To setup the BCD-1100 on Linux, you will find some technical instructions
on this page:
http://techtuxwords.blogspot.fr/2012/12/linux-and-bixolon-bcd-1100.html
If you have a kernel >= 3.12, you should also read this:
http://www.leniwiec.org/en/2014/06/25/ubuntu-14-04lts-how-to-pass-id-ven
dor-and-id-product-to-ftdi_sio-driver/
This module has been developped during a POS code sprint at Akretion
France from July 7th to July 10th 2014. This module is part of the POS
project of the Odoo Community Association http://odoo-community.org/.
You are invited to become a member and/or get involved in the
Association !
This module has been written by Alexis de Lattre from Akretion
<alexis.delattre@akretion.com>.
""",
'author': "Akretion,Odoo Community Association (OCA)",
'website': 'http://www.akretion.com',
'depends': ['hw_proxy'],
'external_dependencies': {
'python': ['serial', 'unidecode'],
},
'data': [],
'installable': False,
}

24
hw_customer_display/controllers/__init__.py

@ -1,24 +0,0 @@
# -*- encoding: utf-8 -*-
##############################################################################
#
# Hardware Customer Display module for Odoo
# Copyright (C) 2014 Akretion (http://www.akretion.com)
# @author Alexis de Lattre <alexis.delattre@akretion.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
##############################################################################
from . import main

180
hw_customer_display/controllers/main.py

@ -1,180 +0,0 @@
# -*- encoding: utf-8 -*-
##############################################################################
#
# Hardware Customer Display module for Odoo
# Copyright (C) 2014 Akretion (http://www.akretion.com)
# @author Alexis de Lattre <alexis.delattre@akretion.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
##############################################################################
import logging
import simplejson
import time
from threading import Thread, Lock
from Queue import Queue
from unidecode import unidecode
from serial import Serial
import openerp.addons.hw_proxy.controllers.main as hw_proxy
from openerp import http
from openerp.tools.config import config
logger = logging.getLogger(__name__)
class CustomerDisplayDriver(Thread):
def __init__(self):
Thread.__init__(self)
self.queue = Queue()
self.lock = Lock()
self.status = {'status': 'connecting', 'messages': []}
self.device_name = config.get(
'customer_display_device_name', '/dev/ttyUSB0')
self.device_rate = int(config.get(
'customer_display_device_rate', 9600))
self.device_timeout = int(config.get(
'customer_display_device_timeout', 2))
self.serial = False
def get_status(self):
self.push_task('status')
return self.status
def set_status(self, status, message=None):
if status == self.status['status']:
if message is not None and message != self.status['messages'][-1]:
self.status['messages'].append(message)
else:
self.status['status'] = status
if message:
self.status['messages'] = [message]
else:
self.status['messages'] = []
if status == 'error' and message:
logger.error('Display Error: '+message)
elif status == 'disconnected' and message:
logger.warning('Disconnected Display: '+message)
def lockedstart(self):
with self.lock:
if not self.isAlive():
self.daemon = True
self.start()
def push_task(self, task, data=None):
self.lockedstart()
self.queue.put((time.time(), task, data))
def move_cursor(self, col, row):
# Bixolon spec : 11. "Move Cursor to Specified Position"
self.cmd_serial_write('\x1B\x6C' + chr(col) + chr(row))
def display_text(self, lines):
logger.debug(
"Preparing to send the following lines to LCD: %s" % lines)
# We don't check the number of rows/cols here, because it has already
# been checked in the POS client in the JS code
lines_ascii = []
for line in lines:
lines_ascii.append(unidecode(line))
row = 0
for dline in lines_ascii:
row += 1
self.move_cursor(1, row)
self.serial_write(dline)
def setup_customer_display(self):
'''Set LCD cursor to off
If your LCD has different setup instruction(s), you should
inherit this function'''
# Bixolon spec : 35. "Set Cursor On/Off"
self.cmd_serial_write('\x1F\x43\x00')
logger.debug('LCD cursor set to off')
def clear_customer_display(self):
'''If your LCD has different clearing instruction, you should inherit
this function'''
# Bixolon spec : 12. "Clear Display Screen and Clear String Mode"
self.cmd_serial_write('\x0C')
logger.debug('Customer display cleared')
def cmd_serial_write(self, command):
'''If your LCD requires a prefix and/or suffix on all commands,
you should inherit this function'''
assert isinstance(command, str), 'command must be a string'
self.serial_write(command)
def serial_write(self, text):
assert isinstance(text, str), 'text must be a string'
self.serial.write(text)
def send_text_customer_display(self, text_to_display):
'''This function sends the data to the serial/usb port.
We open and close the serial connection on every message display.
Why ?
1. Because it is not a problem for the customer display
2. Because it is not a problem for performance, according to my tests
3. Because it allows recovery on errors : you can unplug/replug the
customer display and it will work again on the next message without
problem
'''
lines = simplejson.loads(text_to_display)
assert isinstance(lines, list), 'lines_list should be a list'
try:
logger.debug(
'Opening serial port %s for customer display with baudrate %d'
% (self.device_name, self.device_rate))
self.serial = Serial(
self.device_name, self.device_rate,
timeout=self.device_timeout)
logger.debug('serial.is_open = %s' % self.serial.isOpen())
self.setup_customer_display()
self.clear_customer_display()
self.display_text(lines)
except Exception, e:
logger.error('Exception in serial connection: %s' % str(e))
finally:
if self.serial:
logger.debug('Closing serial port for customer display')
self.serial.close()
def run(self):
while True:
try:
timestamp, task, data = self.queue.get(True)
if task == 'display':
self.send_text_customer_display(data)
elif task == 'status':
pass
except Exception as e:
self.set_status('error', str(e))
driver = CustomerDisplayDriver()
hw_proxy.drivers['customer_display'] = driver
class CustomerDisplayProxy(hw_proxy.Proxy):
@http.route(
'/hw_proxy/send_text_customer_display', type='json', auth='none',
cors='*')
def send_text_customer_display(self, text_to_display):
logger.debug(
'LCD: Call send_text_customer_display with text=%s',
text_to_display)
driver.push_task('display', text_to_display)

68
hw_customer_display/test-scripts/customer-display-test.py

@ -1,68 +0,0 @@
#! /usr/bin/python
# -*- encoding: utf-8 -*-
# Author : Alexis de Lattre <alexis.delattre@akretion.com>
# The licence is in the file __openerp__.py
# This is a test script, that you can use if you want to test/play
# with the customer display independantly from the Odoo server
# It has been tested with a Bixolon BCD-1100
from serial import Serial
from unidecode import unidecode
import sys
DEVICE = '/dev/ttyUSB0'
DEVICE_RATE = 9600
DEVICE_COLS = 20
def display_text(ser, line1, line2):
print "convert to ascii"
line1 = unidecode(line1)
line2 = unidecode(line2)
print "set lines to the right lenght (%s)" % DEVICE_COLS
for line in [line1, line2]:
if len(line) < DEVICE_COLS:
line += ' ' * (DEVICE_COLS - len(line))
elif len(line) > DEVICE_COLS:
line = line[0:DEVICE_COLS]
assert len(line) == DEVICE_COLS, 'Wrong length'
print "try to clear display"
ser.write('\x0C')
print "clear done"
print "try to position at start of 1st line"
ser.write('\x1B\x6C' + chr(1) + chr(1))
print "position done"
print "try to write 1st line"
ser.write(line1)
print "write 1st line done"
print "try to position at start of 2nd line"
ser.write('\x1B\x6C' + chr(1) + chr(2))
print "position done"
print "try to write 2nd line"
ser.write(line2)
print "write done"
def open_close_display(line1, line2):
ser = False
try:
print "open serial port"
ser = Serial(DEVICE, DEVICE_RATE, timeout=2)
print "serial port open =", ser.isOpen()
print "try to set cursor to off"
ser.write('\x1F\x43\x00')
print "cursor set to off"
display_text(ser, line1, line2)
except Exception, e:
print "EXCEPTION e=", e
sys.exit(1)
finally:
if ser:
print "close serial port"
ser.close()
if __name__ == '__main__':
line1 = u'POS Code Sprint'
line2 = u'@ Akretion 2014/07'
open_close_display(line1, line2)

24
hw_telium_payment_terminal/__init__.py

@ -1,24 +0,0 @@
# -*- encoding: utf-8 -*-
##############################################################################
#
# Hardware Telium Payment Terminal module for Odoo
# Copyright (C) 2014 Akretion (http://www.akretion.com)
# @author Alexis de Lattre <alexis.delattre@akretion.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
##############################################################################
from . import controllers

84
hw_telium_payment_terminal/__openerp__.py

@ -1,84 +0,0 @@
# -*- encoding: utf-8 -*-
##############################################################################
#
# Hardware Telium Payment Terminal module for Odoo
# Copyright (C) 2014 Akretion (http://www.akretion.com)
# @author Alexis de Lattre <alexis.delattre@akretion.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
##############################################################################
{
'name': 'Hardware Telium Payment Terminal',
'version': '8.0.0.1.0',
'category': 'Hardware Drivers',
'license': 'AGPL-3',
'summary': 'Adds support for Payment Terminals using Telium protocol',
'description': """
Hardware Telium Payment Terminal
================================
This module adds support for credit card reader and checks printers
using Telium protocol in the Point of Sale. This module is designed to
be installed on the *POSbox* (i.e. the proxy on which the USB devices
are connected) and not on the main Odoo server. On the main Odoo server,
you should install the module *pos_payment_terminal*.
The configuration of the hardware is done in the configuration file of
the Odoo server of the POSbox. You should add the following entries in
the configuration file:
* payment_terminal_device_name (default = /dev/ttyACM0)
* payment_terminal_device_rate (default = 9600)
The Telium protocol is used by Ingenico and Sagem payment terminals. It
is based on the Concert protocol, so it can probably work with payment
terminals from other brands. This module implements the protocol E+ (and
not the protocol E), so it requires a Telium Manager version 37783600
or superior. To get the version of the Telium Manager on an Ingenico
terminal press F > 0-TELIUM MANAGER > 2-Consultation > 4-Configuration
> 2-Software > 1-TERMINAL > On Display > Telium Manager and then read
the field *M20S*.
You will need to configure your payment terminal to accept commands
from the POS. On an Ingenico terminal press F > 0-TELIUM MANAGER >
5-Initialization > 1-Parameters > Cash Connection and then select *On*
and then *USB*. After that, you should reboot the terminal.
This module has been successfully tested with:
* Ingenico EFTSmart4S
* Ingenico EFTSmart2 2640 with Telim Manager version 37784503
* Ingenico i2200 cheque reader and writer
This module has been developped during a POS code sprint at Akretion
France from July 7th to July 10th 2014. This module is part of the POS
project of the Odoo Community Association http://odoo-community.org/.
You are invited to become a member and/or get involved in the
Association !
This module has been written by Alexis de Lattre
<alexis.delattre@akretion.com> from Akretion.
""",
'author': "Akretion,Odoo Community Association (OCA)",
'website': 'http://www.akretion.com',
'depends': ['hw_proxy'],
'external_dependencies': {
'python': ['serial', 'pycountry'],
},
'data': [],
'installable': False,
}

24
hw_telium_payment_terminal/controllers/__init__.py

@ -1,24 +0,0 @@
# -*- encoding: utf-8 -*-
##############################################################################
#
# Hardware Telium Payment Terminal module for Odoo
# Copyright (C) 2014 Akretion (http://www.akretion.com)
# @author Alexis de Lattre <alexis.delattre@akretion.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
##############################################################################
from . import main

284
hw_telium_payment_terminal/controllers/main.py

@ -1,284 +0,0 @@
# -*- encoding: utf-8 -*-
##############################################################################
#
# Hardware Telium Payment Terminal module for Odoo
# Copyright (C) 2014 Akretion (http://www.akretion.com)
# @author Alexis de Lattre <alexis.delattre@akretion.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
##############################################################################
import logging
import simplejson
import time
import curses.ascii
from threading import Thread, Lock
from Queue import Queue
from serial import Serial
import pycountry
import openerp.addons.hw_proxy.controllers.main as hw_proxy
from openerp import http
from openerp.tools.config import config
logger = logging.getLogger(__name__)
class TeliumPaymentTerminalDriver(Thread):
def __init__(self):
Thread.__init__(self)
self.queue = Queue()
self.lock = Lock()
self.status = {'status': 'connecting', 'messages': []}
self.device_name = config.get(
'telium_terminal_device_name', '/dev/ttyACM0')
self.device_rate = int(config.get(
'telium_terminal_device_rate', 9600))
self.serial = False
def get_status(self):
self.push_task('status')
return self.status
def set_status(self, status, message=None):
if status == self.status['status']:
if message is not None and message != self.status['messages'][-1]:
self.status['messages'].append(message)
else:
self.status['status'] = status
if message:
self.status['messages'] = [message]
else:
self.status['messages'] = []
if status == 'error' and message:
logger.error('Payment Terminal Error: '+message)
elif status == 'disconnected' and message:
logger.warning('Disconnected Terminal: '+message)
def lockedstart(self):
with self.lock:
if not self.isAlive():
self.daemon = True
self.start()
def push_task(self, task, data=None):
self.lockedstart()
self.queue.put((time.time(), task, data))
def serial_write(self, text):
assert isinstance(text, str), 'text must be a string'
self.serial.write(text)
def initialize_msg(self):
max_attempt = 3
attempt_nr = 0
while attempt_nr < max_attempt:
attempt_nr += 1
self.send_one_byte_signal('ENQ')
if self.get_one_byte_answer('ACK'):
return True
else:
logger.warning("Terminal : SAME PLAYER TRY AGAIN")
self.send_one_byte_signal('EOT')
# Wait 1 sec between each attempt
time.sleep(1)
return False
def send_one_byte_signal(self, signal):
ascii_names = curses.ascii.controlnames
assert signal in ascii_names, 'Wrong signal'
char = ascii_names.index(signal)
self.serial_write(chr(char))
logger.debug('Signal %s sent to terminal' % signal)
def get_one_byte_answer(self, expected_signal):
ascii_names = curses.ascii.controlnames
one_byte_read = self.serial.read(1)
expected_char = ascii_names.index(expected_signal)
if one_byte_read == chr(expected_char):
logger.debug("%s received from terminal" % expected_signal)
return True
else:
return False
def prepare_data_to_send(self, payment_info_dict):
amount = payment_info_dict['amount']
if payment_info_dict['payment_mode'] == 'check':
payment_mode = 'C'
elif payment_info_dict['payment_mode'] == 'card':
payment_mode = '1'
else:
logger.error(
"The payment mode '%s' is not supported"
% payment_info_dict['payment_mode'])
return False
cur_iso_letter = payment_info_dict['currency_iso'].upper()
try:
cur = pycountry.currencies.get(letter=cur_iso_letter)
cur_numeric = str(cur.numeric)
except:
logger.error("Currency %s is not recognized" % cur_iso_letter)
return False
data = {
'pos_number': str(1).zfill(2),
'answer_flag': '0',
'transaction_type': '0',
'payment_mode': payment_mode,
'currency_numeric': cur_numeric.zfill(3),
'private': ' ' * 10,
'delay': 'A011',
'auto': 'B010',
'amount_msg': ('%.0f' % (amount * 100)).zfill(8),
}
return data
def generate_lrc(self, real_msg_with_etx):
lrc = 0
for char in real_msg_with_etx:
lrc ^= ord(char)
return lrc
def send_message(self, data):
'''We use protocol E+'''
ascii_names = curses.ascii.controlnames
real_msg = (
data['pos_number'] +
data['amount_msg'] +
data['answer_flag'] +
data['payment_mode'] +
data['transaction_type'] +
data['currency_numeric'] +
data['private'] +
data['delay'] +
data['auto'])
logger.debug('Real message to send = %s' % real_msg)
assert len(real_msg) == 34, 'Wrong length for protocol E+'
real_msg_with_etx = real_msg + chr(ascii_names.index('ETX'))
lrc = self.generate_lrc(real_msg_with_etx)
message = chr(ascii_names.index('STX')) + real_msg_with_etx + chr(lrc)
self.serial_write(message)
logger.info('Message sent to terminal')
def compare_data_vs_answer(self, data, answer_data):
for field in [
'pos_number', 'amount_msg',
'currency_numeric', 'private']:
if data[field] != answer_data[field]:
logger.warning(
"Field %s has value '%s' in data and value '%s' in answer"
% (field, data[field], answer_data[field]))
def parse_terminal_answer(self, real_msg, data):
answer_data = {
'pos_number': real_msg[0:2],
'transaction_result': real_msg[2],
'amount_msg': real_msg[3:11],
'payment_mode': real_msg[11],
'currency_numeric': real_msg[12:15],
'private': real_msg[15:26],
}
logger.debug('answer_data = %s' % answer_data)
self.compare_data_vs_answer(data, answer_data)
return answer_data
def get_answer_from_terminal(self, data):
ascii_names = curses.ascii.controlnames
full_msg_size = 1+2+1+8+1+3+10+1+1
msg = self.serial.read(size=full_msg_size)
logger.debug('%d bytes read from terminal' % full_msg_size)
assert len(msg) == full_msg_size, 'Answer has a wrong size'
if msg[0] != chr(ascii_names.index('STX')):
logger.error(
'The first byte of the answer from terminal should be STX')
if msg[-2] != chr(ascii_names.index('ETX')):
logger.error(
'The byte before final of the answer from terminal '
'should be ETX')
lrc = msg[-1]
computed_lrc = chr(self.generate_lrc(msg[1:-1]))
if computed_lrc != lrc:
logger.error(
'The LRC of the answer from terminal is wrong')
real_msg = msg[1:-2]
logger.debug('Real answer received = %s' % real_msg)
return self.parse_terminal_answer(real_msg, data)
def transaction_start(self, payment_info):
'''This function sends the data to the serial/usb port.
'''
payment_info_dict = simplejson.loads(payment_info)
assert isinstance(payment_info_dict, dict), \
'payment_info_dict should be a dict'
try:
logger.debug(
'Opening serial port %s for payment terminal with baudrate %d'
% (self.device_name, self.device_rate))
# IMPORTANT : don't modify timeout=3 seconds
# This parameter is very important ; the Telium spec say
# that we have to wait to up 3 seconds to get LRC
self.serial = Serial(
self.device_name, self.device_rate,
timeout=3)
logger.debug('serial.is_open = %s' % self.serial.isOpen())
if self.initialize_msg():
data = self.prepare_data_to_send(payment_info_dict)
if not data:
return
self.send_message(data)
if self.get_one_byte_answer('ACK'):
self.send_one_byte_signal('EOT')
logger.info("Now expecting answer from Terminal")
if self.get_one_byte_answer('ENQ'):
self.send_one_byte_signal('ACK')
self.get_answer_from_terminal(data)
self.send_one_byte_signal('ACK')
if self.get_one_byte_answer('EOT'):
logger.info("Answer received from Terminal")
except Exception, e:
logger.error('Exception in serial connection: %s' % str(e))
finally:
if self.serial:
logger.debug('Closing serial port for payment terminal')
self.serial.close()
def run(self):
while True:
try:
timestamp, task, data = self.queue.get(True)
if task == 'transaction_start':
self.transaction_start(data)
elif task == 'status':
pass
except Exception as e:
self.set_status('error', str(e))
driver = TeliumPaymentTerminalDriver()
hw_proxy.drivers['telium_payment_terminal'] = driver
class TeliumPaymentTerminalProxy(hw_proxy.Proxy):
@http.route(
'/hw_proxy/payment_terminal_transaction_start',
type='json', auth='none', cors='*')
def payment_terminal_transaction_start(self, payment_info):
logger.debug(
'Telium: Call payment_terminal_transaction_start with '
'payment_info=%s', payment_info)
driver.push_task('transaction_start', payment_info)

218
hw_telium_payment_terminal/test-scripts/telium-test.py

@ -1,218 +0,0 @@
#! /usr/bin/python
# -*- encoding: utf-8 -*-
##############################################################################
#
# Hardware Telium Test script
# Copyright (C) 2014 Akretion (http://www.akretion.com)
# @author Alexis de Lattre <alexis.delattre@akretion.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
##############################################################################
from serial import Serial
import curses.ascii
import time
import pycountry
DEVICE = '/dev/ttyACM0'
DEVICE_RATE = 9600
PAYMENT_MODE = 'card' # 'card' ou 'check'
CURRENCY_ISO = 'EUR'
AMOUNT = 12.42
def serial_write(serial, text):
assert isinstance(text, str), 'text must be a string'
serial.write(text)
def initialize_msg(serial):
max_attempt = 3
attempt_nr = 0
while attempt_nr < max_attempt:
attempt_nr += 1
send_one_byte_signal(serial, 'ENQ')
if get_one_byte_answer(serial, 'ACK'):
return True
else:
print "Terminal : SAME PLAYER TRY AGAIN"
send_one_byte_signal(serial, 'EOT')
# Wait 1 sec between each attempt
time.sleep(1)
return False
def send_one_byte_signal(serial, signal):
ascii_names = curses.ascii.controlnames
assert signal in ascii_names, 'Wrong signal'
char = ascii_names.index(signal)
serial_write(serial, chr(char))
print 'Signal %s sent to terminal' % signal
def get_one_byte_answer(serial, expected_signal):
ascii_names = curses.ascii.controlnames
one_byte_read = serial.read(1)
expected_char = ascii_names.index(expected_signal)
if one_byte_read == chr(expected_char):
print "%s received from terminal" % expected_signal
return True
else:
return False
def prepare_data_to_send():
if PAYMENT_MODE == 'check':
payment_mode = 'C'
elif PAYMENT_MODE == 'card':
payment_mode = '1'
else:
print "The payment mode '%s' is not supported" % PAYMENT_MODE
return False
cur_iso_letter = CURRENCY_ISO.upper()
try:
cur = pycountry.currencies.get(letter=cur_iso_letter)
cur_numeric = str(cur.numeric)
except:
print "Currency %s is not recognized" % cur_iso_letter
return False
data = {
'pos_number': str(1).zfill(2),
'answer_flag': '0',
'transaction_type': '0',
'payment_mode': payment_mode,
'currency_numeric': cur_numeric.zfill(3),
'private': ' ' * 10,
'delay': 'A011',
'auto': 'B010',
'amount_msg': ('%.0f' % (AMOUNT * 100)).zfill(8),
}
return data
def generate_lrc(real_msg_with_etx):
lrc = 0
for char in real_msg_with_etx:
lrc ^= ord(char)
return lrc
def send_message(serial, data):
'''We use protocol E+'''
ascii_names = curses.ascii.controlnames
real_msg = (
data['pos_number'] +
data['amount_msg'] +
data['answer_flag'] +
data['payment_mode'] +
data['transaction_type'] +
data['currency_numeric'] +
data['private'] +
data['delay'] +
data['auto'])
print 'Real message to send = %s' % real_msg
assert len(real_msg) == 34, 'Wrong length for protocol E+'
real_msg_with_etx = real_msg + chr(ascii_names.index('ETX'))
lrc = generate_lrc(real_msg_with_etx)
message = chr(ascii_names.index('STX')) + real_msg_with_etx + chr(lrc)
serial_write(serial, message)
print 'Message sent to terminal'
def compare_data_vs_answer(data, answer_data):
for field in [
'pos_number', 'amount_msg',
'currency_numeric', 'private']:
if data[field] != answer_data[field]:
print (
"Field %s has value '%s' in data and value '%s' in answer"
% (field, data[field], answer_data[field]))
def parse_terminal_answer(real_msg, data):
answer_data = {
'pos_number': real_msg[0:2],
'transaction_result': real_msg[2],
'amount_msg': real_msg[3:11],
'payment_mode': real_msg[11],
'currency_numeric': real_msg[12:15],
'private': real_msg[15:26],
}
print 'answer_data = %s' % answer_data
compare_data_vs_answer(data, answer_data)
return answer_data
def get_answer_from_terminal(serial, data):
ascii_names = curses.ascii.controlnames
full_msg_size = 1+2+1+8+1+3+10+1+1
msg = serial.read(size=full_msg_size)
print '%d bytes read from terminal' % full_msg_size
assert len(msg) == full_msg_size, 'Answer has a wrong size'
if msg[0] != chr(ascii_names.index('STX')):
print 'The first byte of the answer from terminal should be STX'
if msg[-2] != chr(ascii_names.index('ETX')):
print 'The byte before final of the answer from terminal should be ETX'
lrc = msg[-1]
computed_lrc = chr(generate_lrc(msg[1:-1]))
if computed_lrc != lrc:
print 'The LRC of the answer from terminal is wrong'
real_msg = msg[1:-2]
print 'Real answer received = %s' % real_msg
return parse_terminal_answer(real_msg, data)
def transaction_start():
'''This function sends the data to the serial/usb port.
'''
serial = False
try:
print(
'Opening serial port %s for payment terminal with '
'baudrate %d' % (DEVICE, DEVICE_RATE))
# IMPORTANT : don't modify timeout=3 seconds
# This parameter is very important ; the Telium spec say
# that we have to wait to up 3 seconds to get LRC
serial = Serial(
DEVICE, DEVICE_RATE, timeout=3)
print 'serial.is_open = %s' % serial.isOpen()
if initialize_msg(serial):
data = prepare_data_to_send()
if not data:
return
send_message(serial, data)
if get_one_byte_answer(serial, 'ACK'):
send_one_byte_signal(serial, 'EOT')
print "Now expecting answer from Terminal"
if get_one_byte_answer(serial, 'ENQ'):
send_one_byte_signal(serial, 'ACK')
get_answer_from_terminal(serial, data)
send_one_byte_signal(serial, 'ACK')
if get_one_byte_answer(serial, 'EOT'):
print "Answer received from Terminal"
except Exception, e:
print 'Exception in serial connection: %s' % str(e)
finally:
if serial:
print 'Closing serial port for payment terminal'
serial.close()
if __name__ == '__main__':
transaction_start()

3
requirements.txt

@ -1,3 +0,0 @@
unidecode
pyserial
pycountry
Loading…
Cancel
Save