#!/usr/bin/env python
"""SIP backend of the piphone.

This module exposes a small Bottle HTTP API (``/calls/``) whose requests are
authenticated with a JWT token, and keeps a websocket open on the Asterisk
ARI event stream.  Events received on the websocket are dispatched to the
matching :class:`Call`, which drives the call through the ARI HTTP API and
records every state change in the ``calls`` table of ``call.db``.
"""
import sqlite3
import datetime
import uuid
import json
import logging
import concurrent.futures
import asyncio
import functools
import re
import configparser
import argparse
from operator import itemgetter

import jwt
import websockets
import requests
from bottle import request, abort, Bottle, JSONPlugin
from bottle_sqlite import SQLitePlugin

# Base URL of the ARI HTTP API and URL of its event websocket.
ARI_URL = 'http://185.34.33.12:8088/ari'
ARI_WS_URL = 'ws://185.34.33.12:8088/ari/events?app=piphone&api_key=piphone:passpiphone'

# The token is signed with a shared secret, so only HMAC algorithms are
# accepted when decoding it.
JWT_ALGORITHMS = ['HS256', 'HS384', 'HS512']

config = configparser.ConfigParser()
application = app = Bottle(autojson=False)
app.install(SQLitePlugin(dbfile='call.db'))
# The lambda is only evaluated when a response is serialised, so it may
# reference PiphoneJSONEncoder before the class is defined below.
app.install(JSONPlugin(json_dumps=lambda s: json.dumps(s, cls=PiphoneJSONEncoder)))
threads = concurrent.futures.ThreadPoolExecutor()
loop = asyncio.get_event_loop()
# Connection used by the websocket side (see dispatch); HTTP handlers get
# their own connection from the SQLitePlugin.
db = sqlite3.connect('call.db')
running = False
ws = None
debug = True

# Loggers
handler = logging.FileHandler('app.log')
phone_logger = logging.getLogger('piphone')
phone_logger.addHandler(handler)
phone_logger.setLevel(logging.DEBUG)
ws_logger = logging.getLogger('asterisk')
ws_logger.addHandler(handler)
ws_logger.setLevel(logging.DEBUG)
bottle_logger = logging.getLogger('bottle')
bottle_logger.addHandler(handler)
bottle_logger.setLevel(logging.DEBUG)


class PiphoneJSONEncoder(json.JSONEncoder):
    """JSON encoder that knows how to serialise a :class:`Call`."""

    def default(self, obj):
        """Return a JSON-serialisable dict for a Call, defer otherwise."""
        if isinstance(obj, Call):
            return {
                'caller': obj.caller,
                'callee': obj.callee,
                'callid': obj.id,
                'url': obj.url(),
                'history': obj.history,
                'owner': obj.owner,
            }
        return super().default(obj)


class _AuthenticationError(Exception):
    """Raised inside ``authenticated`` when the credentials do not match."""


def authenticated(f):
    """Decorator checking that the current request is authenticated.

    An API key and a secret are stored in the ``users`` table.  The client
    must send its API key as the ``api`` param and a JWT as the ``token``
    param.  The JWT must be signed with the secret and carry an ``api`` claim
    equal to the ``api`` param.  On success every claim of the token is
    copied into ``request.params`` before the wrapped handler runs.

    Aborts with 401 when ``token`` or ``api`` is missing, 403 when the token
    is invalid or does not match, and 500 on any other failure.
    """
    @functools.wraps(f)
    def wrapped(db, *args, **kwargs):
        # Only the parameter names are logged: the values include the token,
        # which is a credential and must not end up in the log file.
        bottle_logger.debug("Authentication: {}".format(list(request.params.keys())))
        if 'token' not in request.params:
            bottle_logger.error("No token found in the params")
            abort(401, "No token found in the query")
        # We want the api id in the params too.
        if 'api' not in request.params:
            bottle_logger.error("No api id found in the params")
            abort(401, "No api id found in the params")
        # Now, let's get the secret on our side and check the token with it.
        try:
            rows = db.execute('SELECT token FROM users WHERE api = ?',
                              (request.params['api'],)).fetchall()
            # Explicit checks instead of assert: asserts vanish under -O.
            if len(rows) != 1:
                raise _AuthenticationError("Unknown api id")
            secret = rows[0][0]
            claims = jwt.decode(request.params['token'], secret,
                                algorithms=JWT_ALGORITHMS)
            if claims.get('api') != request.params['api']:
                raise _AuthenticationError("The api claim does not match the api param")
            for key in claims:
                request.params[key] = claims[key]
        except (jwt.exceptions.InvalidTokenError, _AuthenticationError) as e:
            bottle_logger.error("Access refused")
            bottle_logger.exception(e)
            abort(403, str(e))
        except Exception as e:
            bottle_logger.exception(e)
            abort(500, str(e))
        return f(db, *args, **kwargs)
    return wrapped


def connected(f):
    """Decorator checking that the ARI websocket is connected.

    Aborts with a 500 error when the module-level websocket is missing or
    closed, otherwise calls the wrapped handler.
    """
    @functools.wraps(f)
    def wrapped(db, *args, **kwargs):
        if isinstance(ws, websockets.client.WebSocketClientProtocol) and ws.open:
            return f(db, *args, **kwargs)
        ws_logger.error("Websocket connection is closed")
        abort(500, "Websocket isn't running")
    return wrapped


def sanitize_phonenumber(number):
    """Return ``number`` as a digits-only international number.

    A leading ``+`` is replaced by ``00`` and every character outside
    ``[0-9]`` is stripped.

    Raises:
        TypeError: if ``number`` is not a string, or if the sanitized number
            does not start with ``00`` (this includes the empty string).
    """
    if not isinstance(number, str):
        raise TypeError('{!r} is not a valid international number, it should start with 00'.format(number))
    digits = number
    if digits.startswith('+'):
        digits = '00' + digits[1:]
    digits = ''.join([c for c in digits if c in '0123456789'])
    if not digits.startswith('00'):
        raise TypeError('{} is not a valid international number, it should start with 00'.format(number))
    return digits


async def hold_bridge():
    """Create the bridge used to park callers waiting for the callee.

    The bridge is created in ``holding`` mode and music on hold is started
    on it.  Any failure is logged and re-raised.
    """
    payload = {}
    payload['app'] = 'piphone'
    payload['api_key'] = 'piphone:passpiphone'
    payload['mode'] = 'holding'
    try:
        r = requests.post(ARI_URL + '/bridges/piphone-moh', data=payload)
        r.raise_for_status()
        payload['mohClass'] = 'piphone'
        r = requests.post(ARI_URL + '/bridges/piphone-moh/moh', data=payload)
        r.raise_for_status()
    except Exception as e:
        phone_logger.error("Cannot start 'on hold' bridge")
        phone_logger.exception(e)
        raise


async def listen():
    """Connect to the ARI websocket and dispatch events while ``running``.

    The loop stops when ``running`` becomes false or when the connection is
    closed; any other error raised while handling an event is logged and the
    loop carries on with the next event.
    """
    global running
    global ws
    ws = await websockets.connect(ARI_WS_URL)
    ws_logger.debug('Websocket connected: {}'.format(type(ws)))
    await hold_bridge()
    while running:
        try:
            event = await ws.recv()
            # Let's call the applications function
            await dispatch(json.loads(event))
        except websockets.exceptions.ConnectionClosed as e:
            ws_logger.warning("Connexion closed")
            ws_logger.exception(e)
            running = False
            await ws.close()
            return
        except Exception as e:
            ws_logger.exception(e)
            continue
    await ws.close()


async def dispatch(event):
    """Hand an ARI event over to the call it belongs to.

    ``event`` is a dict loaded from JSON.  Events without a ``channel`` are
    ignored.  The call is loaded through the module-level ``db`` connection.
    """
    ws_logger.debug('Event received: {}'.format(event,))
    if 'channel' not in event:
        return
    # The call id is the channel id minus its last "-<digits>" part.
    call_id = re.sub(r'-\d+$', '', event['channel']['id'])
    call = Call.load(call_id, db)
    call.event_handler(event)


class Call(object):
    """Manage the operations on a call, and allow to print and dump it.

    ``history`` is a list of ``(state, timestamp)`` entries; ``actions`` maps
    an event type to the name of the method handling it.
    """
    actions = {
        'Created': 'call_caller',
        'ChannelStateChange': 'change',
        'ChannelDtmfReceived': 'dtmf',
    }

    def __init__(self, caller, callee, owner, callid=None, db=None):
        """Build a call; a call created without ``callid`` is started at once.

        When ``callid`` is None a new id is generated and a ``Created`` event
        is handled immediately, which saves the call and originates the
        channel to the caller.  Errors are logged and re-raised.
        """
        try:
            self.caller = caller
            self.callee = callee
            self.owner = owner
            self.db = db
            # Per-instance list: a class-level list would be shared (and
            # appended to) by every call.
            self.history = []
            if callid is None:
                self.id = str(uuid.uuid4())
                self.event_handler({'type': 'Created',
                                    'timestamp': datetime.datetime.now().isoformat(),
                                    'channel': {'id': 'Init'}})
            else:
                self.id = callid
        except Exception as e:
            phone_logger.exception(e)
            raise

    def url(self):
        """Return the path under which this call is exposed."""
        return ''.join(['/calls/', self.id])

    def state(self):
        """Return the state of the history entry with the latest timestamp."""
        return max(self.history, key=itemgetter(1))[0]

    @classmethod
    def load(cls, callid, db):
        """Load the call ``callid`` from the ``calls`` table.

        Raises:
            LookupError: if no such call is stored.  Other errors are logged
                and re-raised.
        """
        phone_logger.debug("Loading call {}".format(callid,))
        try:
            row = db.execute('SELECT caller, callee, owner, callid, history FROM calls WHERE callid = ?;',
                             (callid,)).fetchone()
            if row is None:
                raise LookupError("No call with id {}".format(callid))
            call = cls(row[0], row[1], row[2], row[3], db=db)
            call.history = json.loads(row[4])
            return call
        except Exception as e:
            phone_logger.exception(e)
            raise

    def update(self, new_state):
        """Append ``new_state`` to the history and save the call.

        ``new_state`` is a tuple in the form ``(newstate, timestamp,)``.
        """
        phone_logger.debug("Got a new state: {}".format(new_state,))
        self.history.append(new_state)
        self.save()

    def event_handler(self, event):
        """Run the action registered for the type of ``event``, if any."""
        state = event['type']
        if state in self.actions:
            getattr(self, self.actions[state])(event=event)

    def _ari_payload(self):
        """Return the base ARI payload (app and api_key) for the call owner."""
        results = self.db.execute('SELECT login_ari, pass_ari FROM users WHERE api = ?', (self.owner,))
        login_ari, token_ari = results.fetchone()
        return {'app': 'piphone', 'api_key': ':'.join([login_ari, token_ari])}

    def _channel_id(self, number):
        """Return the id of the channel this call uses to reach ``number``."""
        return '{}-{}'.format(self.id, sanitize_phonenumber(number))

    def dtmf(self, event):
        """Handle a DTMF digit: pressing 1 while ``Up`` calls the callee.

        The running playback is stopped, the channel is moved to the music
        on hold bridge, and a channel to the callee is originated.
        """
        state = self.state()
        if not state.startswith('Up'):
            phone_logger.error("Received a DTMF sequence while being in a '{}' state, ignoring: {}".format(state, event['digit']))
            return
        # The only thing we want to do is to call the callee if we press 1
        if event['digit'] != '1':
            return
        # Now, we're connecting the other side, but first we should stop
        # the playback.
        payload = self._ari_payload()
        phone_logger.debug('Stopping the playback currently running')
        # The playback has the same ID as the channel, to keep track of it
        requests.delete(ARI_URL + '/playbacks/{0}'.format(event['channel']['id']), data=payload)
        # Now we're moving the channel to the MOH bridge
        payload['channel'] = event['channel']['id']
        phone_logger.debug('Moving call {} to the garage'.format(payload['channel']))
        requests.post(ARI_URL + '/bridges/piphone-moh/addChannel', data=payload)
        # Next we need to originate a call to the other side
        phone_logger.info('Will now connect {} to {}'.format(self.caller, self.callee,))
        payload['endpoint'] = 'SIP/' + sanitize_phonenumber(self.callee) + '@forfait-kwaoo'
        phone_logger.debug('Preparing to send a request to the ARI with payload {}'.format(payload,))
        requests.post(ARI_URL + '/channels/' + self._channel_id(self.callee), data=payload)

    def change(self, event):
        """Record a channel state change and react when the channel is ``Up``.

        When the callee's channel goes up, a mixing bridge is created and
        both channels are moved into it.  When another channel of the call
        goes up, a sound is played on it.
        """
        channel = event['channel']
        self.update((':'.join([channel['state'], channel['id'].split('-')[-1]]), event['timestamp'],))
        phone_logger.info("New state for call {}: {}".format(channel['id'], channel['state']))
        # We now need to take action according to our new state
        payload = self._ari_payload()
        if channel['state'] != 'Up':
            return
        # Are we the caller or the callee?  Channels are created with
        # sanitized numbers, so the comparison must use them as well.
        try:
            callee_channel = self._channel_id(self.callee)
        except TypeError:
            # Invalid callee number: no callee channel can have been created.
            callee_channel = None
        if channel['id'] == callee_channel:
            # Step 1, create a bridge
            payload['type'] = 'mixing'
            phone_logger.debug("Creating a bridges to connect {} to {}".format(self.caller, self.callee,))
            requests.post(ARI_URL + '/bridges/{}'.format(self.id), data=payload)
            # Step 2, moving channels
            payload['channel'] = ",".join([self._channel_id(self.caller), callee_channel])
            phone_logger.debug("Moving channels to the created bridge: {}".format(payload['channel']))
            requests.post(ARI_URL + '/bridges/{}/addChannel'.format(self.id), data=payload)
            phone_logger.info("Call now fully connected: {} <-> {}".format(self.caller, self.callee))
            return
        # Call is being picked up, we want to play a song
        try:
            payload['media'] = 'sound:mario'
            payload['lang'] = 'en_US'
            phone_logger.debug('Preparing to send a request to the ARI with payload {}'.format(payload,))
            # The playback gets the same ID as the channel, to keep track of it
            requests.post(ARI_URL + '/channels/{0}/play/{0}'.format(channel['id']), data=payload)
            phone_logger.debug("Now playing a sound on channel {}".format(self.id))
        except Exception as e:
            phone_logger.exception(e)
            raise

    def call_caller(self, event):
        """Call the caller with a simple originate.

        The music on hold bridge is checked first, because callers are
        parked in it while waiting for the callee to pick up the phone.  The
        bridge connecting both sides is created later.

        Raises:
            requests.exceptions.HTTPError: if the hold bridge does not exist.
            TypeError: if the caller number is not a valid international one.
        """
        self.update((':'.join([event['type'], event['channel']['id'].split('-')[-1]]), event['timestamp'],))
        payload = self._ari_payload()
        # We want to check if the moh bridge is opened
        try:
            r = requests.get(ARI_URL + '/bridges/{}-moh'.format(payload['app']), data=payload)
            r.raise_for_status()
        except requests.exceptions.HTTPError:
            # The bridge does not exist
            phone_logger.error("The Hold bridges does not exists")
            raise
        # Now, let's create the channel
        try:
            payload['endpoint'] = 'SIP/' + sanitize_phonenumber(self.caller) + '@forfait-kwaoo'
            phone_logger.debug('Preparing to send a request to the ARI with payload {}'.format(payload,))
            requests.post(ARI_URL + '/channels/' + self._channel_id(self.caller), data=payload)
        except Exception as e:
            phone_logger.exception(e)
            raise

    def save(self):
        """Insert or replace the call in the ``calls`` table and commit."""
        phone_logger.debug("Saving call {}: {}".format(self.id, json.dumps(self, cls=PiphoneJSONEncoder)))
        try:
            self.db.execute('''INSERT OR REPLACE INTO calls (caller, callee, owner, callid, history)
                               VALUES (?, ?, ?, ?, ?)''',
                            (self.caller, self.callee, self.owner, self.id, json.dumps(self.history)))
            self.db.commit()
        except Exception as e:
            phone_logger.exception(e)
            raise


def stop():
    """Stop listening, close the websocket and the loop, release the pool.

    Must be called while the event loop is not running: closing the
    websocket is a coroutine which is run to completion on the loop.
    """
    global running
    running = False
    if not loop.is_closed() and not loop.is_running():
        if ws is not None:
            try:
                loop.run_until_complete(ws.close())
            except Exception as e:
                ws_logger.exception(e)
        loop.close()
    threads.shutdown(wait=False)


@app.get('/calls/')
@app.get('/calls/<callid>')
@authenticated
@connected
def calls(db, callid=None):
    """Return the calls of the authenticated user, or one of them.

    Without ``callid`` every call owned by the user is returned.  With a
    ``callid`` the call is returned if the user owns it; the request is
    aborted with 403 otherwise, and with 404 if the lookup fails.
    """
    bottle_logger.debug("GET {}".format(request.fullpath))
    if callid is None:
        try:
            results = db.execute('SELECT callid FROM calls WHERE owner = ?;', (request.params['api'],))
            found = []
            for row in results.fetchall():
                call = Call.load(row[0], db)
                bottle_logger.debug("Call fetched: {}".format(json.dumps(call, cls=PiphoneJSONEncoder)))
                found.append(call)
            head = {'call': request.fullpath, 'user': request.params['api'], 'hits': len(found)}
            return {'head': head, 'data': found}
        except Exception as e:
            bottle_logger.exception(e)
            abort(500, "Exception")
    # We first need to check if we can access the callid we asked for
    try:
        results = db.execute('SELECT callid FROM calls WHERE owner = ? AND callid = ?;',
                             (request.params['api'], callid,))
        rows = results.fetchall()
        bottle_logger.debug("Found {} results: {}".format(len(rows), rows))
        call = Call.load(callid, db) if len(rows) == 1 else None
    except Exception as e:
        bottle_logger.debug("Exception catched: {}".format(e,))
        bottle_logger.error("Call not found {}".format(callid,))
        abort(404, "This call does not exist")
    # The refusal is raised outside the try block so that the 403 cannot be
    # turned into a 404 by the generic handler above.
    if call is None:
        bottle_logger.debug("Not exactly one results found, this is an issue")
        bottle_logger.error("Unauthorized access to call {} from user {}".format(callid, request.params['api']))
        abort(403, "You do not have the authorization to get this call")
    head = {'call': call.url(), 'user': request.params['api'], 'hits': 1}
    return {'head': head, 'data': call}


@app.post('/calls/')
@app.post('/calls/<callid>')
@authenticated
@connected
def originate(db, callid=None):
    """Create a call between the ``caller`` and ``callee`` params.

    Aborts with 400 when a param is missing or the call cannot be created.
    """
    bottle_logger.debug("POST {}".format(request.fullpath))
    try:
        if callid is not None:
            # NOTE(review): the id is read from the params, not from the
            # path; confirm which of the two is meant to win.
            call = Call(request.params['caller'], request.params['callee'], request.params['api'],
                        callid=request.params['callid'], db=db)
        else:
            call = Call(request.params['caller'], request.params['callee'], request.params['api'], db=db)
        bottle_logger.debug("Originate a call: {}".format(json.dumps(call, cls=PiphoneJSONEncoder)))
        head = {'call': call.url(), 'user': request.params['api'], 'hits': 1}
        return {'header': head, 'data': call}
    except Exception as e:
        bottle_logger.debug("Missing params : {}".format([p for p in request.params],))
        bottle_logger.exception(e)
        abort(400, "Missing or incorrect fields, the call cannot be processed")


if __name__ == '__main__':
    arg_parser = argparse.ArgumentParser(description='Manage the SIP Backend for the piphone')
    arg_parser.add_argument('-c', '--config', help="Config file")
    cli_args = arg_parser.parse_args()
    if cli_args.config:
        config.read(cli_args.config)
    try:
        running = True
        threads.submit(app.run)
        loop.run_until_complete(listen())
    except KeyboardInterrupt:
        phone_logger.info("Interrupted, shutting down")
        stop()
    except Exception as e:
        # Log the reason instead of swallowing it silently.
        phone_logger.exception(e)
        stop()