Skip to content
Extraits de code Groupes Projets
Valider dd615ffc rédigé par Bastien Le Querrec's avatar Bastien Le Querrec
Parcourir les fichiers

Attrap_bot: lance l'analyse que si des requêtes sont configurées

parent 895c0cf8
Aucune branche associée trouvée
Aucune étiquette associée trouvée
Aucune requête de fusion associée trouvée
......@@ -100,105 +100,106 @@ class Attrap_bot:
logger.info(f"Source des données : {self.config['data_source']}")
def analyze(self):
for i in self.config['queries']:
query = self.config['queries'][i]
query_id = i
search = query['search']
administration = query['administration']
if not administration:
administration = ""
logger.info(f'Démarrage de la recherche {query_id}')
logger.info(f'Requête : {search}')
logger.info(f'Administration : {administration}')
# On récupère de quoi envoyer par mail et sur Mastodon
smtp_sender = None
mastodon_sender = None
if query.get('email') and query['email'].get('smtp'):
logger.debug('Configuration SMTP trouvée')
smtp_sender = self.get_smtp_sender(query['email']['smtp'])
if query.get('mastodon') and query['mastodon'].get('instance'):
logger.debug('Configuration Mastodon trouvée')
mastodon_sender = self.get_mastodon_sender(query['mastodon']['instance'])
# On fabrique l'URL de requête
request_url = self.config['data_source'].replace('{search}', search).replace('{administration}', administration)
# On ouvre le fichier de status
status_file_path = f'{query_id}.latest.txt'
if os.path.isfile(status_file_path):
status = open(status_file_path, 'r').read().strip()
else:
status = None
# On interroge l'API d'Attrap
response = json.loads(requests.get(request_url, timeout=(10, 120)).content)
raa = response['elements']
# Si le dernier RAA n'est pas celui connu, on analyse les résultats
latest = raa[0]
if latest['id'] != status:
email_message = ""
# On récupère les derniers RAA jusqu'à ce qu'on trouve le dernier analysé
raa_candidates = []
for result in raa:
if result['id'] != status:
raa_candidates.append(result)
else:
break
# Maintenant, on prend les derniers RAA à analyser, pour commencer par les plus anciens en premier
for result in raa_candidates[::-1]:
status = result['id']
raa_name = result['name']
raa_date = result['date']
raa_administration = result['administration']
raa_administration_name = result['administration_name']
raa_url = result['url']
# On affiche le résultat dans la console
if (self.config['console_output']):
print(f'\033[92m{raa_name}\033[0m ({raa_date}) : {raa_url}')
# On publie sur Mastodon
if mastodon_sender:
toot = f'{raa_name} ({raa_date})\n\n{raa_url}'
if query['mastodon']['prefix'] and not query['mastodon']['prefix'] == '':
prefix = query['mastodon']['prefix']
prefix = prefix.replace('{id}', query_id)
prefix = prefix.replace('{administration}', raa_administration)
prefix = prefix.replace('{administration_name}', raa_administration_name)
toot = f"{prefix}\n\n{toot}"
if query['mastodon']['suffix'] and not query['mastodon']['suffix'] == '':
suffix = query['mastodon']['suffix']
suffix = suffix.replace('{id}', query_id)
suffix = suffix.replace('{administration}', raa_administration)
suffix = suffix.replace('{administration_name}', raa_administration_name)
toot = f"{toot}\n\n{suffix}"
mastodon_sender.send(toot)
# On complète le mail
if smtp_sender:
if email_message == "":
email_message = f"Attrap ({query_id})\nRequête : {search}"
email_message = f"{email_message}\n\n{raa_administration_name} :\n{raa_name} ({raa_date})\nURL : {raa_url}"
# On envoie le mail si le message est prêt
if not email_message == "":
smtp_sender.send(
query['email']['from'],
query['email']['to'],
f"[Attrap] [{query_id}] Nouveaux éléments trouvés",
email_message
)
if self.config.get('queries'):
for i in self.config['queries']:
query = self.config['queries'][i]
query_id = i
search = query['search']
administration = query['administration']
if not administration:
administration = ""
logger.info(f'Démarrage de la recherche {query_id}')
logger.info(f'Requête : {search}')
logger.info(f'Administration : {administration}')
# On récupère de quoi envoyer par mail et sur Mastodon
smtp_sender = None
mastodon_sender = None
if query.get('email') and query['email'].get('smtp'):
logger.debug('Configuration SMTP trouvée')
smtp_sender = self.get_smtp_sender(query['email']['smtp'])
if query.get('mastodon') and query['mastodon'].get('instance'):
logger.debug('Configuration Mastodon trouvée')
mastodon_sender = self.get_mastodon_sender(query['mastodon']['instance'])
# On fabrique l'URL de requête
request_url = self.config['data_source'].replace('{search}', search).replace('{administration}', administration)
# On ouvre le fichier de status
status_file_path = f'{query_id}.latest.txt'
if os.path.isfile(status_file_path):
os.remove(status_file_path)
status_file = open(status_file_path, 'w')
status_file.write(status)
status_file.close()
status = open(status_file_path, 'r').read().strip()
else:
status = None
# On interroge l'API d'Attrap
response = json.loads(requests.get(request_url, timeout=(10, 120)).content)
raa = response['elements']
# Si le dernier RAA n'est pas celui connu, on analyse les résultats
latest = raa[0]
if latest['id'] != status:
email_message = ""
# On récupère les derniers RAA jusqu'à ce qu'on trouve le dernier analysé
raa_candidates = []
for result in raa:
if result['id'] != status:
raa_candidates.append(result)
else:
break
# Maintenant, on prend les derniers RAA à analyser, pour commencer par les plus anciens en premier
for result in raa_candidates[::-1]:
status = result['id']
raa_name = result['name']
raa_date = result['date']
raa_administration = result['administration']
raa_administration_name = result['administration_name']
raa_url = result['url']
# On affiche le résultat dans la console
if (self.config['console_output']):
print(f'\033[92m{raa_name}\033[0m ({raa_date}) : {raa_url}')
# On publie sur Mastodon
if mastodon_sender:
toot = f'{raa_name} ({raa_date})\n\n{raa_url}'
if query['mastodon']['prefix'] and not query['mastodon']['prefix'] == '':
prefix = query['mastodon']['prefix']
prefix = prefix.replace('{id}', query_id)
prefix = prefix.replace('{administration}', raa_administration)
prefix = prefix.replace('{administration_name}', raa_administration_name)
toot = f"{prefix}\n\n{toot}"
if query['mastodon']['suffix'] and not query['mastodon']['suffix'] == '':
suffix = query['mastodon']['suffix']
suffix = suffix.replace('{id}', query_id)
suffix = suffix.replace('{administration}', raa_administration)
suffix = suffix.replace('{administration_name}', raa_administration_name)
toot = f"{toot}\n\n{suffix}"
mastodon_sender.send(toot)
# On complète le mail
if smtp_sender:
if email_message == "":
email_message = f"Attrap ({query_id})\nRequête : {search}"
email_message = f"{email_message}\n\n{raa_administration_name} :\n{raa_name} ({raa_date})\nURL : {raa_url}"
# On envoie le mail si le message est prêt
if not email_message == "":
smtp_sender.send(
query['email']['from'],
query['email']['to'],
f"[Attrap] [{query_id}] Nouveaux éléments trouvés",
email_message
)
if os.path.isfile(status_file_path):
os.remove(status_file_path)
status_file = open(status_file_path, 'w')
status_file.write(status)
status_file.close()
def get_mastodon_sender(self, config_id):
if self.config.get('mastodon'):
......
0% Chargement en cours ou .
You are about to add 0 people to the discussion. Proceed with caution.
Terminez d'abord l'édition de ce message.
Veuillez vous inscrire ou vous pour commenter