Browse Source

Use pycountry

Add test script for credit card reader
pull/471/head
Alexis de Lattre 10 years ago
committed by Florent de Labarre
parent
commit
a6258568ea
  1. 3
      hw_telium_payment_terminal/__openerp__.py
  2. 13
      hw_telium_payment_terminal/controllers/main.py
  3. 219
      hw_telium_payment_terminal/test-scripts/telium-test.py

3
hw_telium_payment_terminal/__openerp__.py

@ -56,8 +56,7 @@ Please contact Alexis de Lattre from Akretion <alexis.delattre@akretion.com> for
'website': 'http://www.akretion.com', 'website': 'http://www.akretion.com',
'depends': ['hw_proxy'], 'depends': ['hw_proxy'],
'external_dependencies': { 'external_dependencies': {
'python': ['serial'],
'python': ['serial', 'pycountry'],
}, },
'data': [], 'data': [],
'active': False,
} }

13
hw_telium_payment_terminal/controllers/main.py

@ -28,6 +28,7 @@ import curses.ascii
from threading import Thread, Lock from threading import Thread, Lock
from Queue import Queue from Queue import Queue
from serial import Serial from serial import Serial
import pycountry
import openerp.addons.hw_proxy.controllers.main as hw_proxy import openerp.addons.hw_proxy.controllers.main as hw_proxy
from openerp import http from openerp import http
from openerp.tools.config import config from openerp.tools.config import config
@ -125,13 +126,19 @@ class TeliumPaymentTerminalDriver(Thread):
"The payment mode '%s' is not supported" "The payment mode '%s' is not supported"
% payment_info_dict['payment_mode']) % payment_info_dict['payment_mode'])
return False return False
cur_iso_letter = payment_info_dict['currency_iso'].upper()
try:
cur = pycountry.currencies.get(letter=cur_iso_letter)
cur_numeric = str(cur.numeric)
except:
logger.error("Currency %s is not recognized" % cur_iso_letter)
return False
data = { data = {
'pos_number': str(1).zfill(2), 'pos_number': str(1).zfill(2),
'answer_flag': '0', 'answer_flag': '0',
'transaction_type': '0', 'transaction_type': '0',
'payment_mode': payment_mode, 'payment_mode': payment_mode,
'currency_numeric':
payment_info_dict['currency_iso_numeric'].zfill(3),
'currency_numeric': cur_numeric.zfill(3),
'private': ' ' * 10, 'private': ' ' * 10,
'delay': 'A011', 'delay': 'A011',
'auto': 'B010', 'auto': 'B010',
@ -240,7 +247,7 @@ class TeliumPaymentTerminalDriver(Thread):
logger.info("Now expecting answer from Terminal") logger.info("Now expecting answer from Terminal")
if self.get_one_byte_answer('ENQ'): if self.get_one_byte_answer('ENQ'):
self.send_one_byte_signal('ACK') self.send_one_byte_signal('ACK')
answer_data = self.get_answer_from_terminal(data)
self.get_answer_from_terminal(data)
self.send_one_byte_signal('ACK') self.send_one_byte_signal('ACK')
if self.get_one_byte_answer('EOT'): if self.get_one_byte_answer('EOT'):
logger.info("Answer received from Terminal") logger.info("Answer received from Terminal")

219
hw_telium_payment_terminal/test-scripts/telium-test.py

@ -0,0 +1,219 @@
#! /usr/bin/python
# -*- encoding: utf-8 -*-
##############################################################################
#
# Hardware Telium Test script
# Copyright (C) 2014 Akretion (http://www.akretion.com)
# @author Alexis de Lattre <alexis.delattre@akretion.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
##############################################################################
from serial import Serial
import curses.ascii
import time
import pycountry
DEVICE = '/dev/ttyACM0'
DEVICE_RATE = 9600
PAYMENT_MODE = 'card' # 'card' ou 'check'
CURRENCY_ISO = 'EUR'
AMOUNT = 12.42
def serial_write(serial, text):
assert isinstance(text, str), 'text must be a string'
serial.write(text)
def initialize_msg(serial):
max_attempt = 3
attempt_nr = 0
while attempt_nr < max_attempt:
attempt_nr += 1
send_one_byte_signal(serial, 'ENQ')
if get_one_byte_answer(serial, 'ACK'):
return True
else:
print "Terminal : SAME PLAYER TRY AGAIN"
send_one_byte_signal(serial, 'EOT')
# Wait 1 sec between each attempt
time.sleep(1)
return False
def send_one_byte_signal(serial, signal):
ascii_names = curses.ascii.controlnames
assert signal in ascii_names, 'Wrong signal'
char = ascii_names.index(signal)
serial_write(serial, chr(char))
print 'Signal %s sent to terminal' % signal
def get_one_byte_answer(serial, expected_signal):
ascii_names = curses.ascii.controlnames
one_byte_read = serial.read(1)
expected_char = ascii_names.index(expected_signal)
if one_byte_read == chr(expected_char):
print "%s received from terminal" % expected_signal
return True
else:
return False
def prepare_data_to_send():
if PAYMENT_MODE == 'check':
payment_mode = 'C'
elif PAYMENT_MODE == 'card':
payment_mode = '1'
else:
print "The payment mode '%s' is not supported" % PAYMENT_MODE
return False
cur_iso_letter = CURRENCY_ISO.upper()
try:
cur = pycountry.currencies.get(letter=cur_iso_letter)
cur_numeric = str(cur.numeric)
except:
print "Currency %s is not recognized" % cur_iso_letter
return False
data = {
'pos_number': str(1).zfill(2),
'answer_flag': '0',
'transaction_type': '0',
'payment_mode': payment_mode,
'currency_numeric': cur_numeric.zfill(3),
'private': ' ' * 10,
'delay': 'A011',
'auto': 'B010',
'amount_msg': ('%.0f' % (AMOUNT * 100)).zfill(8),
}
return data
def generate_lrc(real_msg_with_etx):
lrc = 0
for char in real_msg_with_etx:
lrc ^= ord(char)
return lrc
def send_message(serial, data):
'''We use protocol E+'''
ascii_names = curses.ascii.controlnames
real_msg = (
data['pos_number']
+ data['amount_msg']
+ data['answer_flag']
+ data['payment_mode']
+ data['transaction_type']
+ data['currency_numeric']
+ data['private']
+ data['delay']
+ data['auto']
)
print 'Real message to send = %s' % real_msg
assert len(real_msg) == 34, 'Wrong length for protocol E+'
real_msg_with_etx = real_msg + chr(ascii_names.index('ETX'))
lrc = generate_lrc(real_msg_with_etx)
message = chr(ascii_names.index('STX')) + real_msg_with_etx + chr(lrc)
serial_write(serial, message)
print 'Message sent to terminal'
def compare_data_vs_answer(data, answer_data):
for field in [
'pos_number', 'amount_msg',
'currency_numeric', 'private']:
if data[field] != answer_data[field]:
print (
"Field %s has value '%s' in data and value '%s' in answer"
% (field, data[field], answer_data[field]))
def parse_terminal_answer(real_msg, data):
answer_data = {
'pos_number': real_msg[0:2],
'transaction_result': real_msg[2],
'amount_msg': real_msg[3:11],
'payment_mode': real_msg[11],
'currency_numeric': real_msg[12:15],
'private': real_msg[15:26],
}
print 'answer_data = %s' % answer_data
compare_data_vs_answer(data, answer_data)
return answer_data
def get_answer_from_terminal(serial, data):
ascii_names = curses.ascii.controlnames
full_msg_size = 1+2+1+8+1+3+10+1+1
msg = serial.read(size=full_msg_size)
print '%d bytes read from terminal' % full_msg_size
assert len(msg) == full_msg_size, 'Answer has a wrong size'
if msg[0] != chr(ascii_names.index('STX')):
print 'The first byte of the answer from terminal should be STX'
if msg[-2] != chr(ascii_names.index('ETX')):
print 'The byte before final of the answer from terminal should be ETX'
lrc = msg[-1]
computed_lrc = chr(generate_lrc(msg[1:-1]))
if computed_lrc != lrc:
print 'The LRC of the answer from terminal is wrong'
real_msg = msg[1:-2]
print 'Real answer received = %s' % real_msg
return parse_terminal_answer(real_msg, data)
def transaction_start():
'''This function sends the data to the serial/usb port.
'''
serial = False
try:
print(
'Opening serial port %s for payment terminal with '
'baudrate %d' % (DEVICE, DEVICE_RATE))
# IMPORTANT : don't modify timeout=3 seconds
# This parameter is very important ; the Telium spec say
# that we have to wait to up 3 seconds to get LRC
serial = Serial(
DEVICE, DEVICE_RATE, timeout=3)
print 'serial.is_open = %s' % serial.isOpen()
if initialize_msg(serial):
data = prepare_data_to_send()
if not data:
return
send_message(serial, data)
if get_one_byte_answer(serial, 'ACK'):
send_one_byte_signal(serial, 'EOT')
print "Now expecting answer from Terminal"
if get_one_byte_answer(serial, 'ENQ'):
send_one_byte_signal(serial, 'ACK')
get_answer_from_terminal(serial, data)
send_one_byte_signal(serial, 'ACK')
if get_one_byte_answer(serial, 'EOT'):
print "Answer received from Terminal"
except Exception, e:
print 'Exception in serial connection: %s' % str(e)
finally:
if serial:
print 'Closing serial port for payment terminal'
serial.close()
if __name__ == '__main__':
transaction_start()
Loading…
Cancel
Save