import datetime
import hashlib
import logging
import os
import re
import smtplib
import ssl
import subprocess
import time
from email.message import EmailMessage
from urllib.parse import quote

import requests
from bs4 import BeautifulSoup
from pdfminer.high_level import extract_text
from pyvirtualdisplay import Display
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions
from selenium.webdriver.support.wait import WebDriverWait
from stem import Signal
from stem.control import Controller

logger = logging.getLogger(__name__)


class RAAspotter:
    """Download published PDF documents, OCR them and search them for keywords.

    Subclasses are expected to override :meth:`get_raa`. Matches are printed
    and appended to ``output.log`` next to this module, and can be sent by
    e-mail with :meth:`mailer`.
    """

    class RAA:
        """Description of one document: URL, date, display name and file name."""

        # Class-level defaults, used when the constructor receives "".
        url = ""
        date = datetime.datetime(1970, 1, 1)
        name = ""
        filename = ""
        sha256 = ""

        def __init__(self, url, date, name, filename):
            """Store every argument that is not the empty string."""
            if url != "":
                self.url = url
            if date != "":
                self.date = date
            if name != "":
                self.name = name
            if filename != "":
                self.filename = filename

        def get_sha256(self):
            """Return the SHA-256 hex digest of ``filename`` (computed once).

            The digest is derived from the file *name*, not from the file
            content; it is used as the on-disk base name of the document.
            """
            if self.sha256 == "":
                self.sha256 = hashlib.sha256(
                    self.filename.encode('utf-8')
                ).hexdigest()
            return self.sha256

    def __init__(self, data_dir, user_agent=''):
        """Create the HTTP session and reset the output log.

        Args:
            data_dir: Prefix prepended to every downloaded file name.
            user_agent: Value of the ``User-Agent`` header ('' for none).
        """
        logger.debug('Initialisation de RAAspotter')

        self.session = requests.Session()
        self.data_dir = data_dir
        self.found = False
        self.output_file_path = (
            os.path.dirname(os.path.abspath(__file__)) + '/output.log'
        )
        self.sleep_time = 0
        self.tor_enabled = False
        self.tor_max_requests = 0
        self.tor_requests = 0
        self.not_before = datetime.datetime(1970, 1, 1)
        self.smtp_configured = False

        self.update_user_agent(user_agent)

        # Truncate (or create) the output log.
        with open(self.output_file_path, 'w') as f:
            f.write('')

    def enable_tor(self, max_requests=0):
        """Route the session through the local Tor SOCKS proxy.

        Args:
            max_requests: Number of requests after which a new Tor identity
                is requested; 0 disables the rotation.
        """
        proxies = {
            "http": "socks5h://localhost:9050",
            "https": "socks5h://localhost:9050",
        }
        self.tor_enabled = True
        self.tor_max_requests = max_requests
        self.tor_requests = 0
        self.session.proxies.update(proxies)

    def disable_tor(self):
        """Stop routing the session through Tor and reset the counters."""
        self.tor_enabled = False
        self.tor_max_requests = 0
        self.tor_requests = 0
        # Updating with an empty dict would leave the proxies in place, so
        # remove the entries installed by enable_tor() explicitly.
        for scheme in ("http", "https"):
            self.session.proxies.pop(scheme, None)

    def tor_get_new_id(self):
        """Ask the local Tor control port (9051) for a new identity.

        Failures are logged at debug level and otherwise ignored.
        """
        logger.info('Changement d\'identité Tor')
        try:
            # The context manager closes the control connection.
            with Controller.from_port(port=9051) as controller:
                controller.authenticate()
                controller.signal(Signal.NEWNYM)
            time.sleep(3)
            self.tor_requests = 0
        except Exception:
            logger.debug('Impossible de changer d\'identité Tor')

    def get_sub_pages(self, page_content, element, host=""):
        """Return the URLs of the linked pages that contain PDF links.

        Every element matched by the CSS selector ``element`` is followed.
        A linked page without any PDF link is explored recursively with the
        same selector.

        Args:
            page_content: HTML of the page to analyse.
            element: CSS selector matching the links to follow.
            host: Prefix prepended to every ``href``.

        Returns:
            A list of URLs.
        """
        soup = BeautifulSoup(page_content, 'html.parser')
        sub_pages = []
        for a in soup.select(element):
            url = f"{host}{a['href']}"
            sub_page_content = self.get_page(url).content
            if not self.has_pdf(sub_page_content):
                logger.info(
                    f'{url} ne contient pas de PDF, on récupère ses sous-pages'
                )
                sub_pages.extend(
                    self.get_sub_pages(sub_page_content, element, host)
                )
            else:
                sub_pages.append(url)
        return sub_pages

    def set_sleep_time(self, sleep_time):
        """Set the delay, in seconds, applied before each request."""
        self.sleep_time = sleep_time

    def has_pdf(self, page_content):
        """Return True if the page has a link whose href ends with '.pdf'."""
        soup = BeautifulSoup(page_content, 'html.parser')
        return any(
            a['href'].endswith('.pdf')
            for a in soup.find_all('a', href=True)
        )

    def get_session(self, url, wait_element=""):
        """Load ``url`` in a headless Chrome and copy its cookies.

        Args:
            url: Page to load.
            wait_element: Optional element ID; when given, wait up to 120
                seconds for it to be present before reading the page.

        Returns:
            The page source as rendered by the browser.
        """
        webdriver_options = webdriver.ChromeOptions()
        for argument in (
            "--no-sandbox",
            "--disable-extensions",
            "--disable-gpu",
            "--disable-dev-shm-usage",
            "--use_subprocess",
            "--disable-blink-features=AutomationControlled",
        ):
            webdriver_options.add_argument(argument)

        if self.user_agent != "":
            webdriver_options.add_argument(f"--user-agent={self.user_agent}")

        webdriver_options.add_argument("--headless")
        webdriver_options.add_argument("--window-size=1024,768")

        # Start the browser inside a virtual display.
        display = Display(visible=False, size=(1024, 768))
        display.start()
        try:
            browser = webdriver.Chrome(options=webdriver_options)
            try:
                # Load the URL.
                browser.get(url)

                if wait_element != "":
                    # Wait until the browser has passed the anti-bot checks
                    # and the content is displayed.
                    WebDriverWait(browser, 120).until(
                        expected_conditions.presence_of_element_located(
                            (By.ID, wait_element)
                        )
                    )
                page_content = browser.page_source

                # Keep the browser cookies so the requests session can
                # reuse them later.
                for cookie in browser.get_cookies():
                    self.session.cookies.set(cookie['name'], cookie['value'])
            finally:
                # Always stop the browser, even if loading or waiting failed.
                browser.quit()
        finally:
            display.stop()

        return page_content

    def print_output(self, data):
        """Print ``data`` and append it, without ANSI colours, to the log."""
        print(data)
        for ansi_code in ('\033[92m', '\033[0m', '\033[1m'):
            data = data.replace(ansi_code, '')
        with open(self.output_file_path, 'a') as f:
            f.write(data + "\n")

    def get_page(self, url):
        """Fetch ``url`` with the shared session and return the response.

        Honours the configured sleep time and, when Tor is enabled, counts
        the request and renews the Tor identity once the limit is exceeded.
        """
        logger.debug(f'Chargement de la page {url}')
        if self.sleep_time > 0:
            time.sleep(self.sleep_time)
        page = self.session.get(url)

        if self.tor_enabled:
            self.tor_requests += 1
            if (self.tor_max_requests > 0
                    and self.tor_requests > self.tor_max_requests):
                self.tor_get_new_id()

        return page

    def update_user_agent(self, user_agent):
        """Store ``user_agent`` and set it as the session's User-Agent."""
        self.user_agent = user_agent
        self.session.headers.update({'User-Agent': self.user_agent})

    def download_file(self, raa, max_retries=10):
        """Download ``raa.url`` to ``<data_dir><sha256>.pdf``.

        Interrupted connections are retried, at most ``max_retries`` times;
        any other error is logged and the download is abandoned. No
        exception is propagated to the caller.

        Args:
            raa: The document to download.
            max_retries: Maximum number of additional attempts after a
                connection error.
        """
        pdf_path = f'{self.data_dir}{raa.get_sha256()}.pdf'
        retries = 0
        while True:
            try:
                os.makedirs(os.path.dirname(pdf_path), exist_ok=True)
                file = self.get_page(raa.url)
                with open(pdf_path, 'wb') as f:
                    f.write(file.content)
                return
            except (requests.exceptions.ConnectionError,
                    requests.exceptions.ChunkedEncodingError):
                retries += 1
                if retries > max_retries:
                    logger.warning(
                        f'ATTENTION: abandon du téléchargement de {raa.url} '
                        f'après {max_retries} nouvelles tentatives'
                    )
                    return
                logger.warning(
                    f'ATTENTION: la connexion a été interrompue pendant le '
                    f'téléchargement de {raa.url}, nouvelle tentative...'
                )
            except Exception as exc:
                logger.warning(
                    f'ATTENTION: Impossible de télécharger le fichier '
                    f'{raa.url}: {exc}'
                )
                return

    def parse_pdf(self, raa, keywords):
        """Search the downloaded PDF of ``raa`` for each keyword.

        Each keyword is used as a case-insensitive, multi-line regular
        expression. Matches are reported through :meth:`print_output`. The
        extracted text is then saved as ``<sha256>.txt`` and the PDF removed.
        """
        base_path = f'{self.data_dir}{raa.get_sha256()}'
        pdf_path = f'{base_path}.pdf'

        if not os.path.isfile(pdf_path):
            logger.warning(
                f'ATTENTION: le fichier {raa.get_sha256()}.pdf n\'existe pas'
            )
            return

        text = extract_text(pdf_path)
        found = False
        for keyword in keywords:
            if re.search(keyword, text, re.IGNORECASE | re.MULTILINE):
                if not found:
                    # Print the document header once, before its first match.
                    url = quote(raa.url, safe='/:')
                    self.print_output(
                        f'\033[92m{raa.name}\033[0m ({raa.date})'
                    )
                    self.print_output(f'URL : {url}')
                    found = True
                    self.found = True
                self.print_output(
                    f' Le terme \033[1m{keyword}\033[0m a été trouvé.'
                )

        # Write the PDF text to a text file for future analysis, then
        # delete the PDF.
        with open(f'{base_path}.txt', 'w') as f:
            f.write(text)
        os.remove(pdf_path)
        if found:
            self.print_output('')

    def ocr(self, raa, retry_on_failure=True):
        """Run ``ocrmypdf`` in place on the downloaded PDF of ``raa``.

        Args:
            raa: The document whose PDF must be OCRed.
            retry_on_failure: When ocrmypdf exits with code 2, download the
                file again and retry once.
        """
        pdf_path = f'{self.data_dir}{raa.get_sha256()}.pdf'
        cmd = [
            'ocrmypdf', '-l', 'eng+fra', '--output-type', 'pdfa',
            '--redo-ocr', '--skip-big', '500', pdf_path, pdf_path,
        ]
        logger.debug(f'Lancement de ocrmypdf: {cmd}')
        try:
            subprocess.check_output(cmd, stderr=subprocess.STDOUT)
        except subprocess.CalledProcessError as exc:
            if exc.returncode == 2 and retry_on_failure:
                logger.warning(
                    'ATTENTION : Le fichier n\'est pas un PDF correct, '
                    'nouvelle tentative de le télécharger'
                )
                self.download_file(raa)
                self.ocr(raa, False)
            elif exc.returncode not in (6, 10):
                # Exit codes 6 and 10 are deliberately not reported.
                # Placeholders are required: extra positional arguments
                # without them make logging fail to format the record.
                logger.warning(
                    'ATTENTION : Impossible d\'OCRiser le document '
                    '(code %s) : %s',
                    exc.returncode, exc.output,
                )

    def parse_raa(self, elements, keywords):
        """Download, OCR and analyse every new document of ``elements``."""
        for raa in elements:
            # Only handle files that have not been parsed yet (no .txt on
            # disk) and that are more recent than the ``not_before`` date.
            txt_path = f'{self.data_dir}{raa.get_sha256()}.txt'
            if raa.date > self.not_before and not os.path.isfile(txt_path):
                url = quote(raa.url, safe='/:')
                logger.info(
                    f'Nouveau fichier : {raa.name} ({raa.date}). URL : {url}'
                )
                self.download_file(raa)
                self.ocr(raa, True)
                self.parse_pdf(raa, keywords)

    def get_raa(self, page_content):
        """Placeholder that only logs an error; meant to be overridden."""
        logger.error('Cette fonction doit être surchargée')

    def configure_mailer(self, smtp_host, smtp_username, smtp_password,
                         smtp_port, smtp_starttls, smtp_ssl, email_from,
                         email_to, email_object):
        """Store the SMTP settings used by :meth:`mailer`.

        ``smtp_port`` may be an int or a numeric string; an empty, zero or
        negative value selects port 587. The mailer is marked as configured
        only when host, username, password, sender, recipients and subject
        are all non-empty.
        """
        self.smtp_host = smtp_host
        self.smtp_username = smtp_username
        self.smtp_password = smtp_password
        # Convert before comparing so that string ports are accepted.
        port = int(smtp_port) if smtp_port else 0
        self.smtp_port = port if port > 0 else 587
        self.smtp_starttls = smtp_starttls
        self.smtp_ssl = smtp_ssl
        self.email_from = email_from
        self.email_to = email_to
        self.email_object = email_object

        if (smtp_host and smtp_username and smtp_password
                and email_from and email_to and email_object):
            self.smtp_configured = True

    def _open_smtp(self, context):
        """Open an SMTP connection according to the SSL/STARTTLS settings."""
        # ``== True`` is kept on purpose: only a real true value (not any
        # non-empty string) selects the encrypted modes.
        if self.smtp_ssl == True:  # noqa: E712
            return smtplib.SMTP_SSL(
                self.smtp_host, self.smtp_port, context=context
            )
        smtp = smtplib.SMTP(self.smtp_host, self.smtp_port)
        if self.smtp_starttls == True:  # noqa: E712
            try:
                smtp.starttls(context=context)
            except Exception:
                smtp.close()
                raise
        return smtp

    def mailer(self):
        """E-mail the output log to every recipient, one message each.

        Nothing is sent unless the mailer is configured and at least one
        keyword has been found. Errors are logged, never raised.
        """
        if not (self.smtp_configured and self.found):
            return

        try:
            with open(self.output_file_path) as report:
                body = report.read()

            message = EmailMessage()
            message.set_content(body)
            message['Subject'] = self.email_object
            message['From'] = self.email_from

            context = ssl.create_default_context()

            for address in self.email_to.split(','):
                address = address.strip()
                if not address:
                    continue
                # Replace the recipient left by the previous iteration.
                del message['To']
                message['To'] = address
                # The context manager sends QUIT and closes the socket.
                with self._open_smtp(context) as smtp:
                    if self.smtp_username:
                        smtp.login(self.smtp_username, self.smtp_password)
                    smtp.send_message(message)
        except Exception as exc:
            logger.warning(
                f'Impossible d\'envoyer le courrier électronique : {exc}'
            )