import os
import re
import ssl
import sys
import subprocess
import logging
import time
import datetime
import hashlib
import smtplib
from email.message import EmailMessage
from urllib.parse import quote

import requests
import dateparser
from bs4 import BeautifulSoup
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support import expected_conditions
from pyvirtualdisplay import Display
from pdfminer.high_level import extract_text
from stem import Signal
from stem.control import Controller
from mastodon import Mastodon

logger = logging.getLogger(__name__)


class RAAspotter:
    class RAA:
        url = ""
        date = datetime.datetime(1970, 1, 1)
        date_str = ""
        name = ""
        filename = ""
        sha256 = ""

        def __init__(self, url, date, name, filename):
            if not url == "":
                self.url = url
            if not date == "":
                self.date = date
                self.date_str = date.strftime("%d/%m/%Y")
            if not name == "":
                self.name = name
            if not filename == "":
                self.filename = filename

        def get_sha256(self):
            # The SHA-256 of the filename (not of the file content) is used
            # as a stable cache key for the downloaded PDF and its extracted
            # text.
            if self.sha256 == "":
                self.sha256 = hashlib.sha256(self.filename.encode('utf-8')).hexdigest()
            return self.sha256

    def __init__(self, data_dir, user_agent=''):
        logger.debug('Initialising RAAspotter')
        self.session = requests.Session()
        self.data_dir = data_dir
        self.found = False
        # short_code is expected to be defined by each subclass.
        self.output_file_path = os.path.dirname(os.path.abspath(__file__)) + f'/output_{self.short_code}.log'
        self.sleep_time = 0
        self.tor_enabled = False
        self.tor_max_requests = 0
        self.tor_requests = 0
        self.not_before = datetime.datetime(2024, 1, 1)
        self.smtp_configured = False
        self.mastodon = None
        self.mastodon_prefix = ''
        self.mastodon_suffix = ''
        self.update_user_agent(user_agent)
        # Truncate the output file at start-up
        with open(self.output_file_path, 'w') as f:
            f.write('')

    def configure_mastodon(self, access_token, instance, mastodon_prefix, mastodon_suffix):
        if access_token and access_token != "" and instance and instance != "":
            self.mastodon = Mastodon(
                access_token=access_token,
                api_base_url=instance
            )
            self.mastodon_prefix = mastodon_prefix
            self.mastodon_suffix = mastodon_suffix

    def mastodon_toot(self, content):
        if self.mastodon:
            toot = content
            if not self.mastodon_prefix == '':
                toot = f"{self.mastodon_prefix}\n\n{toot}"
            if not self.mastodon_suffix == '':
                toot = f"{toot}\n\n{self.mastodon_suffix}"
            self.mastodon.toot(toot)

    def enable_tor(self, max_requests=0):
        # Assumes a local Tor daemon with its SOCKS port on 9050
        proxies = {
            "http": "socks5h://127.0.0.1:9050",
            "https": "socks5h://127.0.0.1:9050",
        }
        self.tor_enabled = True
        self.tor_max_requests = max_requests
        self.tor_requests = 0
        self.session.proxies.update(proxies)
        self.tor_get_new_id()

    def disable_tor(self):
        self.tor_enabled = False
        self.tor_max_requests = 0
        self.tor_requests = 0
        # Reset the session proxies entirely to drop the Tor configuration
        self.session.proxies = {}

    def tor_get_new_id(self):
        logger.info('Requesting a new Tor identity')
        try:
            # Assumes the Tor control port is reachable on 9051
            controller = Controller.from_port(port=9051)
            controller.authenticate()
            controller.signal(Signal.NEWNYM)
            time.sleep(5)
            self.tor_requests = 0
        except Exception:
            logger.debug('Unable to get a new Tor identity')

    def get_sub_pages(self, page_content, element, host, recursive_until_pdf):
        soup = BeautifulSoup(page_content, 'html.parser')
        sub_pages = []
        for a in soup.select(element):
            if a.get('href'):
                url = f"{host}{a['href']}"
                if recursive_until_pdf:
                    sub_page_content = self.get_page(url, 'get').content
                    if not self.has_pdf(sub_page_content):
                        logger.info(f'{url} contains no PDF, fetching its sub-pages')
                        for sub_sub_page in self.get_sub_pages(sub_page_content, element, host, recursive_until_pdf):
                            sub_pages.append(sub_sub_page)
                    else:
                        sub_page = {
                            'url': url,
                            'name': a.get_text().strip()
                        }
                        sub_pages.append(sub_page)
                else:
                    sub_page = {
                        'url': url,
                        'name': a.get_text().strip()
                    }
                    sub_pages.append(sub_page)
        return sub_pages

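    # Illustrative note (not from the original source): get_sub_pages() and
    # get_sub_pages_with_pager() both return a list of dicts of the form
    #   {'url': 'https://www.example.gouv.fr/raa/janvier-2024', 'name': 'Janvier 2024'}
    # where the URL and name shown here are hypothetical.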
    def get_sub_pages_with_pager(self, page, sub_page_element, pager_element, host):
        pages = []
        page_content = self.get_page(page, 'get').content

        # Initialise the parser
        soup = BeautifulSoup(page_content, 'html.parser')

        # Look for sub-pages
        sub_pages = soup.select(sub_page_element)
        for sub_page in sub_pages:
            if sub_page.get('href'):
                page = {
                    'url': f"{host}{sub_page['href']}",
                    'name': sub_page.get_text().strip()
                }
                pages.append(page)

        # Look for a pager and, if one is found, follow it
        pager = soup.select(pager_element)
        if pager and pager[0] and pager[0].get('href'):
            for sub_page in self.get_sub_pages_with_pager(f"{host}{pager[0]['href']}", sub_page_element, pager_element, host):
                pages.append(sub_page)

        return pages

    def get_raa_with_pager(self, pages_list, pager_element, host):
        elements = []
        # Parse every page given as parameter
        for page in pages_list:
            page_content = self.get_page(page, 'get').content

            # For each page, collect the PDFs.
            # get_raa_elements() is expected to be provided by the subclass.
            for raa in self.get_raa_elements(page_content):
                elements.append(raa)

            # Also check whether there is a pager to follow
            sub_pages = []
            for sub_page in self.get_sub_pages(page_content, pager_element, host, True):
                sub_pages.append(sub_page['url'])
            for sub_raa in self.get_raa_with_pager(sub_pages, pager_element, host):
                elements.append(sub_raa)
        return elements

    def set_sleep_time(self, sleep_time):
        self.sleep_time = sleep_time

    def has_pdf(self, page_content):
        soup = BeautifulSoup(page_content, 'html.parser')
        for a in soup.find_all('a', href=True):
            if a['href'].endswith('.pdf'):
                return True
        return False

    # Start the browser
    def get_session(self, url, wait_element=""):
        webdriver_options = webdriver.ChromeOptions()
        webdriver_options.add_argument("--no-sandbox")
        webdriver_options.add_argument("--disable-extensions")
        webdriver_options.add_argument("--disable-gpu")
        webdriver_options.add_argument("--disable-dev-shm-usage")
        webdriver_options.add_argument("--use_subprocess")
        webdriver_options.add_argument("--disable-blink-features=AutomationControlled")
        if not self.user_agent == "":
            webdriver_options.add_argument(f"--user-agent={self.user_agent}")
        webdriver_options.add_argument("--headless")
        webdriver_options.add_argument("--window-size=1024,768")

        display = Display(visible=False, size=(1024, 768))
        display.start()

        browser = webdriver.Chrome(options=webdriver_options)

        # Load the URL
        browser.get(url)

        if not wait_element == "":
            # Wait until the browser has passed the anti-bot checks and the
            # content is displayed
            WebDriverWait(browser, 120).until(
                expected_conditions.presence_of_element_located((By.ID, wait_element))
            )

        page_content = browser.page_source

        # Grab the browser's cookies so they can be reused later
        for cookie in browser.get_cookies():
            self.session.cookies.set(cookie['name'], cookie['value'])

        # Stop the browser
        browser.quit()
        display.stop()

        return page_content

    def print_output(self, data):
        print(data)
        # Strip ANSI colour codes before writing to the output file
        data = data.replace('\033[92m', '')
        data = data.replace('\033[0m', '')
        data = data.replace('\033[1m', '')
        with open(self.output_file_path, 'a') as f:
            f.write(data + "\n")

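    # Note on get_page() below: it retries indefinitely on HTTP 429 and on
    # connection errors, pausing 55 seconds and requesting a new Tor identity
    # (when Tor is enabled) before each attempt. The retry is recursive, so a
    # very long outage could in principle hit Python's recursion limit.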
    def get_page(self, url, method, data={}):
        try:
            logger.debug(f'Loading page {url}')
            if self.sleep_time > 0:
                time.sleep(self.sleep_time)

            page = None
            if method == 'get':
                page = self.session.get(url)
            if method == 'post':
                page = self.session.post(url, data=data)

            if page.status_code == 429:
                logger.info('Got HTTP 429 Too Many Requests, backing off...')
                self.tor_get_new_id()
                time.sleep(55)
                return self.get_page(url, method, data)

            if self.tor_enabled:
                self.tor_requests += 1
                if self.tor_max_requests > 0 and self.tor_requests > self.tor_max_requests:
                    self.tor_get_new_id()

            return page
        except requests.exceptions.ConnectionError:
            logger.info('Connection error, backing off...')
            self.tor_get_new_id()
            time.sleep(55)
            return self.get_page(url, method, data)

    def update_user_agent(self, user_agent):
        self.user_agent = user_agent
        self.session.headers.update({'User-Agent': self.user_agent})

    def download_file(self, raa):
        try:
            os.makedirs(os.path.dirname(f'{self.data_dir}{raa.get_sha256()}.pdf'), exist_ok=True)
            file = self.get_page(raa.url, 'get')
            with open(f'{self.data_dir}{raa.get_sha256()}.pdf', 'wb') as f:
                f.write(file.content)
        except (requests.exceptions.ConnectionError, requests.exceptions.ChunkedEncodingError):
            logger.warning(f'WARNING: the connection was interrupted while downloading {raa.url}, retrying...')
            self.download_file(raa)
        except Exception as exc:
            logger.warning(f'WARNING: unable to download file {raa.url}: {exc}')

    def parse_pdf(self, raa, keywords):
        if not os.path.isfile(f'{self.data_dir}{raa.get_sha256()}.pdf'):
            logger.warning(f'WARNING: file {raa.get_sha256()}.pdf does not exist')
        else:
            text = extract_text(f'{self.data_dir}{raa.get_sha256()}.pdf')
            found = False
            found_keywords = []
            for keyword in keywords:
                if re.search(keyword, text, re.IGNORECASE | re.MULTILINE):
                    if not found:
                        url = quote(raa.url, safe='/:')
                        self.print_output(f'\033[92m{raa.name}\033[0m ({raa.date_str})')
                        self.print_output(f'URL: {url}')
                        found = True
                        self.found = True
                    self.print_output(f'  The term \033[1m{keyword}\033[0m was found.')
                    found_keywords.append(keyword)

            # Write the PDF's text to a text file for future analysis, then
            # delete the PDF
            with open(f'{self.data_dir}{raa.get_sha256()}.txt', 'w') as f:
                f.write(text)
            os.remove(f'{self.data_dir}{raa.get_sha256()}.pdf')

            if found:
                self.print_output('')
                url = quote(raa.url, safe='/:')
                found_keywords_str = ', '.join([str(x) for x in found_keywords])
                self.mastodon_toot(f"{raa.name} ({raa.date_str})\n\nThe following terms were found: {found_keywords_str}.\n\nURL: {url}")

    def ocr(self, raa, retry_on_failure=True):
        # OCR the downloaded PDF in place with ocrmypdf
        cmd = [
            'ocrmypdf',
            '-l', 'eng+fra',
            '--output-type', 'pdfa',
            '--redo-ocr',
            '--skip-big', '500',
            '--invalidate-digital-signatures',
            f'{self.data_dir}{raa.get_sha256()}.pdf',
            f'{self.data_dir}{raa.get_sha256()}.pdf'
        ]
        logger.debug(f'Starting ocrmypdf: {cmd}')
        try:
            subprocess.check_output(cmd, stderr=subprocess.STDOUT)
        except subprocess.CalledProcessError as exc:
            # Exit code 2 means the input is not a valid PDF: the download
            # probably failed, so fetch the file again (once)
            if exc.returncode == 2 and retry_on_failure:
                logger.warning('WARNING: the file is not a valid PDF, trying to download it again')
                if self.tor_enabled:
                    self.tor_get_new_id()
                self.download_file(raa)
                self.ocr(raa, False)
            # Exit codes 6 (prior OCR found) and 10 (PDF/A conversion
            # failure) are treated as soft failures
            elif (not exc.returncode == 6) and (not exc.returncode == 10):
                logger.warning(f'WARNING: unable to OCR the document: {exc.returncode} {exc.output}')

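    # parse_raa() below drives the full pipeline for a list of RAA objects:
    # download, then OCR, then keyword search. An illustrative call, with
    # hypothetical keywords:
    #   self.parse_raa(elements, ['some keyword', 'another keyword'])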
    def parse_raa(self, elements, keywords):
        for raa in elements:
            # If the file has not already been parsed and is newer than the
            # not_before cut-off date, download and parse it
            if (raa.date >= self.not_before) and (not os.path.isfile(f'{self.data_dir}{raa.get_sha256()}.txt')):
                url = quote(raa.url, safe='/:')
                logger.info(f'New file: {raa.name} ({raa.date_str}). URL: {url}')
                self.download_file(raa)
                self.ocr(raa, True)
                self.parse_pdf(raa, keywords)

    def get_raa(self, page_content):
        logger.error('This method must be overridden')

    def configure_mailer(self, smtp_host, smtp_username, smtp_password,
                         smtp_port, smtp_starttls, smtp_ssl, email_from,
                         email_to, email_object):
        self.smtp_host = smtp_host
        self.smtp_username = smtp_username
        self.smtp_password = smtp_password
        if smtp_port <= 0:
            self.smtp_port = 587
        else:
            self.smtp_port = int(smtp_port)
        self.smtp_starttls = smtp_starttls
        self.smtp_ssl = smtp_ssl
        self.email_from = email_from
        self.email_to = email_to
        self.email_object = email_object

        if smtp_host and smtp_username and smtp_password and email_from and email_to and email_object:
            self.smtp_configured = True

    def mailer(self):
        if self.smtp_configured and self.found:
            try:
                message = EmailMessage()
                with open(self.output_file_path) as f:
                    message.set_content(f.read())
                message['Subject'] = self.email_object
                message['From'] = self.email_from
                context = ssl.create_default_context()

                if self.smtp_ssl:
                    for address in self.email_to.split(','):
                        del message['To']
                        message['To'] = address
                        smtp = smtplib.SMTP_SSL(self.smtp_host, self.smtp_port, context=context)
                        if self.smtp_username:
                            smtp.login(self.smtp_username, self.smtp_password)
                        smtp.send_message(message)
                        smtp.quit()
                elif self.smtp_starttls:
                    for address in self.email_to.split(','):
                        del message['To']
                        message['To'] = address
                        smtp = smtplib.SMTP(self.smtp_host)
                        smtp.starttls(context=context)
                        if self.smtp_username:
                            smtp.login(self.smtp_username, self.smtp_password)
                        smtp.send_message(message)
                        smtp.quit()
                else:
                    for address in self.email_to.split(','):
                        del message['To']
                        message['To'] = address
                        smtp = smtplib.SMTP(self.smtp_host)
                        if self.smtp_username:
                            smtp.login(self.smtp_username, self.smtp_password)
                        smtp.send_message(message)
                        smtp.quit()
            except Exception as exc:
                logger.warning(f'Unable to send the email: {exc}')

    # Tries to guess the date of a RAA from its name. Useful to limit the
    # number of requests when collecting the RAAs to scan.
    @staticmethod
    def guess_date(string, regex):
        try:
            search = re.search(regex, string, re.IGNORECASE)
            guessed_date = dateparser.parse(search.group(1))
            if guessed_date is None:
                raise Exception('The date is a None object')
            return guessed_date
        except Exception as exc:
            logger.warning(f'Unable to guess the date of the term {string}: {exc}')
            # Return a far-future sentinel so the caller keeps the entry
            return datetime.datetime(9999, 1, 1)

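# ---------------------------------------------------------------------------
# Minimal usage sketch, illustrative only and not part of the original
# module. RAAspotter is meant to be subclassed once per source: subclasses
# must define a short_code class attribute (used to name the output log) and
# override get_raa(); subclasses relying on get_raa_with_pager() must also
# provide a get_raa_elements() parser. Every name, URL, CSS selector and
# keyword below is hypothetical.
if __name__ == '__main__':
    class RAAspotterDemo(RAAspotter):
        short_code = 'demo'  # hypothetical subclass code

        def get_raa(self, keywords):
            # A real subclass would fetch its prefecture's RAA index page,
            # turn each link into a RAAspotter.RAA object, then run the
            # download/OCR/parse pipeline on the list.
            page_content = self.get_page('https://www.example.gouv.fr/raa', 'get').content
            elements = []
            for link in self.get_sub_pages(page_content, 'a', 'https://www.example.gouv.fr', False):
                date = RAAspotter.guess_date(link['name'], r'([0-9]{2}/[0-9]{2}/[0-9]{4})')
                elements.append(RAAspotter.RAA(link['url'], date, link['name'], link['url'].split('/')[-1]))
            self.parse_raa(elements, keywords)
            self.mailer()

    logging.basicConfig(level=logging.INFO)
    spotter = RAAspotterDemo('/tmp/raa-demo/', user_agent='Mozilla/5.0')
    spotter.set_sleep_time(2)
    spotter.get_raa(['some keyword'])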