Browse Source

Merge pull request #159 from akretion/10-better-agi-script

[10.0] Improve AGI script using a dedicated lib
pull/161/merge
Alexis de Lattre 7 years ago
committed by GitHub
parent
commit
e929a6b2bc
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 116
      asterisk_click2dial/scripts/set_name_agi.py

116
asterisk_click2dial/scripts/set_name_agi.py

@ -1,6 +1,6 @@
#! /usr/bin/python #! /usr/bin/python
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
# © 2010-2016 Akretion (Alexis de Lattre <alexis.delattre@akretion.com>)
# © 2010-2018 Akretion (Alexis de Lattre <alexis.delattre@akretion.com>)
# #
# This program is free software: you can redistribute it and/or modify # This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as # it under the terms of the GNU Affero General Public License as
@ -95,9 +95,10 @@
import xmlrpclib import xmlrpclib
import sys import sys
from optparse import OptionParser from optparse import OptionParser
from asterisk import agi as agilib # pip install pyst2
__author__ = "Alexis de Lattre <alexis.delattre@akretion.com>" __author__ = "Alexis de Lattre <alexis.delattre@akretion.com>"
__date__ = "June 2015"
__date__ = "February 2018"
__version__ = "0.6" __version__ = "0.6"
# Name that will be displayed if there is no match # Name that will be displayed if there is no match
@ -178,27 +179,9 @@ options = [
] ]
def stdout_write(string):
'''Wrapper on sys.stdout.write'''
sys.stdout.write(string.encode(sys.stdout.encoding or 'utf-8', 'replace'))
sys.stdout.flush()
# When we output a command, we get an answer "200 result=1" on stdin
# Purge stdin to avoid these Asterisk error messages :
# utils.c ast_carefulwrite: write() returned error: Broken pipe
sys.stdin.readline()
return True
def stderr_write(string):
'''Wrapper on sys.stderr.write'''
sys.stderr.write(string.encode(sys.stdout.encoding or 'utf-8', 'replace'))
sys.stdout.flush()
return True
def geolocate_phone_number(number, my_country_code, lang): def geolocate_phone_number(number, my_country_code, lang):
import phonenumbers import phonenumbers
import phonenumbers.geocoder
import phonenumbers.geocoder # Takes an enormous amount of time...
res = '' res = ''
phonenum = phonenumbers.parse(number, my_country_code.upper()) phonenum = phonenumbers.parse(number, my_country_code.upper())
city = phonenumbers.geocoder.description_for_number(phonenum, lang.lower()) city = phonenumbers.geocoder.description_for_number(phonenum, lang.lower())
@ -237,69 +220,44 @@ def main(options, arguments):
# print 'options = %s' % options # print 'options = %s' % options
# print 'arguments = %s' % arguments # print 'arguments = %s' % arguments
# AGI passes parameters to the script on standard input
stdinput = {}
while 1:
input_line = sys.stdin.readline()
if not input_line:
break
line = input_line.strip()
try:
variable, value = line.split(':')
except:
break
if variable[:4] != 'agi_': # All AGI parameters start with 'agi_'
stderr_write("bad stdin variable : %s\n" % variable)
continue
variable = variable.strip()
value = value.strip()
if variable and value:
stdinput[variable] = value
stderr_write("full AGI environnement :\n")
for variable in stdinput.keys():
stderr_write("%s = %s\n" % (variable, stdinput.get(variable)))
agi = agilib.AGI()
if options.outgoing: if options.outgoing:
phone_number = stdinput.get('agi_%s' % options.outgoing_agi_var)
stdout_write('VERBOSE "Dialed phone number is %s"\n' % phone_number)
phone_number = agi.env['agi_%s' % options.outgoing_agi_var]
agi.verbose("Dialed phone number is %s" % phone_number)
else: else:
# If we already have a "True" caller ID name # If we already have a "True" caller ID name
# i.e. not just digits, but a real name, then we don't try to # i.e. not just digits, but a real name, then we don't try to
# connect to Odoo or geoloc, we just keep it # connect to Odoo or geoloc, we just keep it
if ( if (
stdinput.get('agi_calleridname') and
not stdinput.get('agi_calleridname').isdigit() and
stdinput.get('agi_calleridname').lower()
agi.env.get('agi_calleridname') and
not agi.env['agi_calleridname'].isdigit() and
agi.env['agi_calleridname'].lower()
not in ['asterisk', 'unknown', 'anonymous'] and not in ['asterisk', 'unknown', 'anonymous'] and
not options.notify): not options.notify):
stdout_write(
'VERBOSE "Incoming CallerID name is %s"\n'
% stdinput.get('agi_calleridname'))
stdout_write(
'VERBOSE "As it is a real name, we do not change it"\n')
agi.verbose(
"Incoming CallerID name is %s"
% agi.env['agi_calleridname'])
agi.verbose("As it is a real name, we do not change it")
return True return True
phone_number = stdinput.get('agi_callerid')
stderr_write('stdout encoding = %s\n' % sys.stdout.encoding or 'utf-8')
phone_number = agi.env['agi_callerid']
if not isinstance(phone_number, str): if not isinstance(phone_number, str):
stdout_write('VERBOSE "Phone number is empty"\n')
agi.verbose("Phone number is empty")
exit(0) exit(0)
# Match for particular cases and anonymous phone calls # Match for particular cases and anonymous phone calls
# To test anonymous call in France, dial 3651 + number # To test anonymous call in France, dial 3651 + number
if not phone_number.isdigit(): if not phone_number.isdigit():
stdout_write(
'VERBOSE "Phone number (%s) is not a digit"\n' % phone_number)
agi.verbose("Phone number (%s) is not a digit" % phone_number)
exit(0) exit(0)
stdout_write('VERBOSE "Phone number = %s"\n' % phone_number)
agi.verbose("Phone number = %s" % phone_number)
if options.notify and not arguments: if options.notify and not arguments:
stdout_write(
'VERBOSE "When using the notify option, you must give arguments '
'to the script"\n')
agi.verbose(
"When using the notify option, you must give arguments "
"to the script")
exit(0) exit(0)
if options.notify: if options.notify:
@ -312,9 +270,8 @@ def main(options, arguments):
if options.server and options.jsonrpc: if options.server and options.jsonrpc:
import odoorpc import odoorpc
proto = options.ssl and 'jsonrpc+ssl' or 'jsonrpc' proto = options.ssl and 'jsonrpc+ssl' or 'jsonrpc'
stdout_write(
'VERBOSE "Starting %s request on Odoo %s:%d database '
'%s username %s"\n' % (
agi.verbose(
"Starting %s request on Odoo %s:%d database %s username %s" % (
proto.upper(), options.server, options.port, options.database, proto.upper(), options.server, options.port, options.database,
options.username)) options.username))
try: try:
@ -325,15 +282,14 @@ def main(options, arguments):
'phone.common', method, phone_number, arguments) 'phone.common', method, phone_number, arguments)
else: else:
res = odoo.execute('phone.common', method, phone_number) res = odoo.execute('phone.common', method, phone_number)
stdout_write('VERBOSE "Called method %s"\n' % method)
agi.verbose("Called method %s" % method)
except: except:
stdout_write(
'VERBOSE "Could not connect to Odoo in JSON-RPC"\n')
agi.verbose("Could not connect to Odoo in JSON-RPC")
elif options.server: elif options.server:
proto = options.ssl and 'https' or 'http' proto = options.ssl and 'https' or 'http'
stdout_write(
'VERBOSE "Starting %s XML-RPC request on Odoo %s:%d '
'database %s user ID %d"\n' % (
agi.verbose(
"Starting %s XML-RPC request on Odoo %s:%d "
"database %s user ID %d" % (
proto, options.server, options.port, options.database, proto, options.server, options.port, options.database,
options.userid)) options.userid))
sock = xmlrpclib.ServerProxy( sock = xmlrpclib.ServerProxy(
@ -348,9 +304,9 @@ def main(options, arguments):
res = sock.execute( res = sock.execute(
options.database, options.userid, options.password, options.database, options.userid, options.password,
'phone.common', method, phone_number) 'phone.common', method, phone_number)
stdout_write('VERBOSE "Called method %s"\n' % method)
agi.verbose("Called method %s" % method)
except: except:
stdout_write('VERBOSE "Could not connect to Odoo in XML-RPC"\n')
agi.verbose("Could not connect to Odoo in XML-RPC")
# To simulate a long execution of the XML-RPC request # To simulate a long execution of the XML-RPC request
# import time # import time
# time.sleep(5) # time.sleep(5)
@ -361,15 +317,15 @@ def main(options, arguments):
res = res[0:options.max_size] res = res[0:options.max_size]
elif options.geoloc: elif options.geoloc:
# if the number is not found in Odoo, we try to geolocate # if the number is not found in Odoo, we try to geolocate
stdout_write(
'VERBOSE "Trying to geolocate with country %s and lang %s"\n'
agi.verbose(
"Trying to geolocate with country %s and lang %s"
% (options.country, options.lang)) % (options.country, options.lang))
res = geolocate_phone_number( res = geolocate_phone_number(
phone_number, options.country, options.lang) phone_number, options.country, options.lang)
else: else:
# if the number is not found in Odoo and geoloc is off, # if the number is not found in Odoo and geoloc is off,
# we put 'not_found_name' as Name # we put 'not_found_name' as Name
stdout_write('VERBOSE "Phone number not found in Odoo"\n')
agi.verbose("Phone number not found in Odoo")
res = not_found_name res = not_found_name
# All SIP phones should support UTF-8... # All SIP phones should support UTF-8...
@ -378,12 +334,12 @@ def main(options, arguments):
if options.ascii: if options.ascii:
res = convert_to_ascii(res) res = convert_to_ascii(res)
stdout_write('VERBOSE "Name = %s"\n' % res)
agi.verbose("Name = %s" % res)
if res: if res:
if options.outgoing: if options.outgoing:
stdout_write('SET VARIABLE connectedlinename "%s"\n' % res)
agi.set_variable('connectedlinename', res)
else: else:
stdout_write('SET CALLERID "%s"<%s>\n' % (res, phone_number))
agi.set_callerid('"%s"<%s>' % (res, phone_number))
return True return True

Loading…
Cancel
Save