# RAAspotter.py (7.33 KiB)
# Written by Bastien Le Querrec
import os, re, ssl
import subprocess
import logging
import requests
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support import expected_conditions
from pyvirtualdisplay import Display
from pdfminer.high_level import extract_text
import hashlib
import smtplib
from email.message import EmailMessage
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class RAAspotter:
    """Download RAA PDF documents, OCR them and search their text for keywords.

    The class keeps a ``requests`` session (optionally primed with cookies
    obtained through a real Chrome browser), stores downloaded files under
    ``data_dir`` and mirrors everything it reports into ``output.log`` next to
    this module. ``get_raa`` is a placeholder that subclasses must override.
    """

    class RAA:
        """Metadata describing one RAA document (URL, date, name, file name)."""

        # Class-level defaults; ``__init__`` only overrides them with
        # non-empty values.
        url = ""
        date = ""
        name = ""
        filename = ""
        sha256 = ""

        def __init__(self, url, date, name, filename):
            """Store each argument that is not the empty string."""
            if not url == "":
                self.url = url
            if not date == "":
                self.date = date
            if not name == "":
                self.name = name
            if not filename == "":
                self.filename = filename

        def get_sha256(self):
            """Return the SHA-256 hex digest of ``filename`` (computed once)."""
            if self.sha256 == "":
                self.sha256 = hashlib.sha256(self.filename.encode('utf-8')).hexdigest()
            return self.sha256

    def __init__(self, data_dir, user_agent=''):
        """Create the HTTP session and reset the output log.

        Args:
            data_dir: Prefix prepended verbatim to every stored file name
                (it is concatenated, not joined, so it normally ends with a
                path separator).
            user_agent: User-Agent used for HTTP requests and for the browser.
        """
        logger.debug('Initialisation de RAAspotter')
        self.session = requests.Session()
        self.data_dir = data_dir
        self.found = False
        self.output_file_path = os.path.dirname(os.path.abspath(__file__))+'/output.log'
        self.update_user_agent(user_agent)
        # Start every run with an empty output log.
        with open(self.output_file_path, 'w', encoding='utf-8') as f:
            f.write('')

    def _data_path(self, raa, extension):
        """Return the storage path of ``raa`` for the given file extension."""
        return f'{self.data_dir}{raa.get_sha256()}.{extension}'

    # Start the browser
    def get_session(self, url, wait_element=""):
        """Load ``url`` in headless Chrome and copy its cookies to the session.

        Args:
            url: Page to load.
            wait_element: If non-empty, HTML ``id`` of an element whose
                presence is awaited (up to 120 seconds) before the page
                source is read.

        Returns:
            The page source as rendered by the browser.
        """
        webdriver_options = webdriver.ChromeOptions()
        webdriver_options.add_argument("--no-sandbox")
        webdriver_options.add_argument("--disable-extensions")
        webdriver_options.add_argument("--disable-gpu")
        webdriver_options.add_argument("--disable-dev-shm-usage")
        webdriver_options.add_argument("--use_subprocess")
        webdriver_options.add_argument("--disable-blink-features=AutomationControlled")
        if not self.user_agent == "":
            webdriver_options.add_argument(f"--user-agent={self.user_agent}")
        webdriver_options.add_argument("--headless")
        webdriver_options.add_argument("--window-size=1024,768")

        display = Display(visible=False, size=(1024, 768))
        display.start()
        # The nested try/finally blocks guarantee that the browser and the
        # virtual display are shut down even when loading or waiting fails.
        try:
            browser = webdriver.Chrome(options=webdriver_options)
            try:
                # Download the URL
                browser.get(url)
                if not wait_element == "":
                    # Wait until the browser has passed the anti-bot checks
                    # and the content is displayed
                    WebDriverWait(browser, 120).until(
                        expected_conditions.presence_of_element_located((By.ID, wait_element))
                    )
                page_content = browser.page_source
                # Keep the browser cookies so later requests can reuse them
                for cookie in browser.get_cookies():
                    self.session.cookies.set(cookie['name'], cookie['value'])
            finally:
                # Stop the browser
                browser.quit()
        finally:
            display.stop()
        return page_content

    def print_output(self, data):
        """Print ``data`` and append it, without ANSI codes, to the output log."""
        print(data)
        # Strip the colour/bold escape sequences used for terminal output.
        for escape_code in ('\033[92m', '\033[0m', '\033[1m'):
            data = data.replace(escape_code, '')
        with open(self.output_file_path, 'a', encoding='utf-8') as f:
            f.write(data+"\n")

    def get_page(self, url):
        """Fetch ``url`` with the shared session and return the response."""
        return self.session.get(url)

    def update_user_agent(self, user_agent):
        """Set the User-Agent used by both the session and the browser."""
        self.user_agent = user_agent
        self.session.headers.update({'User-Agent': self.user_agent})

    def download_file(self, raa, max_attempts=5):
        """Download ``raa.url`` to ``<data_dir><sha256>.pdf``.

        Interrupted connections are retried, up to ``max_attempts`` attempts
        in total. Any failure is logged as a warning; nothing is raised.

        Args:
            raa: The ``RAA`` to download.
            max_attempts: Maximum number of download attempts.
        """
        for attempt in range(1, max_attempts + 1):
            try:
                pdf_path = self._data_path(raa, 'pdf')
                os.makedirs(os.path.dirname(pdf_path), exist_ok=True)
                file = self.get_page(raa.url)
                with open(pdf_path, 'wb') as f:
                    f.write(file.content)
                return
            except (requests.exceptions.ConnectionError, requests.exceptions.ChunkedEncodingError) as exc:
                if attempt < max_attempts:
                    logger.warning(f'ATTENTION: la connexion a été interrompue pendant le téléchargement de {raa.url}, nouvelle tentative...')
                else:
                    # Give up instead of retrying forever.
                    logger.warning(f'ATTENTION: Impossible de télécharger le fichier {raa.url}: {exc}')
            except Exception as exc:
                logger.warning(f'ATTENTION: Impossible de télécharger le fichier {raa.url}: {exc}')
                return

    def parse_pdf(self, raa, keywords):
        """Search the downloaded PDF of ``raa`` for each keyword.

        Every keyword is used as a regular expression (case-insensitive,
        multi-line). Matches are reported through ``print_output`` and set
        ``self.found``. The extracted text is then saved as
        ``<data_dir><sha256>.txt`` and the PDF is deleted.

        Args:
            raa: The ``RAA`` whose PDF is parsed.
            keywords: Iterable of regular-expression patterns.
        """
        pdf_path = self._data_path(raa, 'pdf')
        if not os.path.isfile(pdf_path):
            logger.warning(f'ATTENTION: le fichier {raa.get_sha256()}.pdf n\'existe pas')
            return

        text = extract_text(pdf_path)
        found = False
        for keyword in keywords:
            if re.search(keyword, text, re.IGNORECASE|re.MULTILINE):
                if not found:
                    # Print the document header once, before its first match.
                    self.print_output(f'\033[92m{raa.name}\033[0m ({raa.date})')
                    found = True
                    self.found = True
                self.print_output(f' Le terme \033[1m{keyword}\033[0m a été trouvé.')

        # Write the PDF text to a text file for future analysis, then delete
        # the PDF. The .txt file is also what marks the document as parsed.
        with open(self._data_path(raa, 'txt'), 'w', encoding='utf-8') as f:
            f.write(text)
        os.remove(pdf_path)
        if found:
            self.print_output('')

    def ocr(self, raa, retry_on_failure=True):
        """Run ``ocrmypdf`` in place on the downloaded PDF of ``raa``.

        Args:
            raa: The ``RAA`` whose PDF is OCRed.
            retry_on_failure: When true, exit code 2 triggers one fresh
                download followed by a single new OCR attempt.
        """
        pdf_path = self._data_path(raa, 'pdf')
        cmd = ['ocrmypdf', '-l', 'eng+fra', '--output-type', 'pdfa', '--redo-ocr', '--skip-big', '500', pdf_path, pdf_path]
        logger.debug(f'Lancement de ocrmypdf: {cmd}')
        try:
            subprocess.check_output(cmd, stderr=subprocess.STDOUT)
        except subprocess.CalledProcessError as exc:
            if exc.returncode == 2 and retry_on_failure:
                logger.warning('ATTENTION : Le fichier n\'est pas un PDF correct, nouvelle tentative de le télécharger')
                self.download_file(raa)
                self.ocr(raa, False)
            elif exc.returncode not in (6, 10):
                # Exit codes 6 and 10 are deliberately ignored (per the
                # ocrmypdf documentation: text already present / PDF/A
                # conversion failed).
                logger.warning(
                    'ATTENTION : Impossible d\'OCRiser le document (code %s) : %s',
                    exc.returncode, exc.output,
                )

    def parse_raa(self, elements, keywords):
        """Download, OCR and parse every RAA that has not been parsed yet.

        Args:
            elements: Iterable of ``RAA`` objects.
            keywords: Iterable of regular-expression patterns to search for.
        """
        for raa in elements:
            # A document is considered already parsed when its .txt exists.
            if os.path.isfile(self._data_path(raa, 'txt')):
                continue
            logger.info(f'Nouveau fichier : {raa.name} ({raa.date}). URL : {raa.url}')
            self.download_file(raa)
            self.ocr(raa, True)
            self.parse_pdf(raa, keywords)

    def get_raa(self, page_content):
        """Placeholder that subclasses must override; only logs an error."""
        logger.error('Cette fonction doit être surchargée')

    @staticmethod
    def mailer(smtp_host, smtp_username, smtp_password, smtp_port,
               smtp_starttls, smtp_ssl, email_from, email_to, email_object,
               email_content):
        """Send one e-mail per recipient through the given SMTP server.

        Any error is logged as a warning and stops the remaining deliveries;
        nothing is raised.

        Args:
            smtp_host: SMTP server host name.
            smtp_username: Login name.
            smtp_password: Login password.
            smtp_port: SMTP server port (a falsy value selects the default).
            smtp_starttls: ``True`` to upgrade a plain connection with
                STARTTLS (ignored when ``smtp_ssl`` is ``True``).
            smtp_ssl: ``True`` to connect over implicit TLS.
            email_from: Sender address.
            email_to: Comma-separated recipient addresses.
            email_object: Subject line.
            email_content: Plain-text body.
        """
        try:
            message = EmailMessage()
            message.set_content(email_content)
            message['Subject'] = email_object
            message['From'] = email_from
            context = ssl.create_default_context()
            # Keep the original ``== True`` comparison so the accepted flag
            # values do not change.
            use_ssl = smtp_ssl == True
            use_starttls = smtp_starttls == True
            port = smtp_port or 0
            for address in email_to.split(','):
                # EmailMessage allows a single To header: remove the previous
                # recipient before setting the next one.
                del message['To']
                message['To'] = address
                if use_ssl:
                    connection = smtplib.SMTP_SSL(smtp_host, port, context=context)
                else:
                    connection = smtplib.SMTP(smtp_host, port)
                # The context manager sends QUIT and closes the connection
                # even when login or delivery fails.
                with connection as smtp:
                    if use_starttls and not use_ssl:
                        smtp.starttls(context=context)
                    smtp.login(smtp_username, smtp_password)
                    smtp.send_message(message)
        except Exception as exc:
            logger.warning(f'Impossible d\'envoyer le courrier électronique : {exc}')