Commit 9a648dd3 authored by okhin's avatar okhin
Browse files

Improving the way everything is running

parent 25653f44
Pipeline #24 skipped
......@@ -8,17 +8,40 @@ import logging
import concurrent.futures
import asyncio
import re
from wsgiref.simple_server import make_server
import configparser
import argparse
from operator import itemgetter
from bottle import request, abort, Bottle, ServerAdapter, JSONPlugin
from bottle_sqlite import SQLitePlugin
import jwt
import websockets
import requests
from bottle import request, abort, Bottle, JSONPlugin
from bottle_sqlite import SQLitePlugin
config = ConfigParser()
application = app = Bottle(autojson=False)
app.install(SQLitePlugin(dbfile='call.db'))
app.install(JSONPlugin(json_dumps=lambda s: json.dumps(s, cls=PiphoneJSONEncoder)))
threads = concurrent.futures.ThreadPoolExecutor()
loop = asyncio.get_event_loop()
db = sqlite3.connect('call.db')
logging.basicConfig(filename='app.log', level=logging.DEBUG)
app = Bottle(autojson=False)
running = False
ws = None
debug = True
# Loggers
handler = logging.FileHandler('app.log')
phone_logger = logging.getLogger('piphone')
phone_logger.addHandler(handler)
phone_logger.setLevel(logging.DEBUG)
ws_logger = logging.getLogger('asterisk')
ws_logger.addHandler(handler)
ws_logger.setLevel(logging.DEBUG)
bottle_logger = logging.getLogger('bottle')
bottle_logger.addHandler(handler)
bottle_logger.setLevel(logging.DEBUG)
class PiphoneJSONEncoder(json.JSONEncoder):
def default(self, obj):
......@@ -35,6 +58,56 @@ class PiphoneJSONEncoder(json.JSONEncoder):
else:
return json.JSONEncoder.default(self, obj)
def authenticated(f):
'''
We need a decorator to check if our query is authenticated.
We will store an API key and SECRET in ur database, the client
needs to have both of them.
He must then send us a JWT token with an API claim in the payload.
The JWT token must be encoded and signed with the SECRET. If the
token is bad, we return a 403.
'''
def wrapped(db, *args, **kwargs):
# Let's get the JWT token. It should be a params (from get or post or whatev')
bottle_logger.debug("Authentication: {}".format([':'.join([key, request.params[key]]) for key in request.params],))
if 'token' not in request.params:
bottle_logger.error("No token found in the params")
abort(401, "No token found in the query")
# We want the api id in the params to.
if 'api' not in request.params:
bottle_logger.error("No api id found in the params")
abort(401, "No api id found in the params")
# Now, let's get the token on our side
try:
results = db.execute('SELECT token FROM users WHERE api = ?', (request.params['api'],)).fetchall()
assert len(results) == 1
token = results[0][0]
auth_token = jwt.decode(request.params['token'], token)
assert auth_token['api'] == request.params['api']
for key in auth_token:
request.params[key] = auth_token[key]
except (jwt.exceptions.InvalidTokenError, AssertionError) as e:
bottle_logger.error("Access refused")
bottle_logger.exception(e)
abort(403, e)
except Exception as e:
abort(500, e)
return f(db, *args, **kwargs)
return wrapped
def connected(f):
'''
This is a decorator used to check if we are connected to a websocket or not. If we're not
We're returning 500 error.
'''
def wrapped(db, *args, **kwargs):
if isinstance(ws, websockets.client.WebSocketClientProtocol):
if ws.open == True:
return f(db, *args, **kwargs)
ws_logger.error("Websocket connection is closed")
abort(500, "Websocket isn't running")
return wrapped
def sanitize_phonenumber(number):
"""
This function is used to sanitize a phone number.
......@@ -49,14 +122,72 @@ def sanitize_phonenumber(number):
raise TypeError('{} is not a valid international number, it should start with 00')
return number
async def hold_bridge():
'''
We will create a bridge that will be used to park caller waiting for callee to join.
It will play a song in a loop.
'''
payload = {}
payload['app'] = 'piphone'
payload['api_key'] = 'piphone:passpiphone'
payload['mode'] = 'holding'
try:
r = requests.post('http://185.34.33.12:8088/ari/bridges/piphone-moh', data=payload)
r.raise_for_status()
payload['mohClass'] = 'piphone'
r = requests.post('http://185.34.33.12:8088/ari/bridges/piphone-moh/moh', data=payload)
r.raise_for_status()
except Exception as e:
phone_logger.error("Cannot start 'on hold' bridge")
phone_logger.exception(e)
raise e
async def listen():
'''
Start listening on the websocket
'''
global running
global ws
ws = await websockets.connect('ws://185.34.33.12:8088/ari/events?app=piphone&api_key=piphone:passpiphone')
ws_logger.debug('Websocket connected: {}'.format(type(ws)))
await hold_bridges()
while running == True:
try:
event = await ws.recv()
# Let's call the applications function
await dispatch(json.loads(event))
except websockets.exceptions.ConnectionClosed as e:
ws_logger.warning("Connexion closed")
ws_logger.exception(e)
running = False
ws.close()
return
except Exception as e:
ws_logger.exception(e)
continue
ws.close()
async def dispatch(self, event):
"""
Let's work on our events. Parse them and do request on the ARI API. Event is
a dict loaded from JSON.
"""
ws_logger.debug('Event received: {}'.format(event,))
# Let's get the call ID, the call id isthe channel id minus the last -part.
if 'channel' not in event:
return
call_id = re.sub('-\d+$', '', event['channel']['id'])
call = Call.load(call_id, self.db)
call.event_handler(event)
class Call(object):
"""
This Class is used to manage operatiosn on a call, to print it and dump it.
"""
history = []
actions = {'Created': 'call_caller'
, 'ChannelStateChange': 'change'
, 'ChannelDtmfReceived': 'dtmf'}
, 'ChannelStateChange': 'change'
, 'ChannelDtmfReceived': 'dtmf'}
def __init__(self, caller, callee, owner, callid=None, db=None):
try:
......@@ -70,7 +201,7 @@ class Call(object):
else:
self.id = callid
except Exception as e:
logging.exception(e)
phone_logger.exception(e)
raise e
def url(self):
......@@ -82,7 +213,7 @@ class Call(object):
@classmethod
def load(cls, callid, db):
logging.debug("Loading call {}".format(callid,))
phone_logger.debug("Loading call {}".format(callid,))
try:
results = db.execute('SELECT caller, callee, owner, callid, history FROM calls WHERE callid = ?;', (callid,))
result = results.fetchone()
......@@ -92,14 +223,14 @@ class Call(object):
object.db = db
return object
except Exception as e:
logging.exception(e)
phone_logger.exception(e)
raise e
def update(self, new_state):
'''
Let's update the state of the call. new_state is a tuple in the form (newstate, timestamp,)
'''
logging.debug("Got a new state: {}".format(new_state,))
phone_logger.debug("Got a new state: {}".format(new_state,))
self.history.append(new_state)
self.save()
......@@ -127,17 +258,17 @@ class Call(object):
payload = {}
payload['app'] = 'piphone'
payload['api_key'] = ':'.join([login_ari, token_ari])
logging.debug('Stopping the playback currently running'.format(payload,))
phone_logger.debug('Stopping the playback currently running'.format(payload,))
# We're stoping the playback, with the same ID as the channel, to keep track of it
r = requests.delete('http://185.34.33.12:8088/ari/playbacks/{0}'.format(event['channel']['id']), data=payload)
# Now we're moving the channel to the MOH Bridge
payload['channel'] = event['channel']['id']
logging.debug('Moving call {} to the garage'.format(payload['channel']))
phone_logger.debug('Moving call {} to the garage'.format(payload['channel']))
r = requests.post('http://185.34.33.12:8088/ari/bridges/piphone-moh/addChannel', data=payload)
# Next we need to originate a call to the other side
logging.info('Will now connect {} to {}'.format(self.caller, self.callee,))
phone_logger.info('Will now connect {} to {}'.format(self.caller, self.callee,))
payload['endpoint'] = 'SIP/' + sanitize_phonenumber(self.callee) + '@forfait-kwaoo'
logging.debug('Preparing to send a request to the ARI with payload {}'.format(payload,))
phone_logger.debug('Preparing to send a request to the ARI with payload {}'.format(payload,))
r = requests.post('http://185.34.33.12:8088/ari/channels/{}-{}'.format(self.id, sanitize_phonenumber(self.callee),), data=payload)
except AssertionError as e:
logging.error("Received a DTMF sequence out le being in a '{}' state, ignoring: {}".format(self.state(), event['digit']))
......@@ -147,7 +278,7 @@ class Call(object):
Let's change the state of the call
'''
self.update((':'.join([event['channel']['state'], event['channel']['id'].split('-')[-1]]), event['timestamp'],))
logging.info("New state for call {}: {}".format(event['channel']['id'], event['channel']['state']))
phone_logger.info("New state for call {}: {}".format(event['channel']['id'], event['channel']['state']))
# We now need to take action according to our new state
results = self.db.execute('SELECT login_ari, pass_ari FROM users WHERE api = ?', (self.owner,))
login_ari, token_ari = results.fetchone()
......@@ -159,13 +290,13 @@ class Call(object):
payload['app'] = 'piphone'
payload['api_key'] = ':'.join([login_ari, token_ari])
payload['type'] = 'mixing'
logging.debug("Creating a bridges to connect {} to {}".format(self.caller, self.callee,))
phone_logger.debug("Creating a bridges to connect {} to {}".format(self.caller, self.callee,))
r = requests.post('http://185.34.33.12:8088/ari/bridges/{}'.format(self.id), data=payload)
# Step 2, moving channels
payload['channel'] = ",".join([self.id + '-' + self.caller, self.id + '-' + self.callee])
logging.debug("Moving channels to the created bridge: {}".format(payload['channel']))
phone_logger.debug("Moving channels to the created bridge: {}".format(payload['channel']))
r = requests.post('http://185.34.33.12:8088/ari/bridges/{}/addChannel'.format(self.id), data=payload)
logging.info("Call now fully connected: {} <-> {}".format(self.caller, self.callee))
phone_logger.info("Call now fully connected: {} <-> {}".format(self.caller, self.callee))
return
# Call is being picked up, we want to play a song
payload = {}
......@@ -220,207 +351,95 @@ class Call(object):
'''
Save the Call to database.
'''
logging.debug("Saving call {}: {}".format(self.id, json.dumps(self, cls=PiphoneJSONEncoder)))
phone_logger.debug("Saving call {}: {}".format(self.id, json.dumps(self, cls=PiphoneJSONEncoder)))
try:
self.db.execute('''INSERT OR REPLACE INTO calls (caller, callee, owner, callid, history)
VALUES (?, ?, ?, ?, ?) '''
, (self.caller, self.callee, self.owner, self.id, json.dumps(self.history)))
self.db.commit()
except Exception as e:
logging.exception(e)
phone_logger.exception(e)
raise e
def authenticated(f):
'''
We need a decorator to check if our query is authenticated.
We will store an API key and SECRET in ur database, the client
needs to have both of them.
He must then send us a JWT token with an API claim in the payload.
The JWT token must be encoded and signed with the SECRET. If the
token is bad, we return a 403.
'''
def wrapped(db, *args, **kwargs):
# Let's get the JWT token. It should be a params (from get or post or whatev')
logging.debug("Authetication: {}".format([':'.join([key, request.params[key]]) for key in request.params],))
if 'token' not in request.params:
logging.error("No token found in the params")
abort(401, "No token found in the query")
# We want the api id in the params to.
if 'api' not in request.params:
logging.error("No api id found in the params")
abort(401, "No api id found in the params")
# Now, let's get the token on our side
try:
results = db.execute('SELECT token FROM users WHERE api = ?', (request.params['api'],)).fetchall()
assert len(results) == 1
token = results[0][0]
auth_token = jwt.decode(request.params['token'], token)
assert auth_token['api'] == request.params['api']
for key in auth_token:
request.params[key] = auth_token[key]
except (jwt.exceptions.InvalidTokenError, AssertionError) as e:
logging.error("Access refused")
logging.exception(e)
abort(403, e)
except Exception as e:
abort(500, e)
return f(db, *args, **kwargs)
return wrapped
class Server(ServerAdapter):
"""
This class manage all that is needed to run the REST App of the piphone. It is in charge of
launching an Executor, start the event loop in its own thread and run the bottle app.
"""
server = None
def stop():
global running
running = False
options = None
callbacks = {}
def __init__(self, debug=False, *args, **kwargs):
"""
We're starting the logger subsytem, create the Thread Pool and stuff
"""
self.logger = logging.getLogger('server')
if debug:
self.logger.setLevel(logging.DEBUG)
self.logger.addHandler(logging.FileHandler('app.log'))
self.threads = concurrent.futures.ThreadPoolExecutor()
self.loop = asyncio.get_event_loop()
self.db = sqlite3.connect('call.db')
super(Server, self).__init__(*args, **kwargs)
async def __listen(self):
ws_logger = logging.getLogger('websockets')
ws_logger.setLevel(logging.DEBUG)
ws_logger.addHandler(logging.FileHandler('app.log'))
async with websockets.connect('ws://185.34.33.12:8088/ari/events?app=piphone&api_key=piphone:passpiphone') as websocket:
await self.hold_bridges()
while self.running == True:
ws_logger.info("Waiting for events on websocket")
event = await websocket.recv()
# Let's call the applications function
await self.dispatch(json.loads(event))
async def dispatch(self, event):
"""
Let's work on our events. Parse them and do request on the ARI API. Event is
a dict loaded from JSON.
"""
self.logger.debug('Event received: {}'.format(event,))
# Let's get the call ID, the call id isthe channel id minus the last -part.
if 'channel' not in event:
self.logger.debug('Not a channel event, skip')
return
call_id = re.sub('-\d+$', '', event['channel']['id'])
call = Call.load(call_id, self.db)
call.event_handler(event)
async def hold_bridges(self):
'''
We will create a bridge that will be used to park caller waiting for callee to join.
It will play a song in a loop.
'''
payload = {}
payload['app'] = 'piphone'
payload['api_key'] = 'piphone:passpiphone'
payload['mode'] = 'holding'
try:
r = requests.post('http://185.34.33.12:8088/ari/bridges/piphone-moh', data=payload)
r.raise_for_status()
payload['mohClass'] = 'piphone'
r = requests.post('http://185.34.33.12:8088/ari/bridges/piphone-moh/moh', data=payload)
r.raise_for_status()
except Exception as e:
logging.error("Cannot start MOH Bridges")
logging.exception(e)
raise e
ws.close()
loop.close()
threads.shutdown(wait=False)
def run(self, handler):
"""
We're starting the REST application, and the launch applications
"""
self.running = True
self.server = make_server('127.0.0.1', 8080, handler, **self.options)
self.threads.submit(self.server.serve_forever)
self.loop.run_until_complete(self.__listen())
def stop(self):
self.running = False
self.loop.close()
self.server.shutdown()
self.threads.shutdown(wait=False)
@app.get('/calls/<callid>')
@app.get('/calls/')
@authenticated
@connected
def calls(db, callid=None):
"""
Return the list of calls associated to the connected user.
The call has a status, caller, callee and history (status change+timestamp)
"""
logging.debug("GET {}".format(request.fullpath))
bottle_logger.debug("GET {}".format(request.fullpath))
if callid == None:
try:
results = db.execute('SELECT callid FROM calls WHERE owner = ?;', (request.params['api'],))
calls = []
for row in results.fetchall():
call = Call.load(row[0], db)
logging.debug("Call fetched: {}".format(json.dumps(call, cls=PiphoneJSONEncoder)))
bottle_logger.debug("Call fetched: {}".format(json.dumps(call, cls=PiphoneJSONEncoder)))
calls.append(call)
head = {'call': request.fullpath, 'user': request.params['api'], 'hits': len(calls)}
return {'head': head, 'data': calls}
except Exception as e:
logging.exception(e)
bottle_logger.exception(e)
abort(500, "Exception")
# We first need to check if we can access the callid we asked for
try:
results = db.execute('SELECT callid FROM calls WHERE owner = ? AND callid = ? ;'
, (request.params['api'], callid,))
rows = results.fetchall()
logging.debug("Found {} results: {}".format(len(rows), rows))
bottle_logger.debug("Found {} results: {}".format(len(rows), rows))
assert len(rows) == 1
call = Call.load(callid, db)
head = {'call': call.url(), 'user': request.params['api'], 'hits': 1}
return {'head': head, 'data': call}
except AssertionError as e:
logging.debug("Not exactly one results found, this is an issue")
logging.error("Unauthorized access to call {} from user {}".format(callid, request.params['api']))
bottle_logger.debug("Not exactly one results found, this is an issue")
bottle_logger.error("Unauthorized access to call {} from user {}".format(callid, request.params['api']))
abort(403, "You do not have the authorization to get this call")
except Exception as e:
logging.debug("Exception catched: {}".format(e,))
logging.error("Call not found {}".format(callid,))
bottle_logger.debug("Exception catched: {}".format(e,))
bottle_logger.error("Call not found {}".format(callid,))
abort(404, "This call does not exist")
@app.post('/calls/')
@app.post('/calls/<callid>')
@authenticated
@connected
def originate(db, callid=None):
logging.debug("POST {}".format(request.fullpath))
bottle_logger.debug("POST {}".format(request.fullpath))
try:
if callid is not None:
call = Call(request.params['caller'], request.params['callee'], request.params['api'], callid=request.params['callid'], db=db)
else:
call = Call(request.params['caller'], request.params['callee'], request.params['api'], db=db)
logging.debug("Originate a call: {}".format(json.dumps(call, cls=PiphoneJSONEncoder)))
bottle_logger.debug("Originate a call: {}".format(json.dumps(call, cls=PiphoneJSONEncoder)))
head = {'call': call.url()
, 'user': request.params['api']
, 'hits': 1}
return {'header': head, 'data': call}
except Exception as e:
logging.debug("Missing params : {}".format([p for p in request.params],))
logging.exception(e)
bottle_logger.debug("Missing params : {}".format([p for p in request.params],))
bottle_logger.exception(e)
abort(400, "Missing or incorrect fields, the call cannot be processed")
if __name__ == '__main__':
app.install(SQLitePlugin(dbfile='call.db'))
app.install(JSONPlugin(json_dumps=lambda s: json.dumps(s, cls=PiphoneJSONEncoder)))
server=Server()
global config
arg_parser = argparse.ArgumentParser(description='Manage the SIP Backend for the piphone')
arg_parser.add_argument('-c', '--config', help="Config file")
arg_parser.parse_args()
config = ConfigParser(arg_parser.config)
try:
app.run(server=server)
except Exception as e:
logging.exception(e)
server.stop()
raise e
running = True
threads.submit(app.run)
loop.run_until_complete(listen())
except:
stop()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment