import os import re import random import ssl import subprocess import shutil import string import logging import requests import time import datetime import json from urllib.parse import quote from selenium import webdriver from selenium.common.exceptions import TimeoutException from selenium.webdriver.common.by import By from selenium.webdriver.support.wait import WebDriverWait from selenium.webdriver.support import expected_conditions import pytz import dateparser import urllib3 from bs4 import BeautifulSoup from pyvirtualdisplay import Display from pypdf import PdfReader from pypdf import PdfWriter from pypdf.generic import NameObject, NumberObject from pypdf.errors import PdfStreamError from pypdf.errors import EmptyFileError import hashlib import smtplib import email from mastodon import Mastodon import ftfy logger = logging.getLogger(__name__) class Attrap: class RAA: """La classe représentant un Recueil des actes administratifs. La plupart du temps, il s'agit d'un PDF avec plusieurs arrêtés.""" url = "" date = None name = "" sha256 = "" pdf_creation_date = None pdf_modification_date = None def __init__(self, url, date, name): if not url == "": self.url = url if date is not None: self.date = Attrap.get_aware_datetime(date) if not name == "": self.name = name def get_sha256(self): """Calcule et met en cache le hash sha256 de l'URL du PDF, pour servir d'identifiant unique.""" if (self.sha256 == ""): self.sha256 = hashlib.sha256(self.url.encode('utf-8')).hexdigest() return self.sha256 def get_pdf_dates(self, data_dir): """Extrait les dates des PDF pour les ajouter à l'objet du RAA.""" raa_data_dir = f'{data_dir}/raa/' reader = PdfReader(f'{raa_data_dir}{self.get_sha256()}.pdf') pdf_metadata = reader.metadata if pdf_metadata: if pdf_metadata.creation_date: self.pdf_creation_date = Attrap.get_aware_datetime(pdf_metadata.creation_date) if self.date is None: self.date = Attrap.get_aware_datetime(pdf_metadata.creation_date) if pdf_metadata.modification_date: self.pdf_modification_date = Attrap.get_aware_datetime(pdf_metadata.modification_date) if self.date is None: self.date = Attrap.get_aware_datetime(pdf_metadata.modification_date) def extract_content(self, data_dir): """Extrait le contenu du PDF OCRisé pour l'écrire dans le fichier qui servira à faire la recherche de mots-clés. Supprime tous les PDF à la fin.""" raa_data_dir = f'{data_dir}/raa/' text = "" reader = PdfReader(f'{raa_data_dir}{self.get_sha256()}.ocr.pdf') ftfy_config = ftfy.TextFixerConfig(unescape_html=False, explain=False) for page in reader.pages: try: text = text + "\n" + ftfy.fix_text(page.extract_text(), config=ftfy_config) except Exception as exc: logger.warning(f'ATTENTION: Impossible d\'extraire le texte du fichier {self.get_sha256()}.pdf : {exc}') # Écrit le texte du PDF dans un fichier texte pour une analyse future f = open(f'{raa_data_dir}{self.get_sha256()}.txt', 'w') f.write(text) f.close() # Supprime le PDF d'origine et la version OCRisée os.remove(f'{raa_data_dir}{self.get_sha256()}.pdf') os.remove(f'{raa_data_dir}{self.get_sha256()}.ocr.pdf') os.remove(f'{raa_data_dir}{self.get_sha256()}.flat.pdf') def write_properties(self, data_dir): """Écris les propriétés du RAA dans un fichier JSON.""" raa_data_dir = f'{data_dir}/raa/' pdf_creation_date_json = None pdf_modification_date_json = None if self.pdf_creation_date: pdf_creation_date_json = self.pdf_creation_date.astimezone(pytz.utc).isoformat(timespec="seconds") if self.pdf_modification_date: pdf_modification_date_json = self.pdf_modification_date.astimezone(pytz.utc).isoformat(timespec="seconds") properties = { 'version': 1, 'name': self.name, 'date': self.date.strftime("%Y-%m-%d"), 'url': quote(self.url, safe='/:'), 'first_seen_on': datetime.datetime.now(pytz.utc).isoformat(timespec="seconds"), 'pdf_creation_date': pdf_creation_date_json, 'pdf_modification_date': pdf_modification_date_json } f = open(f'{raa_data_dir}{self.get_sha256()}.json', 'w') f.write(json.dumps(properties)) f.close() def parse_metadata(self, data_dir): """Lance l'extraction des dates du PDF puis l'écriture de ses propriétés dans un fichier JSON.""" self.get_pdf_dates(data_dir) self.write_properties(data_dir) def __init__(self, data_dir, user_agent=''): logger.debug('Initialisation de Attrap') # On crée le dossier de téléchargement os.makedirs(data_dir, exist_ok=True) self.session = requests.Session() self.data_dir = data_dir self.found = False self.output_file_path = os.path.dirname(os.path.abspath(__file__)) + f'/output_{self.short_code}.log' self.sleep_time = 0 self.last_http_request = 0 self.tor_enabled = False self.tor_max_requests = 0 self.tor_requests = 0 self.tor_socks5_key = None self.not_before = datetime.datetime(2024, 1, 1) self.smtp_configured = False self.mastodon = None self.mastodon_prefix = '' self.mastodon_suffix = '' self.safe_mode = False self.update_user_agent(user_agent) f = open(self.output_file_path, 'w') f.write('') f.close() self.print_output(str(self.__class__.__name__)) # Si le safe mode est activé, on configure un long délai entre chaque requête if os.getenv('SAFE_MODE'): self.safe_mode = True logger.warning('ATTENTION: le safe mode est activé, configuration d\'un délai entre chaque requête') self.sleep_time = 30 def configure_mastodon(self, access_token, instance, mastodon_prefix, mastodon_suffix): """Configuration de Mastodon afin de publier un toot à chaque RAA détecté.""" if access_token and access_token != "" and instance and instance != "": self.mastodon = Mastodon( access_token=access_token, api_base_url=instance ) self.mastodon_prefix = mastodon_prefix self.mastodon_suffix = mastodon_suffix def mastodon_toot(self, content): """Publie le toot en ajoutant le header et footer configurés.""" if self.mastodon: toot = content if not self.mastodon_prefix == '': toot = f"{self.mastodon_prefix}\n\n{toot}" if not self.mastodon_suffix == '': toot = f"{toot}\n\n{self.mastodon_suffix}" self.mastodon.toot(toot) def enable_tor(self, max_requests=0): """Active l'utilisation de Tor pour effectuer les requêtes.""" if not self.safe_mode: self.tor_enabled = True self.tor_max_requests = max_requests self.tor_get_new_id() else: logger.warning('ATTENTION: le safe mode est activé, Tor n\'a pas été activé') def disable_tor(self): """Désactive l'utilisation de Tor.""" proxies = {} self.tor_enabled = False self.tor_max_requests = 0 self.tor_requests = 0 self.session.proxies.update(proxies) def tor_get_new_id(self): """Change de circuit Tor. Cela permet de changer de noeud de sortie donc d'IP.""" if self.tor_enabled: self.tor_socks5_key = 'attrap_' + ''.join(random.choices(string.ascii_lowercase, k=20)) proxies = { "http": f"socks5h://attrap:{self.tor_socks5_key}@127.0.0.1:9050", "https": f"socks5h://attrap:{self.tor_socks5_key}@127.0.0.1:9050", } self.session.proxies.update(proxies) self.tor_requests = 0 def get_sub_pages(self, page_content, element, host, recursive_until_pdf): """ Récupère, à partir d'un chemin CSS, les sous-pages d'une page. page_content -- Un contenu HTML à analyser element -- Le chemin CSS vers l'objet renvoyant vers la sous-page recherchée host -- Le nom d'hôte du site recursive_until_pdf -- Un booléen pour savoir s'il faut rechercher un fichier PDF dans le chemin CSS. Le cas échéant, relance la recherche sur la sous-page si le lien n'est pas un PDF. """ soup = BeautifulSoup(page_content, 'html.parser') sub_pages = [] for a in soup.select(element): if a.get('href'): url = f"{host}{a['href']}" if recursive_until_pdf: sub_page_content = self.get_page(url, 'get').content if not self.has_pdf(sub_page_content): logger.info( f'{url} ne contient pas de PDF, on récupère ses sous-pages' ) for sub_sub_page in self.get_sub_pages( sub_page_content, element, host, recursive_until_pdf ): sub_pages.append(sub_sub_page) else: sub_page = { 'url': url, 'name': a.get_text().strip() } sub_pages.append(sub_page) else: sub_page = { 'url': url, 'name': a.get_text().strip() } sub_pages.append(sub_page) return sub_pages def get_sub_pages_with_pager(self, page, sub_page_element, pager_element, details_element, host): """ Récupère, à partir d'un chemin CSS, les sous-pages d'une page contenant un pager. page -- L'URL de la page à analyser sub_page_element -- Le chemin CSS vers l'objet renvoyant vers la sous-page recherchée pager_element -- Le chemin CSS vers le lien de page suivante du pager details_element -- Le chemin CSS vers l'objet contenant les détails de la sous-page recherchée host -- Le nom d'hôte du site """ pages = [] page_content = self.get_page(page, 'get').content # On initialise le parser soup = BeautifulSoup(page_content, 'html.parser') # On recherche les sous-pages sub_pages = soup.select(sub_page_element) sub_pages_details = None if details_element is not None: sub_pages_details = soup.select(details_element) i = 0 for sub_page in sub_pages: if sub_page.get('href'): page = { 'url': f"{host}{sub_page['href']}", 'name': sub_page.get_text().strip(), 'details': '' } if details_element is not None: page['details'] = sub_pages_details[i].get_text().strip() pages.append(page) i = i + 1 # On recherche un pager, et si on le trouve on le suit pager = soup.select(pager_element) if pager and pager[0] and pager[0].get('href'): for sub_page in self.get_sub_pages_with_pager( f"{host}{pager[0]['href']}", sub_page_element, pager_element, details_element, host ): pages.append(sub_page) return pages def get_raa_with_pager(self, pages_list, pager_element, host, filter_from_last_element_date=False): """ Récupère et analyse les RAA d'une page contenant un pager. pages_list -- Un tableau contenant la liste des pages pager_element -- Le chemin CSS vers le lien de page suivante du pager host -- Le nom d'hôte du site filter_from_last_element_date -- (Optionnel) Si la date du dernier élément de la dernière page parsée n'est pas dans la plage temporelle voulue, ne charge pas les pages suivantes. Par défaut à False. Ne doit être activé que si l'ordre des éléments est chronologique. """ elements = [] # On parse chaque page passée en paramètre for page in pages_list: page_content = self.get_page(page, 'get').content # Pour chaque page, on récupère les PDF for raa in self.get_raa_elements(page_content): elements.append(raa) # Si la date du dernier RAA est dans la plage temporelle voulue, # on regarde également s'il n'y aurait pas un pager if not filter_from_last_element_date or (filter_from_last_element_date and (elements[-1].date >= Attrap.get_aware_datetime(self.not_before))): sub_pages = [] for sub_page in self.get_sub_pages( page_content, pager_element, host, True ): sub_pages.append(sub_page['url']) for sub_raa in self.get_raa_with_pager( sub_pages, pager_element, host, filter_from_last_element_date=filter_from_last_element_date ): elements.append(sub_raa) return elements def set_sleep_time(self, sleep_time): """Configure le temps de temporisation""" self.sleep_time = sleep_time def has_pdf(self, page_content): """ Renvoie un booléen Vrai si la page contient un lien vers un PDF page_content -- Un contenu HTML à analyser """ elements = [] soup = BeautifulSoup(page_content, 'html.parser') for a in soup.find_all('a', href=True): if a['href'].endswith('.pdf'): return True return False # On démarre le navigateur def get_session(self, url, wait_element, remaining_retries=0): """ Lance un navigateur avec Selenium. url -- URL à interroger wait_element -- Élement (désigné par son identifiant CSS) qui indique que la page est chargée remaining_retries -- Nombre d'échecs autorisé avant de soulever une erreur """ webdriver_options = webdriver.ChromeOptions() webdriver_options.add_argument("--no-sandbox") webdriver_options.add_argument("--disable-extensions") webdriver_options.add_argument("--disable-gpu") webdriver_options.add_argument("--disable-dev-shm-usage") webdriver_options.add_argument("--use_subprocess") webdriver_options.add_argument("--disable-blink-features=AutomationControlled") if not self.user_agent == "": webdriver_options.add_argument(f"--user-agent={self.user_agent}") webdriver_options.add_argument("--headless") webdriver_options.add_argument("--window-size=1024,768") display = Display(visible=False, size=(1024, 768)) display.start() browser = webdriver.Chrome(options=webdriver_options) # Téléchargement de l'URL browser.get(url) if wait_element is not None: # On attend que le navigateur ait passé les tests anti-robots et # que le contenu s'affiche try: WebDriverWait(browser, 60).until( expected_conditions.presence_of_element_located( ( By.ID, wait_element ) ) ) except TimeoutException as exc: logger.warning(f'TimeoutException: {exc}') if remaining_retries > 0: time.sleep(5) return self.get_session(url, wait_element, (remaining_retries - 1)) else: raise TimeoutException(exc) page_content = browser.page_source # On récupère les cookies du navigateur pour les réutiliser plus tard for cookie in browser.get_cookies(): self.session.cookies.set(cookie['name'], cookie['value']) # On arrête le navigateur browser.quit() display.stop() return page_content def print_output(self, data): """Affiche dans le terminal et dans le fichier de log un texte""" print(data) data = data.replace('\033[92m', '') data = data.replace('\033[0m', '') data = data.replace('\033[1m', '') f = open(self.output_file_path, 'a') f.write(data + "\n") f.close() def get_page(self, url, method, data={}): """ Récupère le contenu HTML d'une page web url -- L'URL de la page demandée method -- 'post' ou 'get', selon le type de requête data -- Un dictionnaire contenant les données à envoyer au site """ try: logger.debug(f'Chargement de la page {url}') # Si un délai a été configuré, on vérifie qu'il n'est pas trop tôt pour lancer la requête if self.sleep_time > 0: current_time = int(time.mktime(datetime.datetime.today().timetuple())) remaining_sleep_time = self.last_http_request + self.sleep_time - current_time if remaining_sleep_time > 0: time.sleep(remaining_sleep_time) self.last_http_request = int(time.mktime(datetime.datetime.today().timetuple())) page = None if method == 'get': page = self.session.get(url, timeout=(10, 120)) if method == 'post': page = self.session.post(url, data=data, timeout=(10, 120)) if page.status_code == 429: logger.warning('Erreur 429 Too Many Requests reçue, temporisation...') self.tor_get_new_id() time.sleep(1) return self.get_page(url, method, data) if self.tor_enabled: self.tor_requests += 1 if self.tor_max_requests > 0 and \ self.tor_requests > self.tor_max_requests: self.tor_get_new_id() return page except requests.exceptions.ConnectionError: logger.warning(f'Erreur de connexion, temporisation...') self.tor_get_new_id() time.sleep(30) return self.get_page(url, method, data) except requests.exceptions.Timeout: logger.warning(f'Timeout, on relance la requête...') return self.get_page(url, method, data) except urllib3.exceptions.ProtocolError: logger.warning(f'Erreur de connexion, on relance la requête...') return self.get_page(url, method, data) def update_user_agent(self, user_agent): """Change la valeur du user-agent""" self.user_agent = user_agent self.session.headers.update({'User-Agent': self.user_agent}) def download_file(self, raa): """Télécharge un RAA""" try: os.makedirs( os.path.dirname(f'{self.data_dir}/raa/{raa.get_sha256()}.pdf'), exist_ok=True ) file = self.get_page(raa.url, 'get') f = open(f'{self.data_dir}/raa/{raa.get_sha256()}.pdf', 'wb') f.write(file.content) f.close() except (requests.exceptions.ConnectionError, requests.exceptions.ChunkedEncodingError): logger.warning(f'ATTENTION: la connexion a été interrompue pendant le téléchargement de {raa.url}, nouvelle tentative...') self.download_file(raa) except Exception as exc: logger.warning(f'ATTENTION: Impossible de télécharger le fichier {raa.url}: {exc}') def ocr(self, raa, retry_on_failure=True): """OCRise un RAA""" cmd = [ 'python3', 'bin/ocrmypdf', '-l', 'eng+fra', '--output-type', 'pdf', '--redo-ocr', '--skip-big', '250', '--max-image-mpixels', '250', '--invalidate-digital-signatures', '--optimize', '0', f'{self.data_dir}/raa/{raa.get_sha256()}.flat.pdf', f'{self.data_dir}/raa/{raa.get_sha256()}.ocr.pdf' ] logger.debug(f'Lancement de ocrmypdf: {cmd}') try: output = subprocess.check_output(cmd, stderr=subprocess.STDOUT) except subprocess.CalledProcessError as exc: if exc.returncode == 2 and retry_on_failure: logger.warning('ATTENTION : Le fichier n\'est pas un PDF correct, nouvelle tentative de le télécharger') if self.tor_enabled: self.tor_get_new_id() self.download_file(raa) self.ocr(raa, False) elif (not exc.returncode == 6) and (not exc.returncode == 10) and (not exc.returncode == 4): logger.warning('ATTENTION : Impossible d\'OCRiser le document', exc.returncode, exc.output) shutil.copy(f'{self.data_dir}/raa/{raa.get_sha256()}.pdf', f'{self.data_dir}/raa/{raa.get_sha256()}.ocr.pdf') def flatten_pdf(self, raa): """Supprime les formulaires d'un PDF pour pouvoir les OCRiser après dans OCRmyPDF.""" reader = PdfReader(f'{self.data_dir}/raa/{raa.get_sha256()}.pdf') writer = PdfWriter() for page in reader.pages: if page.get('/Annots'): for annot in page.get('/Annots'): writer_annot = annot.get_object() writer_annot.update({ NameObject("/Ff"): NumberObject(1) }) writer.add_page(page) writer.write(f'{self.data_dir}/raa/{raa.get_sha256()}.flat.pdf') def search_keywords(self, raa, keywords): """Recherche des mots-clés dans le texte extrait du PDF""" if keywords and not keywords == '': text = open(f'{self.data_dir}/raa/{raa.get_sha256()}.txt').read() date_str = raa.date.strftime("%d/%m/%Y") found = False found_keywords = [] for keyword in keywords.split(','): if re.search(keyword, text, re.IGNORECASE | re.MULTILINE): if not found: url = quote(raa.url, safe='/:') self.print_output(f'\033[92m{raa.name}\033[0m ({date_str})') self.print_output(f'URL : {url}') found = True self.found = True self.print_output(f' Le terme \033[1m{keyword}\033[0m a été trouvé.') found_keywords.append(keyword) if found: self.print_output('') url = quote(raa.url, safe='/:') found_keywords_str = ', '.join( [str(x) for x in found_keywords] ) self.mastodon_toot( f'{raa.name} ({date_str})\n\nLes termes suivants ont ' f'été trouvés : {found_keywords_str}.\n\nURL : {url}' ) def parse_raa(self, elements, keywords): """ Démarre l'analyse des RAA. elements -- Un tableau contenant les RAA à analyser keywords -- Les mots-clés à rechercher dans chaque RAA """ self.print_output(f'Termes recherchés: {keywords}') self.print_output('') for raa in elements: # Si le fichier n'a pas déjà été parsé et qu'il est postérieur à la # date maximale d'analyse, on le télécharge et on le parse if not os.path.isfile(f'{self.data_dir}/raa/{raa.get_sha256()}.txt') and (not raa.date or (raa.date >= Attrap.get_aware_datetime(self.not_before))): url = quote(raa.url, safe='/:') self.download_file(raa) try: raa.parse_metadata(self.data_dir) # Lorsque la date du RAA n'est pas connue, on a dû télécharger le PDF pour récupérer la date de ses métadonnées. # Donc on vérifie à nouveau ici si la date correspond à ce qu'on veut analyser if (raa.date and raa.date >= Attrap.get_aware_datetime(self.not_before)): date_str = raa.date.strftime("%d/%m/%Y") logger.info(f'Nouveau fichier : {raa.name} ({date_str}). URL : {url}') self.flatten_pdf(raa) self.ocr(raa, True) raa.extract_content(self.data_dir) self.search_keywords(raa, keywords) else: os.remove(f'{self.data_dir}/raa/{raa.get_sha256()}.pdf') logger.error(f'ERREUR: le RAA {raa.name} n\'a pas de date !') sys.exit(1) except PdfStreamError as exc: logger.warning(f'ATTENTION: le RAA à l\'adresse {raa.url} n\'est pas valide ! On l\'ignore...') except EmptyFileError as exc: logger.warning(f'ATTENTION: le RAA à l\'adresse {raa.url} est vide ! On l\'ignore...') def get_raa(self, page_content): logger.error('Cette fonction doit être surchargée') def configure_mailer(self, smtp_host, smtp_username, smtp_password, smtp_port, smtp_starttls, smtp_ssl, email_from, email_to, email_object): """ Configure les courriers électroniques. smtp_host -- Nom d'hôte du serveur SMTP smtp_username -- Nom d'utilisateur pour se connecter au serveur SMTP smtp_password -- Mot de passe pour se connecter au serveur SMTP smtp_port -- Port de connexion au serveur SMTP smtp_starttls -- Booléen. Si vrai, se connecte avec STARTTLS smtp_ssl -- Booléen. Si vrai, se connecte avec SSL/TLS email_from -- Adresse électronique de l'expéditeur des notifications email_to -- Tableau contenant les adresses électroniques à notifier email_object -- Objet du courrier électronique de notification """ self.smtp_host = smtp_host self.smtp_username = smtp_username self.smtp_password = smtp_password if smtp_port <= 0: self.smtp_port = 587 else: self.smtp_port = int(smtp_port) self.smtp_starttls = smtp_starttls self.smtp_ssl = smtp_ssl self.email_from = email_from self.email_to = email_to self.email_object = email_object if smtp_host and smtp_username and smtp_password and email_from and email_to and email_object: self.smtp_configured = True def mailer(self): if self.smtp_configured and self.found: try: message = email.message.EmailMessage() message.set_content(open(self.output_file_path).read()) message['Subject'] = self.email_object message['From'] = self.email_from message['Message-ID'] = email.utils.make_msgid(domain=self.email_from.split('@')[-1]) message['Date'] = email.utils.formatdate() context = ssl.create_default_context() if self.smtp_ssl is True: for address in self.email_to.split(','): del message['To'] message['To'] = address smtp = smtplib.SMTP_SSL(self.smtp_host, port, context=context) if self.smtp_username: smtp.login(self.smtp_username, self.smtp_password) smtp.send_message(message) smtp.quit() elif self.smtp_starttls is True: for address in self.email_to.split(','): del message['To'] message['To'] = address smtp = smtplib.SMTP(self.smtp_host) smtp.starttls(context=context) if self.smtp_username: smtp.login(self.smtp_username, self.smtp_password) smtp.send_message(message) smtp.quit() else: for address in self.email_to.split(','): del message['To'] message['To'] = address smtp = smtplib.SMTP(self.smtp_host) if self.smtp_username: smtp.login(self.smtp_username, self.smtp_password) smtp.send_message(message) smtp.quit() except Exception as exc: logger.warning(f'Impossible d\'envoyer le courrier électronique : {exc}') def guess_date(string, regex): """ Essaie de deviner la date d'un RAA à partir de son nom. Utile pour limiter les requêtes lors de l'obtention des RAA à scanner. """ try: search = re.search(regex, string, re.IGNORECASE) guessed_date = dateparser.parse(search.group(1), languages=['fr']) if guessed_date is None: raise Exception('La date est un objet None') else: return guessed_date except Exception as exc: logger.warning(f'Impossible de deviner la date du terme {string} : {exc}') return datetime.datetime(9999, 1, 1) def get_aware_datetime(unknown_datetime): """ Retourne un objet datetime avisé. datetime - L'objet datetime à aviser. Utilise le fuseau 'Europe/Paris' si datetime est naïf. """ if unknown_datetime.tzinfo is not None and unknown_datetime.tzinfo.utcoffset(unknown_datetime) is not None: return unknown_datetime else: tz_paris = pytz.timezone('Europe/Paris') return tz_paris.localize(unknown_datetime)