app.py 18.2 KB
Newer Older
okhin's avatar
okhin committed
1
2
3
#!/usr/bin/env python

import sqlite3
okhin's avatar
okhin committed
4
5
6
import datetime
import uuid
import json
7
8
9
import logging
import concurrent.futures
import asyncio
10
import re
11
12
import configparser
import argparse
okhin's avatar
okhin committed
13
from operator import itemgetter
okhin's avatar
okhin committed
14
15

import jwt
16
17
import websockets
import requests
18
19
20
21
22
23
24
25
26
27
28
from bottle import request, abort, Bottle, JSONPlugin
from bottle_sqlite import SQLitePlugin

config = ConfigParser()
application = app = Bottle(autojson=False)
app.install(SQLitePlugin(dbfile='call.db'))
app.install(JSONPlugin(json_dumps=lambda s: json.dumps(s, cls=PiphoneJSONEncoder)))

threads = concurrent.futures.ThreadPoolExecutor()
loop = asyncio.get_event_loop()
db = sqlite3.connect('call.db')
okhin's avatar
okhin committed
29

30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
running = False
ws = None
debug = True

# Loggers
handler = logging.FileHandler('app.log')
phone_logger = logging.getLogger('piphone')
phone_logger.addHandler(handler)
phone_logger.setLevel(logging.DEBUG)
ws_logger = logging.getLogger('asterisk')
ws_logger.addHandler(handler)
ws_logger.setLevel(logging.DEBUG)
bottle_logger = logging.getLogger('bottle')
bottle_logger.addHandler(handler)
bottle_logger.setLevel(logging.DEBUG)
45
46
47
48
49
50
51
52
53
54
55
56
57
58

class PiphoneJSONEncoder(json.JSONEncoder):
    def default(self, obj):
        """
        We need to implement this to be able to JSONEncode
        """
        if isinstance(obj, Call):
            return {  'caller': obj.caller
                , 'callee': obj.callee
                , 'callid': obj.id
                , 'url': obj.url()
                , 'history': obj.history
                , 'owner': obj.owner }
        else:
59
            return json.JSONEncoder.default(self, obj)
okhin's avatar
okhin committed
60

61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
def authenticated(f):
    '''
    We need a decorator to check if our query is authenticated.
    We will store an API key and SECRET in ur database, the client
    needs to have both of them.
    He must then send us a JWT token with an API claim in the payload.
    The JWT token must be encoded and signed with the SECRET. If the
    token is bad, we return a 403.
    '''
    def wrapped(db, *args, **kwargs):
        # Let's get the JWT token. It should be a params (from get or post or whatev')
        bottle_logger.debug("Authentication: {}".format([':'.join([key, request.params[key]]) for key in request.params],))
        if 'token' not in request.params:
            bottle_logger.error("No token found in the params")
            abort(401, "No token found in the query")
        # We want the api id in the params to.
        if 'api' not in request.params:
            bottle_logger.error("No api id found in the params")
            abort(401, "No api id found in the params")
        # Now, let's get the token on our side
        try:
            results = db.execute('SELECT token FROM users WHERE api = ?', (request.params['api'],)).fetchall()
            assert len(results) == 1
            token = results[0][0]
            auth_token = jwt.decode(request.params['token'], token)
            assert auth_token['api'] == request.params['api']
            for key in auth_token:
                request.params[key] = auth_token[key]
        except (jwt.exceptions.InvalidTokenError, AssertionError) as e:
            bottle_logger.error("Access refused")
            bottle_logger.exception(e)
            abort(403, e)
        except Exception as e:
            abort(500, e)
        return f(db, *args, **kwargs)
    return wrapped

def connected(f):
    '''
    This is a decorator used to check if we are connected to a websocket or not. If we're not
    We're returning 500 error.
    '''
    def wrapped(db, *args, **kwargs):
        if isinstance(ws, websockets.client.WebSocketClientProtocol):
            if ws.open == True:
                return f(db, *args, **kwargs)
        ws_logger.error("Websocket connection is closed")
        abort(500, "Websocket isn't running")
    return wrapped

okhin's avatar
okhin committed
111
112
113
114
115
116
117
118
119
120
121
122
123
124
def sanitize_phonenumber(number):
    """
    This function is used to sanitize a phone number.
    If it starts with a +, it will be removed and replaced by 00.
    Any chars who do notbelong to [0-9] will be stripped.
    If the number doesn't starts with 00, a TypeError will be raised
    """
    if number[0] == '+':
        number = '00' + number[1:]
    number = ''.join([c for c in number if c in '0123456789'])
    if not number.startswith('00'):
        raise TypeError('{} is not a valid international number, it should start with 00')
    return number

125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
async def hold_bridge():
    '''
    We will create a bridge that will be used to park caller waiting for callee to join.
    It will play a song in a loop.
    '''
    payload = {}
    payload['app'] = 'piphone'
    payload['api_key'] = 'piphone:passpiphone'
    payload['mode'] = 'holding'
    try:
        r = requests.post('http://185.34.33.12:8088/ari/bridges/piphone-moh', data=payload)
        r.raise_for_status()
        payload['mohClass'] = 'piphone'
        r = requests.post('http://185.34.33.12:8088/ari/bridges/piphone-moh/moh', data=payload)
        r.raise_for_status()
    except Exception as e:
        phone_logger.error("Cannot start 'on hold' bridge")
        phone_logger.exception(e)
        raise e

async def listen():
    '''
    Start listening on the websocket
    '''
    global running
    global ws
    ws = await websockets.connect('ws://185.34.33.12:8088/ari/events?app=piphone&api_key=piphone:passpiphone')
    ws_logger.debug('Websocket connected: {}'.format(type(ws)))
    await hold_bridges()
    while running == True:
        try:
            event = await ws.recv()
            # Let's call the applications function
            await dispatch(json.loads(event))
        except websockets.exceptions.ConnectionClosed as e:
            ws_logger.warning("Connexion closed")
            ws_logger.exception(e)
            running = False
            ws.close()
            return
        except Exception as e:
            ws_logger.exception(e)
            continue
    ws.close()

async def dispatch(self, event):
    """
    Let's work on our events. Parse them and do request on the ARI API. Event is
    a dict loaded from JSON.
    """
    ws_logger.debug('Event received: {}'.format(event,))
    # Let's get the call ID, the call id isthe channel id minus the last -part.
    if 'channel' not in event:
        return
    call_id = re.sub('-\d+$', '', event['channel']['id'])
    call = Call.load(call_id, self.db)
    call.event_handler(event)

183
class Call(object):
okhin's avatar
okhin committed
184
185
186
    """
    This Class is used to manage operatiosn on a call, to print it and dump it.
    """
187
    history = []
188
    actions = {'Created': 'call_caller'
189
190
        , 'ChannelStateChange': 'change'
        , 'ChannelDtmfReceived': 'dtmf'}
okhin's avatar
okhin committed
191

192
    def __init__(self, caller, callee, owner, callid=None, db=None):
okhin's avatar
okhin committed
193
194
195
196
        try:
            self.caller = caller
            self.callee = callee
            self.owner = owner
197
            if callid == None:
198
199
                self.id = str(uuid.uuid4())
                self.db = db
200
                self.event_handler({'type': 'Created', 'timestamp': datetime.datetime.now().isoformat(), 'channel': {'id': 'Init'}})
201
202
            else:
                self.id = callid
okhin's avatar
okhin committed
203
        except Exception as e:
204
            phone_logger.exception(e)
okhin's avatar
okhin committed
205
206
207
208
209
            raise e

    def url(self):
        return ''.join(['/calls/', self.id])

okhin's avatar
okhin committed
210
211
212
213
    def state(self):
        sort = sorted(self.history, reverse=True, key=itemgetter(1))
        return sort[0][0]

okhin's avatar
okhin committed
214
215
    @classmethod
    def load(cls, callid, db):
216
        phone_logger.debug("Loading call {}".format(callid,))
okhin's avatar
okhin committed
217
        try:
218
            results = db.execute('SELECT caller, callee, owner, callid, history FROM calls WHERE callid = ?;', (callid,))
okhin's avatar
okhin committed
219
            result = results.fetchone()
220
221
            assert len(result) == 5
            object = cls(result[0], result[1], result[2], result[3])
okhin's avatar
okhin committed
222
            object.history = json.loads(result[4])
223
            object.db = db
okhin's avatar
okhin committed
224
            return object
225
        except Exception as e:
226
            phone_logger.exception(e)
227
            raise e
okhin's avatar
okhin committed
228

229
    def update(self, new_state):
230
        '''
231
        Let's update the state of the call. new_state is a tuple in the form (newstate, timestamp,)
232
        '''
233
        phone_logger.debug("Got a new state: {}".format(new_state,))
234
235
        self.history.append(new_state)
        self.save()
236
237
238
239
240
241

    def event_handler(self, event):
        '''
        There's a new event related to our call
        '''
        state = event['type']
242
        if state in self.actions:
243
            getattr(self, self.actions[state])(event=event)
244

okhin's avatar
okhin committed
245
246
247
248
249
    def dtmf(self, event):
        '''
        We received a DTMF sequence
        '''
        try:
250
            assert self.state().startswith('Up')
okhin's avatar
okhin committed
251
            # The only thing we want to do is to call the callee if we press 1
252
253
254
255
256
257
258
259
260
            if event['digit'] != '1':
                return
            # Now, we're connectig the other side
            # But first we should stop the playback
            results = self.db.execute('SELECT login_ari, pass_ari FROM users WHERE api = ?', (self.owner,))
            login_ari, token_ari = results.fetchone()
            payload = {}
            payload['app'] = 'piphone'
            payload['api_key'] = ':'.join([login_ari, token_ari])
261
            phone_logger.debug('Stopping the playback currently running'.format(payload,))
262
263
            # We're stoping the playback, with the same ID as the channel, to keep track of it
            r = requests.delete('http://185.34.33.12:8088/ari/playbacks/{0}'.format(event['channel']['id']), data=payload)
okhin's avatar
okhin committed
264
265
            # Now we're moving the channel to the MOH Bridge
            payload['channel'] = event['channel']['id']
266
            phone_logger.debug('Moving call {} to the garage'.format(payload['channel']))
okhin's avatar
okhin committed
267
            r = requests.post('http://185.34.33.12:8088/ari/bridges/piphone-moh/addChannel', data=payload)
268
            # Next we need to originate a call to the other side
269
            phone_logger.info('Will now connect {} to {}'.format(self.caller, self.callee,))
270
            payload['endpoint'] = 'SIP/' + sanitize_phonenumber(self.callee) + '@forfait-kwaoo'
271
            phone_logger.debug('Preparing to send a request to the ARI with payload {}'.format(payload,))
272
            r = requests.post('http://185.34.33.12:8088/ari/channels/{}-{}'.format(self.id, sanitize_phonenumber(self.callee),), data=payload)
okhin's avatar
okhin committed
273
        except AssertionError as e:
274
            logging.error("Received a DTMF sequence out le being in  a '{}' state, ignoring: {}".format(self.state(), event['digit']))
okhin's avatar
okhin committed
275

276
277
278
279
    def change(self, event):
        '''
        Let's change the state of the call
        '''
280
        self.update((':'.join([event['channel']['state'], event['channel']['id'].split('-')[-1]]), event['timestamp'],))
281
        phone_logger.info("New state for call {}: {}".format(event['channel']['id'], event['channel']['state']))
okhin's avatar
okhin committed
282
        # We now need to take action according to our new state
okhin's avatar
okhin committed
283
284
        results = self.db.execute('SELECT login_ari, pass_ari FROM users WHERE api = ?', (self.owner,))
        login_ari, token_ari = results.fetchone()
okhin's avatar
okhin committed
285
        if event['channel']['state'] == 'Up':
286
287
            # Are we the caller orthe callee?
            if event['channel']['id'].endswith(self.callee):
okhin's avatar
okhin committed
288
289
290
291
292
                # Step 1 create a bridge
                payload = {}
                payload['app'] = 'piphone'
                payload['api_key'] = ':'.join([login_ari, token_ari])
                payload['type'] = 'mixing'
293
                phone_logger.debug("Creating a bridges to connect {} to {}".format(self.caller, self.callee,))
okhin's avatar
okhin committed
294
295
296
                r = requests.post('http://185.34.33.12:8088/ari/bridges/{}'.format(self.id), data=payload)
                # Step 2, moving channels
                payload['channel'] = ",".join([self.id + '-' + self.caller, self.id + '-' + self.callee])
297
                phone_logger.debug("Moving channels to the created bridge: {}".format(payload['channel']))
okhin's avatar
okhin committed
298
                r = requests.post('http://185.34.33.12:8088/ari/bridges/{}/addChannel'.format(self.id), data=payload)
299
                phone_logger.info("Call now fully connected: {} <-> {}".format(self.caller, self.callee))
300
                return
okhin's avatar
okhin committed
301
302
303
304
305
306
307
308
309
            # Call is being picked up, we want to play a song
            payload = {}
            try:
                payload['app'] = 'piphone'
                payload['api_key'] = ':'.join([login_ari, token_ari])
                payload['media'] = 'sound:mario'
                payload['lang'] = 'en_US'
                logging.debug('Preparing to send a request to the ARI with payload {}'.format(payload,))
                # We're starting a playback, with the same ID as the channel, to keep track of it
310
                r = requests.post('http://185.34.33.12:8088/ari/channels/{0}/play/{0}'.format(event['channel']['id']), data=payload)
okhin's avatar
okhin committed
311
312
                logging.debug("Now playing a sound on channel {}".format(self.id))
            except Exception as e:
313
                logging.exception(e)
okhin's avatar
okhin committed
314
                raise e
315
316

    def call_caller(self, event):
317
        '''
okhin's avatar
okhin committed
318
319
        Let's call the caller. It's a simple originate. We will also check if the MoH bridge is ready,
        because it will be used to store people in it waiting for the callee to pick up the phone.
320

okhin's avatar
okhin committed
321
        The bridge needed to connect the calls together will be created later.
322
        '''
323
324
325
326
327
328
        self.update((':'.join([event['type'], event['channel']['id'].split('-')[-1]]), event['timestamp'],))
        results = self.db.execute('SELECT login_ari, pass_ari FROM users WHERE api = ?', (self.owner,))
        login_ari, token_ari = results.fetchone()
        payload = {}
        payload['app'] = 'piphone'
        payload['api_key'] = ':'.join([login_ari, token_ari])
okhin's avatar
okhin committed
329
        # We want to check if the moh bridge is opened
330
        try:
okhin's avatar
okhin committed
331
332
333
334
335
            r = requests.get('http://185.34.33.12:8088/ari/bridges/{}-moh'.format(payload['app']), data=payload)
            r.raise_for_status()
        except HTTPError as e:
            # The bridge dos not exist
            logging.error("The Hold bridges does not exists")
336
337
338
339
340
341
            raise e

        # Now, let's create the channel
        payload = {}
        payload['app'] = 'piphone'
        payload['api_key'] = ':'.join([login_ari, token_ari])
342
343
344
        try:
            payload['endpoint'] = 'SIP/' + sanitize_phonenumber(self.caller) + '@forfait-kwaoo'
            logging.debug('Preparing to send a request to the ARI with payload {}'.format(payload,))
345
            r = requests.post('http://185.34.33.12:8088/ari/channels/{}-{}'.format(self.id, sanitize_phonenumber(self.caller),), data=payload)
346
        except Exception as e:
347
348
            logging.exception(e)
            raise e
349
350

    def save(self):
351
352
353
        '''
        Save the Call to database.
        '''
354
        phone_logger.debug("Saving call {}: {}".format(self.id, json.dumps(self, cls=PiphoneJSONEncoder)))
355
356
357
358
359
360
        try:
            self.db.execute('''INSERT OR REPLACE INTO calls (caller, callee, owner, callid, history)
                VALUES (?, ?, ?, ?, ?) '''
                , (self.caller, self.callee, self.owner, self.id, json.dumps(self.history)))
            self.db.commit()
        except Exception as e:
361
            phone_logger.exception(e)
362
            raise e
363

364
365
def stop():
    global running
366
    running = False
367
368
369
    ws.close()
    loop.close()
    threads.shutdown(wait=False)
okhin's avatar
okhin committed
370

371
@app.get('/calls/<callid>')
372
@app.get('/calls/')
okhin's avatar
okhin committed
373
@authenticated
374
@connected
375
def calls(db, callid=None):
okhin's avatar
okhin committed
376
377
378
379
    """
    Return the list of calls associated to the connected user.
    The call has a status, caller, callee and history (status change+timestamp)
    """
380
    bottle_logger.debug("GET {}".format(request.fullpath))
381
382
383
384
385
386
    if callid == None:
        try:
            results = db.execute('SELECT callid FROM calls WHERE owner = ?;', (request.params['api'],))
            calls = []
            for row in results.fetchall():
                call = Call.load(row[0], db)
387
                bottle_logger.debug("Call fetched: {}".format(json.dumps(call, cls=PiphoneJSONEncoder)))
388
389
390
391
                calls.append(call)
            head = {'call': request.fullpath, 'user': request.params['api'], 'hits': len(calls)}
            return {'head': head, 'data': calls}
        except Exception as e:
392
            bottle_logger.exception(e)
393
394
395
396
397
398
            abort(500, "Exception")
    # We first need to check if we can access the callid we asked for
    try:
        results = db.execute('SELECT callid FROM calls WHERE owner = ? AND callid = ? ;'
                , (request.params['api'], callid,))
        rows = results.fetchall()
399
        bottle_logger.debug("Found {} results: {}".format(len(rows), rows))
400
401
402
403
404
        assert len(rows) == 1
        call = Call.load(callid, db)
        head = {'call': call.url(), 'user': request.params['api'], 'hits': 1}
        return {'head': head, 'data': call}
    except AssertionError as e:
405
406
        bottle_logger.debug("Not exactly one results found, this is an issue")
        bottle_logger.error("Unauthorized access to call {} from user {}".format(callid, request.params['api']))
407
408
        abort(403, "You do not have the authorization to get this call")
    except Exception as e:
409
410
        bottle_logger.debug("Exception catched: {}".format(e,))
        bottle_logger.error("Call not found {}".format(callid,))
411
412
413
414
        abort(404, "This call does not exist")

@app.post('/calls/')
@app.post('/calls/<callid>')
415
@authenticated
416
@connected
417
def originate(db, callid=None):
418
    bottle_logger.debug("POST {}".format(request.fullpath))
419
420
421
422
423
    try:
        if callid is not None:
            call = Call(request.params['caller'], request.params['callee'], request.params['api'], callid=request.params['callid'], db=db)
        else:
            call = Call(request.params['caller'], request.params['callee'], request.params['api'], db=db)
424
        bottle_logger.debug("Originate a call: {}".format(json.dumps(call, cls=PiphoneJSONEncoder)))
425
        head = {'call': call.url()
426
427
            , 'user': request.params['api']
            , 'hits': 1}
428
429
        return {'header': head, 'data': call}
    except Exception as e:
430
431
        bottle_logger.debug("Missing params : {}".format([p for p in request.params],))
        bottle_logger.exception(e)
432
        abort(400, "Missing or incorrect fields, the call cannot be processed")
433
434

if __name__ == '__main__':
435
436
437
438
439
    global config
    arg_parser = argparse.ArgumentParser(description='Manage the SIP Backend for the piphone')
    arg_parser.add_argument('-c', '--config', help="Config file")
    arg_parser.parse_args()
    config = ConfigParser(arg_parser.config)
440
    try:
441
442
443
444
445
        running = True
        threads.submit(app.run)
        loop.run_until_complete(listen())
    except:
        stop()