You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

218 lines
7.1 KiB

10 years ago
  1. #! /usr/bin/python
  2. # -*- encoding: utf-8 -*-
  3. ##############################################################################
  4. #
  5. # Hardware Telium Test script
  6. # Copyright (C) 2014 Akretion (http://www.akretion.com)
  7. # @author Alexis de Lattre <alexis.delattre@akretion.com>
  8. #
  9. # This program is free software: you can redistribute it and/or modify
  10. # it under the terms of the GNU Affero General Public License as
  11. # published by the Free Software Foundation, either version 3 of the
  12. # License, or (at your option) any later version.
  13. #
  14. # This program is distributed in the hope that it will be useful,
  15. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  16. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  17. # GNU Affero General Public License for more details.
  18. #
  19. # You should have received a copy of the GNU Affero General Public License
  20. # along with this program. If not, see <http://www.gnu.org/licenses/>.
  21. #
  22. ##############################################################################
  23. from serial import Serial
  24. import curses.ascii
  25. import time
  26. import pycountry
  27. DEVICE = '/dev/ttyACM0'
  28. DEVICE_RATE = 9600
  29. PAYMENT_MODE = 'card' # 'card' ou 'check'
  30. CURRENCY_ISO = 'EUR'
  31. AMOUNT = 12.42
  32. def serial_write(serial, text):
  33. assert isinstance(text, str), 'text must be a string'
  34. serial.write(text)
  35. def initialize_msg(serial):
  36. max_attempt = 3
  37. attempt_nr = 0
  38. while attempt_nr < max_attempt:
  39. attempt_nr += 1
  40. send_one_byte_signal(serial, 'ENQ')
  41. if get_one_byte_answer(serial, 'ACK'):
  42. return True
  43. else:
  44. print "Terminal : SAME PLAYER TRY AGAIN"
  45. send_one_byte_signal(serial, 'EOT')
  46. # Wait 1 sec between each attempt
  47. time.sleep(1)
  48. return False
  49. def send_one_byte_signal(serial, signal):
  50. ascii_names = curses.ascii.controlnames
  51. assert signal in ascii_names, 'Wrong signal'
  52. char = ascii_names.index(signal)
  53. serial_write(serial, chr(char))
  54. print 'Signal %s sent to terminal' % signal
  55. def get_one_byte_answer(serial, expected_signal):
  56. ascii_names = curses.ascii.controlnames
  57. one_byte_read = serial.read(1)
  58. expected_char = ascii_names.index(expected_signal)
  59. if one_byte_read == chr(expected_char):
  60. print "%s received from terminal" % expected_signal
  61. return True
  62. else:
  63. return False
  64. def prepare_data_to_send():
  65. if PAYMENT_MODE == 'check':
  66. payment_mode = 'C'
  67. elif PAYMENT_MODE == 'card':
  68. payment_mode = '1'
  69. else:
  70. print "The payment mode '%s' is not supported" % PAYMENT_MODE
  71. return False
  72. cur_iso_letter = CURRENCY_ISO.upper()
  73. try:
  74. cur = pycountry.currencies.get(alpha_3=cur_iso_letter)
  75. cur_numeric = str(cur.numeric)
  76. except:
  77. print "Currency %s is not recognized" % cur_iso_letter
  78. return False
  79. data = {
  80. 'pos_number': str(1).zfill(2),
  81. 'answer_flag': '0',
  82. 'transaction_type': '0',
  83. 'payment_mode': payment_mode,
  84. 'currency_numeric': cur_numeric.zfill(3),
  85. 'private': ' ' * 10,
  86. 'delay': 'A011',
  87. 'auto': 'B010',
  88. 'amount_msg': ('%.0f' % (AMOUNT * 100)).zfill(8),
  89. }
  90. return data
  91. def generate_lrc(real_msg_with_etx):
  92. lrc = 0
  93. for char in real_msg_with_etx:
  94. lrc ^= ord(char)
  95. return lrc
  96. def send_message(serial, data):
  97. '''We use protocol E+'''
  98. ascii_names = curses.ascii.controlnames
  99. real_msg = (
  100. data['pos_number'] +
  101. data['amount_msg'] +
  102. data['answer_flag'] +
  103. data['payment_mode'] +
  104. data['transaction_type'] +
  105. data['currency_numeric'] +
  106. data['private'] +
  107. data['delay'] +
  108. data['auto'])
  109. print 'Real message to send = %s' % real_msg
  110. assert len(real_msg) == 34, 'Wrong length for protocol E+'
  111. real_msg_with_etx = real_msg + chr(ascii_names.index('ETX'))
  112. lrc = generate_lrc(real_msg_with_etx)
  113. message = chr(ascii_names.index('STX')) + real_msg_with_etx + chr(lrc)
  114. serial_write(serial, message)
  115. print 'Message sent to terminal'
  116. def compare_data_vs_answer(data, answer_data):
  117. for field in [
  118. 'pos_number', 'amount_msg',
  119. 'currency_numeric', 'private']:
  120. if data[field] != answer_data[field]:
  121. print (
  122. "Field %s has value '%s' in data and value '%s' in answer"
  123. % (field, data[field], answer_data[field]))
  124. def parse_terminal_answer(real_msg, data):
  125. answer_data = {
  126. 'pos_number': real_msg[0:2],
  127. 'transaction_result': real_msg[2],
  128. 'amount_msg': real_msg[3:11],
  129. 'payment_mode': real_msg[11],
  130. 'currency_numeric': real_msg[12:15],
  131. 'private': real_msg[15:26],
  132. }
  133. print 'answer_data = %s' % answer_data
  134. compare_data_vs_answer(data, answer_data)
  135. return answer_data
  136. def get_answer_from_terminal(serial, data):
  137. ascii_names = curses.ascii.controlnames
  138. full_msg_size = 1+2+1+8+1+3+10+1+1
  139. msg = serial.read(size=full_msg_size)
  140. print '%d bytes read from terminal' % full_msg_size
  141. assert len(msg) == full_msg_size, 'Answer has a wrong size'
  142. if msg[0] != chr(ascii_names.index('STX')):
  143. print 'The first byte of the answer from terminal should be STX'
  144. if msg[-2] != chr(ascii_names.index('ETX')):
  145. print 'The byte before final of the answer from terminal should be ETX'
  146. lrc = msg[-1]
  147. computed_lrc = chr(generate_lrc(msg[1:-1]))
  148. if computed_lrc != lrc:
  149. print 'The LRC of the answer from terminal is wrong'
  150. real_msg = msg[1:-2]
  151. print 'Real answer received = %s' % real_msg
  152. return parse_terminal_answer(real_msg, data)
  153. def transaction_start():
  154. '''This function sends the data to the serial/usb port.
  155. '''
  156. serial = False
  157. try:
  158. print(
  159. 'Opening serial port %s for payment terminal with '
  160. 'baudrate %d' % (DEVICE, DEVICE_RATE))
  161. # IMPORTANT : don't modify timeout=3 seconds
  162. # This parameter is very important ; the Telium spec say
  163. # that we have to wait to up 3 seconds to get LRC
  164. serial = Serial(
  165. DEVICE, DEVICE_RATE, timeout=3)
  166. print 'serial.is_open = %s' % serial.isOpen()
  167. if initialize_msg(serial):
  168. data = prepare_data_to_send()
  169. if not data:
  170. return
  171. send_message(serial, data)
  172. if get_one_byte_answer(serial, 'ACK'):
  173. send_one_byte_signal(serial, 'EOT')
  174. print "Now expecting answer from Terminal"
  175. if get_one_byte_answer(serial, 'ENQ'):
  176. send_one_byte_signal(serial, 'ACK')
  177. get_answer_from_terminal(serial, data)
  178. send_one_byte_signal(serial, 'ACK')
  179. if get_one_byte_answer(serial, 'EOT'):
  180. print "Answer received from Terminal"
  181. except Exception, e:
  182. print 'Exception in serial connection: %s' % str(e)
  183. finally:
  184. if serial:
  185. print 'Closing serial port for payment terminal'
  186. serial.close()
  187. if __name__ == '__main__':
  188. transaction_start()