import os
import re
import ssl
import subprocess
import logging

import requests

from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support import expected_conditions

from pyvirtualdisplay import Display

from pdfminer.high_level import extract_text

import hashlib
import smtplib
from email.message import EmailMessage

logger = logging.getLogger(__name__)


class RAAspotter:
    """Download documents, OCR them and search their text for keywords.

    The class keeps a ``requests`` session (optionally seeded with cookies
    obtained through a real Chrome browser), stores downloaded PDFs and their
    extracted text under ``data_dir``, and mirrors every reported match to an
    ``output.log`` file located next to this module.
    """

    class RAA:
        """Description of one document: URL, date, display name and filename."""

        url = ""
        date = ""
        name = ""
        filename = ""
        sha256 = ""

        def __init__(self, url, date, name, filename):
            """Store each non-empty argument; empty ones keep the class default."""
            if not url == "":
                self.url = url
            if not date == "":
                self.date = date
            if not name == "":
                self.name = name
            if not filename == "":
                self.filename = filename

        def get_sha256(self):
            """Return the hex SHA-256 of ``filename`` (UTF-8 encoded), cached.

            The digest is computed from the file *name*, not from the file
            content, and is used as the base name of the files on disk.
            """
            if self.sha256 == "":
                self.sha256 = hashlib.sha256(self.filename.encode('utf-8')).hexdigest()
            return self.sha256

    def __init__(self, data_dir, user_agent=''):
        """Create the HTTP session and reset the output log file.

        Args:
            data_dir: Prefix prepended verbatim to every stored file name
                (it is concatenated, not joined, so it normally ends with a
                path separator).
            user_agent: User-Agent string for the HTTP session and, when not
                empty, for the browser started by ``get_session``.
        """
        logger.debug('Initialisation de RAAspotter')

        self.session = requests.Session()
        self.data_dir = data_dir
        # Becomes True as soon as parse_pdf() matches at least one keyword.
        self.found = False
        self.output_file_path = os.path.dirname(os.path.abspath(__file__))+'/output.log'

        self.update_user_agent(user_agent)

        # Truncate (or create) the output file.
        with open(self.output_file_path, 'w') as f:
            f.write('')

    def get_session(self, url, wait_element=""):
        """Load ``url`` in headless Chrome and copy its cookies to the session.

        Args:
            url: Page to open in the browser.
            wait_element: When not empty, ``id`` of an element whose presence
                is awaited (up to 120 seconds) before the page is read.

        Returns:
            The page source as reported by the browser.
        """
        webdriver_options = webdriver.ChromeOptions()
        webdriver_options.add_argument("--no-sandbox")
        webdriver_options.add_argument("--disable-extensions")
        webdriver_options.add_argument("--disable-gpu")
        webdriver_options.add_argument("--disable-dev-shm-usage")
        webdriver_options.add_argument("--use_subprocess")
        webdriver_options.add_argument("--disable-blink-features=AutomationControlled")

        if not self.user_agent == "":
            webdriver_options.add_argument(f"--user-agent={self.user_agent}")

        webdriver_options.add_argument("--headless")
        webdriver_options.add_argument("--window-size=1024,768")

        # Start the virtual display, then the browser. Both are released in
        # ``finally`` blocks so a failed page load or a wait timeout does not
        # leave a Chrome process or a display running.
        display = Display(visible=False, size=(1024, 768))
        display.start()
        try:
            browser = webdriver.Chrome(options=webdriver_options)
            try:
                # Download the URL.
                browser.get(url)

                if not wait_element == "":
                    # Wait until the browser has passed the anti-bot checks
                    # and the content is displayed.
                    WebDriverWait(browser, 120).until(
                        expected_conditions.presence_of_element_located((By.ID, wait_element))
                    )
                page_content = browser.page_source

                # Keep the browser cookies so later requests can reuse them.
                for cookie in browser.get_cookies():
                    self.session.cookies.set(cookie['name'], cookie['value'])
            finally:
                # Stop the browser.
                browser.quit()
        finally:
            display.stop()

        return page_content

    def print_output(self, data):
        """Print ``data`` and append it, without ANSI codes, to the output file."""
        print(data)
        data = data.replace('\033[92m', '')
        data = data.replace('\033[0m', '')
        data = data.replace('\033[1m', '')
        with open(self.output_file_path, 'a') as f:
            f.write(data+"\n")

    def get_page(self, url):
        """Return the response of a GET request on ``url`` using the session."""
        return self.session.get(url)

    def update_user_agent(self, user_agent):
        """Remember ``user_agent`` and set it as the session's User-Agent header."""
        self.user_agent = user_agent
        self.session.headers.update({'User-Agent': self.user_agent})

    def download_file(self, raa, max_retries=5):
        """Download ``raa.url`` to ``<data_dir><sha256>.pdf``.

        Interrupted connections are retried, at most ``max_retries`` times
        (the retry used to be an unbounded recursion). Any other error is
        logged as a warning and not raised, so the caller has to check that
        the file exists.

        Args:
            raa: Document to download; its ``url`` and ``get_sha256()`` are used.
            max_retries: Maximum number of new attempts after a connection
                error.
        """
        pdf_path = f'{self.data_dir}{raa.get_sha256()}.pdf'
        retries = 0
        while True:
            try:
                target_dir = os.path.dirname(pdf_path)
                # os.makedirs('') raises, so skip it when data_dir has no
                # directory component.
                if target_dir:
                    os.makedirs(target_dir, exist_ok=True)
                file = self.get_page(raa.url)
                with open(pdf_path, 'wb') as f:
                    f.write(file.content)
                return
            except (requests.exceptions.ConnectionError, requests.exceptions.ChunkedEncodingError):
                if retries >= max_retries:
                    logger.warning(f'ATTENTION: abandon du téléchargement de {raa.url} après {max_retries} nouvelles tentatives')
                    return
                retries += 1
                logger.warning(f'ATTENTION: la connexion a été interrompue pendant le téléchargement de {raa.url}, nouvelle tentative...')
            except Exception as exc:
                logger.warning(f'ATTENTION: Impossible de télécharger le fichier {raa.url}: {exc}')
                return

    def parse_pdf(self, raa, keywords):
        """Search the downloaded PDF of ``raa`` for each keyword and report matches.

        Every keyword is used as a regular expression (case-insensitive,
        multiline). The extracted text is then saved to
        ``<data_dir><sha256>.txt`` and the PDF is deleted. A missing PDF is
        only logged as a warning.

        Args:
            raa: Document whose PDF has already been downloaded.
            keywords: Iterable of regular-expression patterns to look for.
        """
        pdf_path = f'{self.data_dir}{raa.get_sha256()}.pdf'
        if not os.path.isfile(pdf_path):
            logger.warning(f'ATTENTION: le fichier {raa.get_sha256()}.pdf n\'existe pas')
        else:
            text = extract_text(pdf_path)
            found = False
            for keyword in keywords:
                if re.search(keyword, text, re.IGNORECASE|re.MULTILINE):
                    if not found:
                        # Print the document header once, before its first match.
                        self.print_output(f'\033[92m{raa.name}\033[0m ({raa.date})')
                        found = True
                        self.found = True
                    self.print_output(f' Le terme \033[1m{keyword}\033[0m a été trouvé.')

            # Write the PDF text to a text file for later analysis, then
            # delete the PDF.
            with open(f'{self.data_dir}{raa.get_sha256()}.txt', 'w') as f:
                f.write(text)
            os.remove(pdf_path)
            if found:
                self.print_output('')

    def ocr(self, raa, retry_on_failure=True):
        """Run ``ocrmypdf`` in place on the downloaded PDF of ``raa``.

        Args:
            raa: Document whose PDF is stored as ``<data_dir><sha256>.pdf``.
            retry_on_failure: When True and ocrmypdf exits with code 2, the
                file is downloaded again and OCR is attempted once more.
        """
        cmd = [
            'ocrmypdf',
            '-l', 'eng+fra',
            '--output-type', 'pdfa',
            '--redo-ocr',
            '--skip-big', '500',
            f'{self.data_dir}{raa.get_sha256()}.pdf',
            f'{self.data_dir}{raa.get_sha256()}.pdf'
        ]
        logger.debug(f'Lancement de ocrmypdf: {cmd}')
        try:
            subprocess.check_output(cmd, stderr=subprocess.STDOUT)
        except subprocess.CalledProcessError as exc:
            if exc.returncode == 2 and retry_on_failure:
                logger.warning('ATTENTION : Le fichier n\'est pas un PDF correct, nouvelle tentative de le télécharger')
                self.download_file(raa)
                self.ocr(raa, False)
            elif (not exc.returncode == 6) and (not exc.returncode == 10):
                # Exit codes 6 and 10 are deliberately ignored. The message
                # needs placeholders: passing extra arguments to a message
                # without any made the logging call itself fail.
                logger.warning(
                    'ATTENTION : Impossible d\'OCRiser le document (code %s) : %s',
                    exc.returncode, exc.output
                )

    def parse_raa(self, elements, keywords):
        """Download, OCR and search every document that has no text file yet.

        Args:
            elements: Iterable of ``RAA`` objects.
            keywords: Iterable of regular-expression patterns passed to
                ``parse_pdf``.
        """
        for raa in elements:
            # If the file has not been parsed yet, download it and parse it.
            if not os.path.isfile(f'{self.data_dir}{raa.get_sha256()}.txt'):
                logger.info(f'Nouveau fichier : {raa.name} ({raa.date}). URL : {raa.url}')
                self.download_file(raa)
                self.ocr(raa, True)
                self.parse_pdf(raa, keywords)

    def get_raa(self, page_content):
        """Placeholder meant to be overridden; this version only logs an error."""
        logger.error('Cette fonction doit être surchargée')

    @staticmethod
    def mailer(smtp_host, smtp_username, smtp_password, smtp_port,
               smtp_starttls, smtp_ssl,
               email_from, email_to, email_object, email_content):
        """Send one e-mail per recipient; failures are logged, never raised.

        Args:
            smtp_host: SMTP server host name.
            smtp_username: Login used to authenticate.
            smtp_password: Password used to authenticate.
            smtp_port: SMTP server port, used for every connection mode.
            smtp_starttls: When equal to True (and ``smtp_ssl`` is not), the
                connection is upgraded with STARTTLS.
            smtp_ssl: When equal to True, an implicit TLS connection is used.
            email_from: Sender address.
            email_to: Comma-separated list of recipient addresses.
            email_object: Subject of the message.
            email_content: Plain-text body of the message.
        """
        try:
            message = EmailMessage()
            message.set_content(email_content)

            message['Subject'] = email_object
            message['From'] = email_from

            context = ssl.create_default_context()

            for address in email_to.split(','):
                # EmailMessage allows a single "To" header: drop the previous
                # recipient before setting the next one (deleting a missing
                # header is a no-op).
                del message['To']
                message['To'] = address

                # The "== True" comparisons are kept so that truthy
                # non-boolean values (e.g. the string "False") still do not
                # enable TLS.
                if smtp_ssl == True:
                    connection = smtplib.SMTP_SSL(smtp_host, smtp_port, context=context)
                else:
                    connection = smtplib.SMTP(smtp_host, smtp_port)

                # The context manager closes the connection even when the
                # login or the delivery fails.
                with connection as smtp:
                    if (not smtp_ssl == True) and smtp_starttls == True:
                        smtp.starttls(context=context)
                    smtp.login(smtp_username, smtp_password)
                    smtp.send_message(message)
        except Exception as exc:
            logger.warning(f'Impossible d\'envoyer le courrier électronique : {exc}')


# NOTE(review): the flattened source does not show whether ``mailer`` was a
# module-level function or a self-less method of RAAspotter; it is exposed
# both ways so that either call style keeps working. Confirm and drop one.
mailer = RAAspotter.mailer