You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

198 lines
6.1 KiB

6 years ago
9 years ago
  1. #! /usr/bin/python
  2. # -*- encoding: utf-8 -*-
  3. from serial import Serial
  4. import curses.ascii
  5. import time
  6. import pycountry
  7. DEVICE = '/dev/ttyACM0'
  8. DEVICE_RATE = 9600
  9. PAYMENT_MODE = 'card' # 'card' or 'check'
  10. CURRENCY_ISO = 'EUR'
  11. AMOUNT = 12.42
  12. def serial_write(serial, text):
  13. assert isinstance(text, str), 'text must be a string'
  14. serial.write(text)
  15. def initialize_msg(serial):
  16. max_attempt = 3
  17. attempt_nr = 0
  18. while attempt_nr < max_attempt:
  19. attempt_nr += 1
  20. send_one_byte_signal(serial, 'ENQ')
  21. if get_one_byte_answer(serial, 'ACK'):
  22. return True
  23. else:
  24. print("Terminal : SAME PLAYER TRY AGAIN")
  25. send_one_byte_signal(serial, 'EOT')
  26. # Wait 1 sec between each attempt
  27. time.sleep(1)
  28. return False
  29. def send_one_byte_signal(serial, signal):
  30. ascii_names = curses.ascii.controlnames
  31. assert signal in ascii_names, 'Wrong signal'
  32. char = ascii_names.index(signal)
  33. serial_write(serial, chr(char))
  34. print('Signal %s sent to terminal' % signal)
  35. def get_one_byte_answer(serial, expected_signal):
  36. ascii_names = curses.ascii.controlnames
  37. one_byte_read = serial.read(1)
  38. expected_char = ascii_names.index(expected_signal)
  39. if one_byte_read == chr(expected_char):
  40. print("%s received from terminal" % expected_signal)
  41. return True
  42. else:
  43. return False
  44. def prepare_data_to_send():
  45. if PAYMENT_MODE == 'check':
  46. payment_mode = 'C'
  47. elif PAYMENT_MODE == 'card':
  48. payment_mode = '1'
  49. else:
  50. print("The payment mode '%s' is not supported" % PAYMENT_MODE)
  51. return False
  52. cur_iso_letter = CURRENCY_ISO.upper()
  53. try:
  54. cur = pycountry.currencies.get(alpha_3=cur_iso_letter)
  55. cur_numeric = str(cur.numeric)
  56. except:
  57. print("Currency %s is not recognized" % cur_iso_letter)
  58. return False
  59. data = {
  60. 'pos_number': str(1).zfill(2),
  61. 'answer_flag': '0',
  62. 'transaction_type': '0',
  63. 'payment_mode': payment_mode,
  64. 'currency_numeric': cur_numeric.zfill(3),
  65. 'private': ' ' * 10,
  66. 'delay': 'A011',
  67. 'auto': 'B010',
  68. 'amount_msg': ('%.0f' % (AMOUNT * 100)).zfill(8),
  69. }
  70. return data
  71. def generate_lrc(real_msg_with_etx):
  72. lrc = 0
  73. for char in real_msg_with_etx:
  74. lrc ^= ord(char)
  75. return lrc
  76. def send_message(serial, data):
  77. '''We use protocol E+'''
  78. ascii_names = curses.ascii.controlnames
  79. real_msg = (
  80. data['pos_number'] +
  81. data['amount_msg'] +
  82. data['answer_flag'] +
  83. data['payment_mode'] +
  84. data['transaction_type'] +
  85. data['currency_numeric'] +
  86. data['private'] +
  87. data['delay'] +
  88. data['auto'])
  89. print('Real message to send = %s' % real_msg)
  90. assert len(real_msg) == 34, 'Wrong length for protocol E+'
  91. real_msg_with_etx = real_msg + chr(ascii_names.index('ETX'))
  92. lrc = generate_lrc(real_msg_with_etx)
  93. message = chr(ascii_names.index('STX')) + real_msg_with_etx + chr(lrc)
  94. serial_write(serial, message)
  95. print('Message sent to terminal')
  96. def compare_data_vs_answer(data, answer_data):
  97. for field in [
  98. 'pos_number', 'amount_msg',
  99. 'currency_numeric', 'private']:
  100. if data[field] != answer_data[field]:
  101. print(
  102. "Field %s has value '%s' in data and value '%s' in answer"
  103. % (field, data[field], answer_data[field]))
  104. def parse_terminal_answer(real_msg, data):
  105. answer_data = {
  106. 'pos_number': real_msg[0:2],
  107. 'transaction_result': real_msg[2],
  108. 'amount_msg': real_msg[3:11],
  109. 'payment_mode': real_msg[11],
  110. 'currency_numeric': real_msg[12:15],
  111. 'private': real_msg[15:26],
  112. }
  113. print('answer_data = %s' % answer_data)
  114. compare_data_vs_answer(data, answer_data)
  115. return answer_data
  116. def get_answer_from_terminal(serial, data):
  117. ascii_names = curses.ascii.controlnames
  118. full_msg_size = 1+2+1+8+1+3+10+1+1
  119. msg = serial.read(size=full_msg_size)
  120. print('%d bytes read from terminal' % full_msg_size)
  121. assert len(msg) == full_msg_size, 'Answer has a wrong size'
  122. if msg[0] != chr(ascii_names.index('STX')):
  123. print('The first byte of the answer from terminal should be STX')
  124. if msg[-2] != chr(ascii_names.index('ETX')):
  125. print('The byte before final of the answer '
  126. 'from terminal should be ETX')
  127. lrc = msg[-1]
  128. computed_lrc = chr(generate_lrc(msg[1:-1]))
  129. if computed_lrc != lrc:
  130. print('The LRC of the answer from terminal is wrong')
  131. real_msg = msg[1:-2]
  132. print('Real answer received = %s' % real_msg)
  133. return parse_terminal_answer(real_msg, data)
  134. def transaction_start():
  135. '''This function sends the data to the serial/usb port.
  136. '''
  137. serial = False
  138. try:
  139. print(
  140. 'Opening serial port %s for payment terminal with '
  141. 'baudrate %d' % (DEVICE, DEVICE_RATE))
  142. # IMPORTANT : don't modify timeout=3 seconds
  143. # This parameter is very important ; the Telium spec say
  144. # that we have to wait to up 3 seconds to get LRC
  145. serial = Serial(
  146. DEVICE, DEVICE_RATE, timeout=3)
  147. print('serial.is_open = %s' % serial.isOpen())
  148. if initialize_msg(serial):
  149. data = prepare_data_to_send()
  150. if not data:
  151. return
  152. send_message(serial, data)
  153. if get_one_byte_answer(serial, 'ACK'):
  154. send_one_byte_signal(serial, 'EOT')
  155. print("Now expecting answer from Terminal")
  156. if get_one_byte_answer(serial, 'ENQ'):
  157. send_one_byte_signal(serial, 'ACK')
  158. get_answer_from_terminal(serial, data)
  159. send_one_byte_signal(serial, 'ACK')
  160. if get_one_byte_answer(serial, 'EOT'):
  161. print("Answer received from Terminal")
  162. except Exception as e:
  163. print('Exception in serial connection: %s' % str(e))
  164. finally:
  165. if serial:
  166. print('Closing serial port for payment terminal')
  167. serial.close()
  168. if __name__ == '__main__':
  169. transaction_start()