import datetime
import email
import email.message
import email.utils
import hashlib
import json
import logging
import os
import smtplib
import ssl
import time

import pytz
import requests
import yaml
from mastodon import Mastodon

# Log verbosity is taken from the LOG_LEVEL environment variable (default: WARNING)
logging.basicConfig(level=os.environ.get('LOG_LEVEL', 'WARNING').upper())
logger = logging.getLogger(__name__)


class Attrap_bot:
    """Run the configured search queries against the data source and publish new results.

    For every query, the bot remembers the last RAA it has seen in a
    ``<query_id>.latest.txt`` file in the current working directory, and
    forwards RAA that are newer than that to the Mastodon and/or email
    senders named by the query.
    """

    class Email_sender:
        """Send plain-text emails through an SMTP server, one message per recipient."""

        # Defaults, overridable through the set_* methods
        port = 587
        ssl = False
        starttls = True
        # Credentials are optional: without a username, no SMTP login is attempted
        username = None
        password = None

        def __init__(self, hostname, email_from, email_to):
            """Store the SMTP host, the sender address and the recipients.

            Args:
                hostname: SMTP server host name.
                email_from: value of the ``From`` header.
                email_to: comma-separated list of recipient addresses.
            """
            self.hostname = hostname
            self.email_from = email_from
            # Tolerate spaces around the commas and ignore empty entries
            self.email_to = [address.strip() for address in email_to.split(',') if address.strip()]

        def set_username(self, username):
            """Set the user name used for SMTP authentication."""
            self.username = username

        def set_password(self, password):
            """Set the password used for SMTP authentication."""
            self.password = password

        def set_port(self, port):
            """Set the SMTP port, converting it to ``int`` when needed."""
            self.port = port if isinstance(port, int) else int(port)

        def set_ssl(self, ssl):
            """Enable or disable implicit TLS; enabling it disables STARTTLS."""
            self.ssl = ssl
            if ssl is True:
                self.starttls = False

        def set_starttls(self, starttls):
            """Enable or disable STARTTLS; enabling it disables implicit TLS."""
            self.starttls = starttls
            if starttls is True:
                self.ssl = False

        def send(self, email_subject, email_message):
            """Send the message to every recipient, waiting 3 seconds after each one.

            Args:
                email_subject: value of the ``Subject`` header.
                email_message: plain-text body.

            Raises:
                smtplib.SMTPException, OSError: propagated from smtplib when the
                    connection, the login or the delivery fails.
            """
            message = email.message.EmailMessage()
            message['Subject'] = email_subject
            message['From'] = self.email_from
            message['Date'] = email.utils.formatdate()
            message.set_content(email_message)

            # The Message-ID domain is the domain of the sender address. parseaddr()
            # copes with a "Display Name <user@domain>" style From value.
            sender_address = email.utils.parseaddr(self.email_from)[1] or self.email_from
            message_id_domain = sender_address.rpartition('@')[2]

            if self.ssl:
                smtp = smtplib.SMTP_SSL(self.hostname, self.port, context=ssl.create_default_context())
            else:
                smtp = smtplib.SMTP(self.hostname, self.port)

            try:
                if self.starttls and not self.ssl:
                    smtp.starttls(context=ssl.create_default_context())

                if self.username:
                    smtp.login(self.username, self.password)

                for address in self.email_to:
                    # The same message object is reused: only To and Message-ID change
                    del message['To']
                    message['To'] = address

                    del message['Message-ID']
                    message['Message-ID'] = email.utils.make_msgid(domain=message_id_domain)

                    smtp.send_message(message)
                    logger.debug('Attente 3 secondes après envoi de l\'email...')
                    time.sleep(3)

                smtp.quit()
            finally:
                # Release the socket even when something above raised
                # (close() is a no-op after a successful quit())
                smtp.close()

    class Mastodon_sender:
        """Post toots to a Mastodon instance, creating the API client on first use."""

        mastodon = None

        def __init__(self, instance, access_token, toot):
            """Store the connection settings and the toot template.

            Args:
                instance: base URL of the Mastodon instance.
                access_token: access token of the posting account.
                toot: toot template; a falsy value selects the default template.
            """
            self.instance = instance
            self.access_token = access_token
            self.toot = toot if toot else "{name}\n\n{url}"

        def send(self, message):
            """Post ``message`` as a toot, then wait 3 seconds."""
            if not self.mastodon:
                self.mastodon = Mastodon(
                    api_base_url=self.instance,
                    access_token=self.access_token
                )
            self.mastodon.toot(message)
            logger.debug('Attente 3 secondes après envoi sur Mastodon...')
            time.sleep(3)

    def __init__(self, config):
        """Merge ``config`` over the defaults read from ``config.default.yml``.

        Args:
            config: mapping of settings overriding the defaults (``None`` is
                treated as an empty mapping).
        """
        # NOTE(review): FullLoader is kept as in the original; for a plain settings
        # file yaml.safe_load is presumably sufficient -- confirm before switching.
        with open('config.default.yml', 'r') as config_file:
            config_default = yaml.load(config_file, Loader=yaml.FullLoader)
        self.config = (config_default or {}) | (config or {})

        logger.info('Démarrage de Attrap_bot')
        logger.info(f"Source des données : {self.config['data_source']}")

    @staticmethod
    def _read_status(status_file_path):
        """Return ``(last_raa_id, last_raa_first_seen_on)`` from the status file, or ``None`` if it does not exist."""
        if not os.path.isfile(status_file_path):
            return None
        with open(status_file_path, 'r') as status_file:
            fields = status_file.read().strip().split('|')
        return fields[0], datetime.datetime.fromtimestamp(float(fields[1]), pytz.utc)

    @staticmethod
    def _write_status(status_file_path, last_raa_id, last_raa_first_seen_on):
        """Overwrite the status file with ``<id>|<timestamp rounded to the second>``."""
        with open(status_file_path, 'w') as status_file:
            status_file.write(f'{last_raa_id}|{int(round(last_raa_first_seen_on.timestamp()))}')

    def analyze(self):
        """Run every configured query, record its progress and publish the new RAA.

        For each entry of ``config['queries']`` the data source is queried over
        the last 7 days. Results listed before the last known RAA, and first seen
        after it, are printed to the console (if ``console_output`` is set) and
        queued for the Mastodon / email senders named by the query. Once all
        queries have run, one toot per RAA and one mail per administration are sent.

        Raises:
            requests.RequestException: propagated when the HTTP request fails.
        """
        queries = self.config.get('queries')
        if not queries:
            return

        # RAA waiting to be published, grouped by sender id
        raa_to_publish_mastodon = {}
        raa_to_publish_email = {}

        for query_id, query in queries.items():
            search = query['search']
            administration = query.get('administration') or ""
            hashtag = query.get('hashtag') or query_id
            send_to_mastodon = bool(query.get('mastodon'))
            send_email = bool(query.get('email'))

            logger.info(f'Démarrage de la recherche {query_id}')
            logger.info(f'Requête : {search}')
            logger.info(f'Administration : {administration}')

            # Build the request URL from the data source template
            one_week_ago = datetime.datetime.today() - datetime.timedelta(days=7)
            request_url = self.config['data_source']
            request_url = request_url.replace('{search}', search)
            request_url = request_url.replace('{administration}', administration)
            request_url = request_url.replace('{start_date}', one_week_ago.strftime('%Y-%m-%d'))

            # Load what is known from the previous run of this query
            status_file_path = f'{query_id}.latest.txt'
            known_status = self._read_status(status_file_path)
            if known_status is not None:
                last_raa_id, last_raa_first_seen_on = known_status
                send_results = True
            else:
                # First run of this query: results are published only if
                # send_on_new_queries is enabled (the message is logged either way)
                logger.info('Requête lancée pour la première fois, les résultats ne seront pas envoyés')
                last_raa_id = '0000000000000000000000000000000000000000000000000000000000000000'
                last_raa_first_seen_on = datetime.datetime.now(pytz.utc)
                send_results = bool(self.config['send_on_new_queries'])

            # Query the API. The body is parsed only on success, so that an error
            # page that is not JSON ends up in the warning branch instead of raising.
            api_result = requests.get(request_url, timeout=(10, 120))
            if api_result.status_code == 200:
                raa = json.loads(api_result.content)['elements']
            else:
                logger.warning(f'ATTENTION ! L\'API a renvoyé un code {api_result.status_code}')
                raa = []

            # Results are analysed only if the first one is not the last known RAA
            if raa and raa[0]['id'] != last_raa_id:
                # Take the results up to (excluding) the last analysed RAA
                raa_candidates = []
                for result in raa:
                    if result['id'] == last_raa_id:
                        break
                    raa_candidates.append(result)

                # Process the candidates in reverse order of the API response,
                # so that the oldest ones come first
                for result in reversed(raa_candidates):
                    # Only keep RAA detected after the last known one (the API may
                    # return partial results while its database is being updated)
                    first_seen_on = datetime.datetime.fromisoformat(result['first_seen_on'])
                    if first_seen_on <= last_raa_first_seen_on:
                        continue

                    last_raa_id = result['id']
                    last_raa_first_seen_on = first_seen_on

                    # Show the result in the console
                    if self.config['console_output']:
                        print(f"\033[92m{result['name']}\033[0m ({result['date']}) : {result['url']}")

                    if not send_results:
                        continue

                    if send_to_mastodon:
                        # Toots are grouped by RAA (one toot per RAA)
                        toots_by_raa = raa_to_publish_mastodon.setdefault(query['mastodon'], {})
                        queued_raa = toots_by_raa.setdefault(result['id'], result)
                        # Remember the hashtag of the query that matched
                        if not queued_raa.get('hashtags'):
                            queued_raa['hashtags'] = []
                        queued_raa['hashtags'].append(hashtag)

                    if send_email:
                        # Mails are grouped by administration (one mail per administration)
                        mails_by_administration = raa_to_publish_email.setdefault(query['email'], {})
                        mails_by_raa = mails_by_administration.setdefault(result['administration'], {})
                        queued_raa = mails_by_raa.setdefault(result['id'], result)
                        # Remember the id of the query that matched
                        if not queued_raa.get('queries'):
                            queued_raa['queries'] = []
                        queued_raa['queries'].append(query_id)

            # Persist the (possibly unchanged) state of this query
            self._write_status(status_file_path, last_raa_id, last_raa_first_seen_on)

            # Wait a few seconds before sending the next request to the API
            time.sleep(5)

        self._send_toots(raa_to_publish_mastodon)
        self._send_emails(raa_to_publish_email)

    def _send_toots(self, raa_to_publish_mastodon):
        """Send one toot per queued RAA through each configured Mastodon sender."""
        if not raa_to_publish_mastodon:
            return

        logger.info('Envoi des toots')
        for sender_id, raa_by_id in raa_to_publish_mastodon.items():
            sender = self.get_mastodon_sender(sender_id)
            if not sender:
                continue

            for raa in raa_by_id.values():
                # Drop duplicated hashtags while keeping their order
                raa['hashtags'] = list(dict.fromkeys(raa['hashtags']))
                raa['hashtags_str'] = ' '.join(f'#{hashtag}' for hashtag in raa['hashtags'])

                # Fill the toot template
                toot = sender.toot
                toot = toot.replace('{administration}', raa['administration'])
                toot = toot.replace('{administration_name}', raa['administration_name'])
                toot = toot.replace('{date}', raa['date'])
                toot = toot.replace('{id}', raa['id'])
                toot = toot.replace('{name}', raa['name'])
                toot = toot.replace('{url}', raa['url'])
                toot = toot.replace('{hashtags}', raa['hashtags_str'])
                # A literal backslash-n in the configured template becomes a line break
                toot = toot.replace('\\n', "\n")

                sender.send(toot)

    def _send_emails(self, raa_to_publish_email):
        """Send one mail per administration through each configured email sender."""
        if not raa_to_publish_email:
            return

        logger.info('Envoi des mails')
        for sender_id, raa_by_administration in raa_to_publish_email.items():
            sender = self.get_email_sender(sender_id)
            if not sender:
                continue

            for administration_id, raa_by_id in raa_by_administration.items():
                message_header = f'Attrap : {administration_id}'
                queries = []
                message_raa_list = 'RAA trouvés :'

                # Add every RAA to the mail body
                for raa in raa_by_id.values():
                    queries.extend(raa['queries'])
                    # Queries that returned this RAA, displayed next to it
                    raa['queries_str'] = ', '.join(str(query) for query in raa['queries'])
                    message_raa_list = f"{message_raa_list}\n - {raa['name']}\n URL : {raa['url']}\n Date : {raa['date']}\n Trouvé dans les requêtes : {raa['queries_str']}"

                # List each query once, even when it matched several RAA
                message_queries_overview = 'Requêtes :'
                for query in dict.fromkeys(queries):
                    message_queries_overview = f"{message_queries_overview}\n - {query} : {self.config['queries'][query]['search']}"

                # Assemble the message and send it
                message = f"{message_header}\n\n{message_queries_overview}\n\n{message_raa_list}"
                subject = f'[Attrap][{administration_id}] Nouveaux éléments trouvés'
                sender.send(subject, message)

    def get_mastodon_sender(self, config_id):
        """Return a ``Mastodon_sender`` for ``config['mastodon'][config_id]``.

        Returns:
            The sender, or ``None`` (after logging a warning) when the entry is
            missing or lacks ``instance`` or ``access_token``.
        """
        sender_config = (self.config.get('mastodon') or {}).get(config_id)
        if sender_config and sender_config.get('instance') and sender_config.get('access_token'):
            return Attrap_bot.Mastodon_sender(
                sender_config['instance'],
                sender_config['access_token'],
                sender_config.get('toot') or None
            )

        logger.warning(f"La configuration Mastodon est invalide (id: {config_id})")
        return None

    def get_email_sender(self, config_id):
        """Return an ``Email_sender`` for ``config['email'][config_id]``.

        Returns:
            The sender, or ``None`` (after logging a warning) when the entry is
            missing or lacks ``hostname``, ``from`` or ``to``.
        """
        sender_config = (self.config.get('email') or {}).get(config_id)
        if sender_config and sender_config.get('hostname') and sender_config.get('from') and sender_config.get('to'):
            email_sender = Attrap_bot.Email_sender(sender_config['hostname'], sender_config['from'], sender_config['to'])

            if sender_config.get('username'):
                email_sender.set_username(sender_config['username'])
            if sender_config.get('password'):
                email_sender.set_password(sender_config['password'])
            if sender_config.get('port'):
                email_sender.set_port(sender_config['port'])
            # Explicit false values must reach the setters too, otherwise the
            # STARTTLS default could never be switched off from the configuration
            if sender_config.get('ssl') is not None:
                email_sender.set_ssl(sender_config['ssl'])
            if sender_config.get('starttls') is not None:
                email_sender.set_starttls(sender_config['starttls'])

            return email_sender

        logger.warning(f"La configuration Email est invalide (id: {config_id})")
        if sender_config:
            # Never write the SMTP password to the logs
            loggable_config = dict(sender_config)
            if 'password' in loggable_config:
                loggable_config['password'] = '***'
            logger.debug(f"Configuration : {loggable_config}")
        return None