diff --git a/hw_customer_display/__init__.py b/hw_customer_display/__init__.py deleted file mode 100644 index 0914de44..00000000 --- a/hw_customer_display/__init__.py +++ /dev/null @@ -1,24 +0,0 @@ -# -*- encoding: utf-8 -*- -############################################################################## -# -# Hardware Customer Display module for Odoo -# Copyright (C) 2014 Akretion (http://www.akretion.com) -# @author Alexis de Lattre -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as -# published by the Free Software Foundation, either version 3 of the -# License, or (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. -# -# You should have received a copy of the GNU Affero General Public License -# along with this program. If not, see . -# -############################################################################## - - -from . import controllers diff --git a/hw_customer_display/__manifest__.py b/hw_customer_display/__manifest__.py deleted file mode 100644 index 299bab17..00000000 --- a/hw_customer_display/__manifest__.py +++ /dev/null @@ -1,86 +0,0 @@ -# -*- encoding: utf-8 -*- -############################################################################## -# -# Hardware Customer Display module for Odoo -# Copyright (C) 2014 Akretion (http://www.akretion.com) -# @author Alexis de Lattre -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as -# published by the Free Software Foundation, either version 3 of the -# License, or (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. -# -# You should have received a copy of the GNU Affero General Public License -# along with this program. If not, see . -# -############################################################################## - - -{ - 'name': 'Hardware Customer Display', - 'version': '8.0.0.1.0', - 'category': 'Hardware Drivers', - 'license': 'AGPL-3', - 'summary': 'Adds support for Customer Display in the Point of Sale', - 'description': """ -Hardware Customer Display -========================= - -This module adds support for Customer Display in the Point of Sale. -This module is designed to be installed on the *POSbox* (i.e. the -proxy on which the USB devices are connected) and not on the main -Odoo server. On the main Odoo server, you should install the module -*pos_customer_display*. - -The configuration of the hardware is done in the configuration file of -the Odoo server of the POSbox. You should add the following entries in -the configuration file: - -* customer_display_device_name (default = /dev/ttyUSB0) -* customer_display_device_rate (default = 9600) -* customer_display_device_timeout (default = 2 seconds) - -The number of cols of the Customer Display (usually 20) should be -configured on the main Odoo server, in the menu Point of Sale > -Configuration > Point of Sales. The number of rows is supposed to be 2. - -It should support most serial and USB-serial LCD displays out-of-the-box -or with inheritance of a few functions. - -It has been tested with: - -* Bixolon BCD-1100 (Datasheet : - http://www.bixolon.com/html/en/product/product_detail.xhtml?prod_id=61) -* Bixolon BCD-1000 - -To setup the BCD-1100 on Linux, you will find some technical instructions -on this page: -http://techtuxwords.blogspot.fr/2012/12/linux-and-bixolon-bcd-1100.html - -If you have a kernel >= 3.12, you should also read this: -http://www.leniwiec.org/en/2014/06/25/ubuntu-14-04lts-how-to-pass-id-ven -dor-and-id-product-to-ftdi_sio-driver/ - -This module has been developped during a POS code sprint at Akretion -France from July 7th to July 10th 2014. This module is part of the POS -project of the Odoo Community Association http://odoo-community.org/. -You are invited to become a member and/or get involved in the -Association ! - -This module has been written by Alexis de Lattre from Akretion -. - """, - 'author': "Akretion,Odoo Community Association (OCA)", - 'website': 'http://www.akretion.com', - 'depends': ['hw_proxy'], - 'external_dependencies': { - 'python': ['serial', 'unidecode'], - }, - 'data': [], - 'installable': False, -} diff --git a/hw_customer_display/controllers/__init__.py b/hw_customer_display/controllers/__init__.py deleted file mode 100644 index 7ae94c9e..00000000 --- a/hw_customer_display/controllers/__init__.py +++ /dev/null @@ -1,24 +0,0 @@ -# -*- encoding: utf-8 -*- -############################################################################## -# -# Hardware Customer Display module for Odoo -# Copyright (C) 2014 Akretion (http://www.akretion.com) -# @author Alexis de Lattre -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as -# published by the Free Software Foundation, either version 3 of the -# License, or (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. -# -# You should have received a copy of the GNU Affero General Public License -# along with this program. If not, see . -# -############################################################################## - - -from . import main diff --git a/hw_customer_display/controllers/main.py b/hw_customer_display/controllers/main.py deleted file mode 100644 index e2184f7b..00000000 --- a/hw_customer_display/controllers/main.py +++ /dev/null @@ -1,180 +0,0 @@ -# -*- encoding: utf-8 -*- -############################################################################## -# -# Hardware Customer Display module for Odoo -# Copyright (C) 2014 Akretion (http://www.akretion.com) -# @author Alexis de Lattre -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as -# published by the Free Software Foundation, either version 3 of the -# License, or (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. -# -# You should have received a copy of the GNU Affero General Public License -# along with this program. If not, see . -# -############################################################################## - - -import logging -import simplejson -import time -from threading import Thread, Lock -from Queue import Queue -from unidecode import unidecode -from serial import Serial -import openerp.addons.hw_proxy.controllers.main as hw_proxy -from openerp import http -from openerp.tools.config import config - - -logger = logging.getLogger(__name__) - - -class CustomerDisplayDriver(Thread): - def __init__(self): - Thread.__init__(self) - self.queue = Queue() - self.lock = Lock() - self.status = {'status': 'connecting', 'messages': []} - self.device_name = config.get( - 'customer_display_device_name', '/dev/ttyUSB0') - self.device_rate = int(config.get( - 'customer_display_device_rate', 9600)) - self.device_timeout = int(config.get( - 'customer_display_device_timeout', 2)) - self.serial = False - - def get_status(self): - self.push_task('status') - return self.status - - def set_status(self, status, message=None): - if status == self.status['status']: - if message is not None and message != self.status['messages'][-1]: - self.status['messages'].append(message) - else: - self.status['status'] = status - if message: - self.status['messages'] = [message] - else: - self.status['messages'] = [] - - if status == 'error' and message: - logger.error('Display Error: '+message) - elif status == 'disconnected' and message: - logger.warning('Disconnected Display: '+message) - - def lockedstart(self): - with self.lock: - if not self.isAlive(): - self.daemon = True - self.start() - - def push_task(self, task, data=None): - self.lockedstart() - self.queue.put((time.time(), task, data)) - - def move_cursor(self, col, row): - # Bixolon spec : 11. "Move Cursor to Specified Position" - self.cmd_serial_write('\x1B\x6C' + chr(col) + chr(row)) - - def display_text(self, lines): - logger.debug( - "Preparing to send the following lines to LCD: %s" % lines) - # We don't check the number of rows/cols here, because it has already - # been checked in the POS client in the JS code - lines_ascii = [] - for line in lines: - lines_ascii.append(unidecode(line)) - row = 0 - for dline in lines_ascii: - row += 1 - self.move_cursor(1, row) - self.serial_write(dline) - - def setup_customer_display(self): - '''Set LCD cursor to off - If your LCD has different setup instruction(s), you should - inherit this function''' - # Bixolon spec : 35. "Set Cursor On/Off" - self.cmd_serial_write('\x1F\x43\x00') - logger.debug('LCD cursor set to off') - - def clear_customer_display(self): - '''If your LCD has different clearing instruction, you should inherit - this function''' - # Bixolon spec : 12. "Clear Display Screen and Clear String Mode" - self.cmd_serial_write('\x0C') - logger.debug('Customer display cleared') - - def cmd_serial_write(self, command): - '''If your LCD requires a prefix and/or suffix on all commands, - you should inherit this function''' - assert isinstance(command, str), 'command must be a string' - self.serial_write(command) - - def serial_write(self, text): - assert isinstance(text, str), 'text must be a string' - self.serial.write(text) - - def send_text_customer_display(self, text_to_display): - '''This function sends the data to the serial/usb port. - We open and close the serial connection on every message display. - Why ? - 1. Because it is not a problem for the customer display - 2. Because it is not a problem for performance, according to my tests - 3. Because it allows recovery on errors : you can unplug/replug the - customer display and it will work again on the next message without - problem - ''' - lines = simplejson.loads(text_to_display) - assert isinstance(lines, list), 'lines_list should be a list' - try: - logger.debug( - 'Opening serial port %s for customer display with baudrate %d' - % (self.device_name, self.device_rate)) - self.serial = Serial( - self.device_name, self.device_rate, - timeout=self.device_timeout) - logger.debug('serial.is_open = %s' % self.serial.isOpen()) - self.setup_customer_display() - self.clear_customer_display() - self.display_text(lines) - except Exception, e: - logger.error('Exception in serial connection: %s' % str(e)) - finally: - if self.serial: - logger.debug('Closing serial port for customer display') - self.serial.close() - - def run(self): - while True: - try: - timestamp, task, data = self.queue.get(True) - if task == 'display': - self.send_text_customer_display(data) - elif task == 'status': - pass - except Exception as e: - self.set_status('error', str(e)) - -driver = CustomerDisplayDriver() - -hw_proxy.drivers['customer_display'] = driver - - -class CustomerDisplayProxy(hw_proxy.Proxy): - @http.route( - '/hw_proxy/send_text_customer_display', type='json', auth='none', - cors='*') - def send_text_customer_display(self, text_to_display): - logger.debug( - 'LCD: Call send_text_customer_display with text=%s', - text_to_display) - driver.push_task('display', text_to_display) diff --git a/hw_customer_display/test-scripts/customer-display-test.py b/hw_customer_display/test-scripts/customer-display-test.py deleted file mode 100755 index ad3a50d4..00000000 --- a/hw_customer_display/test-scripts/customer-display-test.py +++ /dev/null @@ -1,68 +0,0 @@ -#! /usr/bin/python -# -*- encoding: utf-8 -*- -# Author : Alexis de Lattre -# The licence is in the file __openerp__.py -# This is a test script, that you can use if you want to test/play -# with the customer display independantly from the Odoo server -# It has been tested with a Bixolon BCD-1100 - -from serial import Serial -from unidecode import unidecode -import sys - -DEVICE = '/dev/ttyUSB0' -DEVICE_RATE = 9600 -DEVICE_COLS = 20 - - -def display_text(ser, line1, line2): - print "convert to ascii" - line1 = unidecode(line1) - line2 = unidecode(line2) - print "set lines to the right lenght (%s)" % DEVICE_COLS - for line in [line1, line2]: - if len(line) < DEVICE_COLS: - line += ' ' * (DEVICE_COLS - len(line)) - elif len(line) > DEVICE_COLS: - line = line[0:DEVICE_COLS] - assert len(line) == DEVICE_COLS, 'Wrong length' - print "try to clear display" - ser.write('\x0C') - print "clear done" - print "try to position at start of 1st line" - ser.write('\x1B\x6C' + chr(1) + chr(1)) - print "position done" - print "try to write 1st line" - ser.write(line1) - print "write 1st line done" - print "try to position at start of 2nd line" - ser.write('\x1B\x6C' + chr(1) + chr(2)) - print "position done" - print "try to write 2nd line" - ser.write(line2) - print "write done" - - -def open_close_display(line1, line2): - ser = False - try: - print "open serial port" - ser = Serial(DEVICE, DEVICE_RATE, timeout=2) - print "serial port open =", ser.isOpen() - print "try to set cursor to off" - ser.write('\x1F\x43\x00') - print "cursor set to off" - display_text(ser, line1, line2) - except Exception, e: - print "EXCEPTION e=", e - sys.exit(1) - finally: - if ser: - print "close serial port" - ser.close() - - -if __name__ == '__main__': - line1 = u'POS Code Sprint' - line2 = u'@ Akretion 2014/07' - open_close_display(line1, line2) diff --git a/hw_telium_payment_terminal/__init__.py b/hw_telium_payment_terminal/__init__.py deleted file mode 100644 index c535b703..00000000 --- a/hw_telium_payment_terminal/__init__.py +++ /dev/null @@ -1,24 +0,0 @@ -# -*- encoding: utf-8 -*- -############################################################################## -# -# Hardware Telium Payment Terminal module for Odoo -# Copyright (C) 2014 Akretion (http://www.akretion.com) -# @author Alexis de Lattre -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as -# published by the Free Software Foundation, either version 3 of the -# License, or (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. -# -# You should have received a copy of the GNU Affero General Public License -# along with this program. If not, see . -# -############################################################################## - - -from . import controllers diff --git a/hw_telium_payment_terminal/__manifest__.py b/hw_telium_payment_terminal/__manifest__.py deleted file mode 100644 index a5768a81..00000000 --- a/hw_telium_payment_terminal/__manifest__.py +++ /dev/null @@ -1,84 +0,0 @@ -# -*- encoding: utf-8 -*- -############################################################################## -# -# Hardware Telium Payment Terminal module for Odoo -# Copyright (C) 2014 Akretion (http://www.akretion.com) -# @author Alexis de Lattre -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as -# published by the Free Software Foundation, either version 3 of the -# License, or (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. -# -# You should have received a copy of the GNU Affero General Public License -# along with this program. If not, see . -# -############################################################################## - - -{ - 'name': 'Hardware Telium Payment Terminal', - 'version': '8.0.0.1.0', - 'category': 'Hardware Drivers', - 'license': 'AGPL-3', - 'summary': 'Adds support for Payment Terminals using Telium protocol', - 'description': """ -Hardware Telium Payment Terminal -================================ - -This module adds support for credit card reader and checks printers -using Telium protocol in the Point of Sale. This module is designed to -be installed on the *POSbox* (i.e. the proxy on which the USB devices -are connected) and not on the main Odoo server. On the main Odoo server, -you should install the module *pos_payment_terminal*. - -The configuration of the hardware is done in the configuration file of -the Odoo server of the POSbox. You should add the following entries in -the configuration file: - -* payment_terminal_device_name (default = /dev/ttyACM0) -* payment_terminal_device_rate (default = 9600) - -The Telium protocol is used by Ingenico and Sagem payment terminals. It -is based on the Concert protocol, so it can probably work with payment -terminals from other brands. This module implements the protocol E+ (and -not the protocol E), so it requires a Telium Manager version 37783600 -or superior. To get the version of the Telium Manager on an Ingenico -terminal press F > 0-TELIUM MANAGER > 2-Consultation > 4-Configuration -> 2-Software > 1-TERMINAL > On Display > Telium Manager and then read -the field *M20S*. - -You will need to configure your payment terminal to accept commands -from the POS. On an Ingenico terminal press F > 0-TELIUM MANAGER > -5-Initialization > 1-Parameters > Cash Connection and then select *On* -and then *USB*. After that, you should reboot the terminal. - -This module has been successfully tested with: - -* Ingenico EFTSmart4S -* Ingenico EFTSmart2 2640 with Telim Manager version 37784503 -* Ingenico i2200 cheque reader and writer - -This module has been developped during a POS code sprint at Akretion -France from July 7th to July 10th 2014. This module is part of the POS -project of the Odoo Community Association http://odoo-community.org/. -You are invited to become a member and/or get involved in the -Association ! - -This module has been written by Alexis de Lattre - from Akretion. - """, - 'author': "Akretion,Odoo Community Association (OCA)", - 'website': 'http://www.akretion.com', - 'depends': ['hw_proxy'], - 'external_dependencies': { - 'python': ['serial', 'pycountry'], - }, - 'data': [], - 'installable': False, -} diff --git a/hw_telium_payment_terminal/controllers/__init__.py b/hw_telium_payment_terminal/controllers/__init__.py deleted file mode 100644 index d91efc38..00000000 --- a/hw_telium_payment_terminal/controllers/__init__.py +++ /dev/null @@ -1,24 +0,0 @@ -# -*- encoding: utf-8 -*- -############################################################################## -# -# Hardware Telium Payment Terminal module for Odoo -# Copyright (C) 2014 Akretion (http://www.akretion.com) -# @author Alexis de Lattre -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as -# published by the Free Software Foundation, either version 3 of the -# License, or (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. -# -# You should have received a copy of the GNU Affero General Public License -# along with this program. If not, see . -# -############################################################################## - - -from . import main diff --git a/hw_telium_payment_terminal/controllers/main.py b/hw_telium_payment_terminal/controllers/main.py deleted file mode 100644 index 563d2cc9..00000000 --- a/hw_telium_payment_terminal/controllers/main.py +++ /dev/null @@ -1,284 +0,0 @@ -# -*- encoding: utf-8 -*- -############################################################################## -# -# Hardware Telium Payment Terminal module for Odoo -# Copyright (C) 2014 Akretion (http://www.akretion.com) -# @author Alexis de Lattre -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as -# published by the Free Software Foundation, either version 3 of the -# License, or (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. -# -# You should have received a copy of the GNU Affero General Public License -# along with this program. If not, see . -# -############################################################################## - - -import logging -import simplejson -import time -import curses.ascii -from threading import Thread, Lock -from Queue import Queue -from serial import Serial -import pycountry -import openerp.addons.hw_proxy.controllers.main as hw_proxy -from openerp import http -from openerp.tools.config import config - - -logger = logging.getLogger(__name__) - - -class TeliumPaymentTerminalDriver(Thread): - def __init__(self): - Thread.__init__(self) - self.queue = Queue() - self.lock = Lock() - self.status = {'status': 'connecting', 'messages': []} - self.device_name = config.get( - 'telium_terminal_device_name', '/dev/ttyACM0') - self.device_rate = int(config.get( - 'telium_terminal_device_rate', 9600)) - self.serial = False - - def get_status(self): - self.push_task('status') - return self.status - - def set_status(self, status, message=None): - if status == self.status['status']: - if message is not None and message != self.status['messages'][-1]: - self.status['messages'].append(message) - else: - self.status['status'] = status - if message: - self.status['messages'] = [message] - else: - self.status['messages'] = [] - - if status == 'error' and message: - logger.error('Payment Terminal Error: '+message) - elif status == 'disconnected' and message: - logger.warning('Disconnected Terminal: '+message) - - def lockedstart(self): - with self.lock: - if not self.isAlive(): - self.daemon = True - self.start() - - def push_task(self, task, data=None): - self.lockedstart() - self.queue.put((time.time(), task, data)) - - def serial_write(self, text): - assert isinstance(text, str), 'text must be a string' - self.serial.write(text) - - def initialize_msg(self): - max_attempt = 3 - attempt_nr = 0 - while attempt_nr < max_attempt: - attempt_nr += 1 - self.send_one_byte_signal('ENQ') - if self.get_one_byte_answer('ACK'): - return True - else: - logger.warning("Terminal : SAME PLAYER TRY AGAIN") - self.send_one_byte_signal('EOT') - # Wait 1 sec between each attempt - time.sleep(1) - return False - - def send_one_byte_signal(self, signal): - ascii_names = curses.ascii.controlnames - assert signal in ascii_names, 'Wrong signal' - char = ascii_names.index(signal) - self.serial_write(chr(char)) - logger.debug('Signal %s sent to terminal' % signal) - - def get_one_byte_answer(self, expected_signal): - ascii_names = curses.ascii.controlnames - one_byte_read = self.serial.read(1) - expected_char = ascii_names.index(expected_signal) - if one_byte_read == chr(expected_char): - logger.debug("%s received from terminal" % expected_signal) - return True - else: - return False - - def prepare_data_to_send(self, payment_info_dict): - amount = payment_info_dict['amount'] - if payment_info_dict['payment_mode'] == 'check': - payment_mode = 'C' - elif payment_info_dict['payment_mode'] == 'card': - payment_mode = '1' - else: - logger.error( - "The payment mode '%s' is not supported" - % payment_info_dict['payment_mode']) - return False - cur_iso_letter = payment_info_dict['currency_iso'].upper() - try: - cur = pycountry.currencies.get(letter=cur_iso_letter) - cur_numeric = str(cur.numeric) - except: - logger.error("Currency %s is not recognized" % cur_iso_letter) - return False - data = { - 'pos_number': str(1).zfill(2), - 'answer_flag': '0', - 'transaction_type': '0', - 'payment_mode': payment_mode, - 'currency_numeric': cur_numeric.zfill(3), - 'private': ' ' * 10, - 'delay': 'A011', - 'auto': 'B010', - 'amount_msg': ('%.0f' % (amount * 100)).zfill(8), - } - return data - - def generate_lrc(self, real_msg_with_etx): - lrc = 0 - for char in real_msg_with_etx: - lrc ^= ord(char) - return lrc - - def send_message(self, data): - '''We use protocol E+''' - ascii_names = curses.ascii.controlnames - real_msg = ( - data['pos_number'] + - data['amount_msg'] + - data['answer_flag'] + - data['payment_mode'] + - data['transaction_type'] + - data['currency_numeric'] + - data['private'] + - data['delay'] + - data['auto']) - logger.debug('Real message to send = %s' % real_msg) - assert len(real_msg) == 34, 'Wrong length for protocol E+' - real_msg_with_etx = real_msg + chr(ascii_names.index('ETX')) - lrc = self.generate_lrc(real_msg_with_etx) - message = chr(ascii_names.index('STX')) + real_msg_with_etx + chr(lrc) - self.serial_write(message) - logger.info('Message sent to terminal') - - def compare_data_vs_answer(self, data, answer_data): - for field in [ - 'pos_number', 'amount_msg', - 'currency_numeric', 'private']: - if data[field] != answer_data[field]: - logger.warning( - "Field %s has value '%s' in data and value '%s' in answer" - % (field, data[field], answer_data[field])) - - def parse_terminal_answer(self, real_msg, data): - answer_data = { - 'pos_number': real_msg[0:2], - 'transaction_result': real_msg[2], - 'amount_msg': real_msg[3:11], - 'payment_mode': real_msg[11], - 'currency_numeric': real_msg[12:15], - 'private': real_msg[15:26], - } - logger.debug('answer_data = %s' % answer_data) - self.compare_data_vs_answer(data, answer_data) - return answer_data - - def get_answer_from_terminal(self, data): - ascii_names = curses.ascii.controlnames - full_msg_size = 1+2+1+8+1+3+10+1+1 - msg = self.serial.read(size=full_msg_size) - logger.debug('%d bytes read from terminal' % full_msg_size) - assert len(msg) == full_msg_size, 'Answer has a wrong size' - if msg[0] != chr(ascii_names.index('STX')): - logger.error( - 'The first byte of the answer from terminal should be STX') - if msg[-2] != chr(ascii_names.index('ETX')): - logger.error( - 'The byte before final of the answer from terminal ' - 'should be ETX') - lrc = msg[-1] - computed_lrc = chr(self.generate_lrc(msg[1:-1])) - if computed_lrc != lrc: - logger.error( - 'The LRC of the answer from terminal is wrong') - real_msg = msg[1:-2] - logger.debug('Real answer received = %s' % real_msg) - return self.parse_terminal_answer(real_msg, data) - - def transaction_start(self, payment_info): - '''This function sends the data to the serial/usb port. - ''' - payment_info_dict = simplejson.loads(payment_info) - assert isinstance(payment_info_dict, dict), \ - 'payment_info_dict should be a dict' - try: - logger.debug( - 'Opening serial port %s for payment terminal with baudrate %d' - % (self.device_name, self.device_rate)) - # IMPORTANT : don't modify timeout=3 seconds - # This parameter is very important ; the Telium spec say - # that we have to wait to up 3 seconds to get LRC - self.serial = Serial( - self.device_name, self.device_rate, - timeout=3) - logger.debug('serial.is_open = %s' % self.serial.isOpen()) - if self.initialize_msg(): - data = self.prepare_data_to_send(payment_info_dict) - if not data: - return - self.send_message(data) - if self.get_one_byte_answer('ACK'): - self.send_one_byte_signal('EOT') - - logger.info("Now expecting answer from Terminal") - if self.get_one_byte_answer('ENQ'): - self.send_one_byte_signal('ACK') - self.get_answer_from_terminal(data) - self.send_one_byte_signal('ACK') - if self.get_one_byte_answer('EOT'): - logger.info("Answer received from Terminal") - - except Exception, e: - logger.error('Exception in serial connection: %s' % str(e)) - finally: - if self.serial: - logger.debug('Closing serial port for payment terminal') - self.serial.close() - - def run(self): - while True: - try: - timestamp, task, data = self.queue.get(True) - if task == 'transaction_start': - self.transaction_start(data) - elif task == 'status': - pass - except Exception as e: - self.set_status('error', str(e)) - -driver = TeliumPaymentTerminalDriver() - -hw_proxy.drivers['telium_payment_terminal'] = driver - - -class TeliumPaymentTerminalProxy(hw_proxy.Proxy): - @http.route( - '/hw_proxy/payment_terminal_transaction_start', - type='json', auth='none', cors='*') - def payment_terminal_transaction_start(self, payment_info): - logger.debug( - 'Telium: Call payment_terminal_transaction_start with ' - 'payment_info=%s', payment_info) - driver.push_task('transaction_start', payment_info) diff --git a/hw_telium_payment_terminal/test-scripts/telium-test.py b/hw_telium_payment_terminal/test-scripts/telium-test.py deleted file mode 100755 index ee94e3cd..00000000 --- a/hw_telium_payment_terminal/test-scripts/telium-test.py +++ /dev/null @@ -1,218 +0,0 @@ -#! /usr/bin/python -# -*- encoding: utf-8 -*- -############################################################################## -# -# Hardware Telium Test script -# Copyright (C) 2014 Akretion (http://www.akretion.com) -# @author Alexis de Lattre -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as -# published by the Free Software Foundation, either version 3 of the -# License, or (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. -# -# You should have received a copy of the GNU Affero General Public License -# along with this program. If not, see . -# -############################################################################## - - -from serial import Serial -import curses.ascii -import time -import pycountry - - -DEVICE = '/dev/ttyACM0' -DEVICE_RATE = 9600 -PAYMENT_MODE = 'card' # 'card' ou 'check' -CURRENCY_ISO = 'EUR' -AMOUNT = 12.42 - - -def serial_write(serial, text): - assert isinstance(text, str), 'text must be a string' - serial.write(text) - - -def initialize_msg(serial): - max_attempt = 3 - attempt_nr = 0 - while attempt_nr < max_attempt: - attempt_nr += 1 - send_one_byte_signal(serial, 'ENQ') - if get_one_byte_answer(serial, 'ACK'): - return True - else: - print "Terminal : SAME PLAYER TRY AGAIN" - send_one_byte_signal(serial, 'EOT') - # Wait 1 sec between each attempt - time.sleep(1) - return False - - -def send_one_byte_signal(serial, signal): - ascii_names = curses.ascii.controlnames - assert signal in ascii_names, 'Wrong signal' - char = ascii_names.index(signal) - serial_write(serial, chr(char)) - print 'Signal %s sent to terminal' % signal - - -def get_one_byte_answer(serial, expected_signal): - ascii_names = curses.ascii.controlnames - one_byte_read = serial.read(1) - expected_char = ascii_names.index(expected_signal) - if one_byte_read == chr(expected_char): - print "%s received from terminal" % expected_signal - return True - else: - return False - - -def prepare_data_to_send(): - if PAYMENT_MODE == 'check': - payment_mode = 'C' - elif PAYMENT_MODE == 'card': - payment_mode = '1' - else: - print "The payment mode '%s' is not supported" % PAYMENT_MODE - return False - cur_iso_letter = CURRENCY_ISO.upper() - try: - cur = pycountry.currencies.get(letter=cur_iso_letter) - cur_numeric = str(cur.numeric) - except: - print "Currency %s is not recognized" % cur_iso_letter - return False - data = { - 'pos_number': str(1).zfill(2), - 'answer_flag': '0', - 'transaction_type': '0', - 'payment_mode': payment_mode, - 'currency_numeric': cur_numeric.zfill(3), - 'private': ' ' * 10, - 'delay': 'A011', - 'auto': 'B010', - 'amount_msg': ('%.0f' % (AMOUNT * 100)).zfill(8), - } - return data - - -def generate_lrc(real_msg_with_etx): - lrc = 0 - for char in real_msg_with_etx: - lrc ^= ord(char) - return lrc - - -def send_message(serial, data): - '''We use protocol E+''' - ascii_names = curses.ascii.controlnames - real_msg = ( - data['pos_number'] + - data['amount_msg'] + - data['answer_flag'] + - data['payment_mode'] + - data['transaction_type'] + - data['currency_numeric'] + - data['private'] + - data['delay'] + - data['auto']) - print 'Real message to send = %s' % real_msg - assert len(real_msg) == 34, 'Wrong length for protocol E+' - real_msg_with_etx = real_msg + chr(ascii_names.index('ETX')) - lrc = generate_lrc(real_msg_with_etx) - message = chr(ascii_names.index('STX')) + real_msg_with_etx + chr(lrc) - serial_write(serial, message) - print 'Message sent to terminal' - - -def compare_data_vs_answer(data, answer_data): - for field in [ - 'pos_number', 'amount_msg', - 'currency_numeric', 'private']: - if data[field] != answer_data[field]: - print ( - "Field %s has value '%s' in data and value '%s' in answer" - % (field, data[field], answer_data[field])) - - -def parse_terminal_answer(real_msg, data): - answer_data = { - 'pos_number': real_msg[0:2], - 'transaction_result': real_msg[2], - 'amount_msg': real_msg[3:11], - 'payment_mode': real_msg[11], - 'currency_numeric': real_msg[12:15], - 'private': real_msg[15:26], - } - print 'answer_data = %s' % answer_data - compare_data_vs_answer(data, answer_data) - return answer_data - - -def get_answer_from_terminal(serial, data): - ascii_names = curses.ascii.controlnames - full_msg_size = 1+2+1+8+1+3+10+1+1 - msg = serial.read(size=full_msg_size) - print '%d bytes read from terminal' % full_msg_size - assert len(msg) == full_msg_size, 'Answer has a wrong size' - if msg[0] != chr(ascii_names.index('STX')): - print 'The first byte of the answer from terminal should be STX' - if msg[-2] != chr(ascii_names.index('ETX')): - print 'The byte before final of the answer from terminal should be ETX' - lrc = msg[-1] - computed_lrc = chr(generate_lrc(msg[1:-1])) - if computed_lrc != lrc: - print 'The LRC of the answer from terminal is wrong' - real_msg = msg[1:-2] - print 'Real answer received = %s' % real_msg - return parse_terminal_answer(real_msg, data) - - -def transaction_start(): - '''This function sends the data to the serial/usb port. - ''' - serial = False - try: - print( - 'Opening serial port %s for payment terminal with ' - 'baudrate %d' % (DEVICE, DEVICE_RATE)) - # IMPORTANT : don't modify timeout=3 seconds - # This parameter is very important ; the Telium spec say - # that we have to wait to up 3 seconds to get LRC - serial = Serial( - DEVICE, DEVICE_RATE, timeout=3) - print 'serial.is_open = %s' % serial.isOpen() - if initialize_msg(serial): - data = prepare_data_to_send() - if not data: - return - send_message(serial, data) - if get_one_byte_answer(serial, 'ACK'): - send_one_byte_signal(serial, 'EOT') - - print "Now expecting answer from Terminal" - if get_one_byte_answer(serial, 'ENQ'): - send_one_byte_signal(serial, 'ACK') - get_answer_from_terminal(serial, data) - send_one_byte_signal(serial, 'ACK') - if get_one_byte_answer(serial, 'EOT'): - print "Answer received from Terminal" - - except Exception, e: - print 'Exception in serial connection: %s' % str(e) - finally: - if serial: - print 'Closing serial port for payment terminal' - serial.close() - - -if __name__ == '__main__': - transaction_start() diff --git a/requirements.txt b/requirements.txt index de510661..e69de29b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +0,0 @@ -unidecode -pyserial -pycountry