#!/usr/bin/env python3 import os, sys, time, re import subprocess from bs4 import BeautifulSoup import argparse from urllib.parse import unquote import logging import requests from selenium import webdriver from selenium.webdriver.common.by import By from selenium.webdriver.support.wait import WebDriverWait from selenium.webdriver.support import expected_conditions from pyvirtualdisplay import Display from pdfminer.high_level import extract_text # Config __RAA_PAGE = 'https://www.prefecturedepolice.interieur.gouv.fr/actualites-et-presse/arretes/accueil-arretes' __USER_AGENT = 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36' __headless_mode = True __LIST = os.getenv('LIST') or 'vidéoprotection,caméras,captation,aéronef' __DATA_DIR = os.path.dirname(os.path.abspath(__file__))+'/data/ppparis/' # Fonctions def print_output(data): print(data) data = data.replace('\033[92m', '') data = data.replace('\033[0m', '') data = data.replace('\033[1m', '') f = open(os.path.dirname(os.path.abspath(__file__))+'/output.log','a') f.write(data+"\n") f.close() # On charge l'URL avec la liste des fichiers def get_html(url): browser.get(url) # On attend que le navigateur ait passé les tests anti-robots et que le contenu s'affiche element = WebDriverWait(browser, 120).until(expected_conditions.presence_of_element_located((By.ID, "block-decree-list-block"))) return browser.page_source def download_file(url, dest): try: os.makedirs(os.path.dirname(dest), exist_ok=True) file = session.get(url) f = open(dest,'wb') f.write(file.content); f.close() except (requests.exceptions.ConnectionError, requests.exceptions.ChunkedEncodingError): logging.warning(f'ATTENTION: la connexion a été interrompue pendant le téléchargement de {url}, nouvelle tentative...') download_file(url, dest) except Exception as exc: logging.warning(f'ATTENTION: Impossible de télécharger le fichier {url}: {exc}') def get_txt_file(filename): return re.sub('(\.pdf)$', '.txt', filename) def parse_pdf(filename, name, date): if not os.path.isfile(__DATA_DIR+filename): logging.warning(f'ATTENTION: le fichier {filename} n\'existe pas') else: text = extract_text(__DATA_DIR+filename) found = False for keyword in __LIST.split(','): if re.search(keyword, text, re.IGNORECASE|re.MULTILINE): if not found: print_output(f'\033[92m{name}\033[0m ({date})') found = True print_output(f' Le terme \033[1m{keyword}\033[0m a été trouvé.') # Écrit le texte du PDF dans un fichier texte pour une analyse future, puis supprime le PDF f = open(get_txt_file(__DATA_DIR+filename),'w') f.write(text) f.close() os.remove(__DATA_DIR+filename) if found: print_output('') def ocr(file,url,retry_on_failure=True): cmd = ['ocrmypdf', '-l', 'eng+fra', '--output-type', 'pdfa', '--redo-ocr', '--skip-big', '500' , __DATA_DIR+filename, __DATA_DIR+filename] logging.debug(f'Lancement de ocrmypdf: {cmd}') try: output = subprocess.check_output(cmd, stderr=subprocess.STDOUT) except subprocess.CalledProcessError as exc: if exc.returncode == 2 and retry_on_failure: logging.warning('ATTENTION : Le fichier n\'est pas un PDF correct, nouvelle tentative de le télécharger') download_file(url,file) ocr(file,url,False) elif not exc.returncode == 6: logging.warning('ATTENTION : Impossible d\'OCRiser le document', exc.returncode, exc.output) # Début du script parser = argparse.ArgumentParser(prog='ppparis.py', description='Télécharge les RAA de la Préfecture de police de Paris et recherche des mots-clés') parser.add_argument('-n', '--no-headless', action='store_true', help='ne lance pas le navigateur en mode headless (pratique pour débugguer ou en dehors d\'une CI)') parser.add_argument('-l', '--list', action='store', help='liste des termes recherchés, séparés par une virgule (par défaut : vidéoprotection,caméras,captation,aéronef)') parser.add_argument('-v', action='store_true', help='relève le niveau de verbosité à INFO') parser.add_argument('-vv', action='store_true', help='relève le niveau de verbosité à DEBUG') args = parser.parse_args() if args.v or os.getenv('VERBOSE'): logging.basicConfig(level=logging.INFO) if args.vv or os.getenv('VVERBOSE'): logging.basicConfig(level=logging.DEBUG) if args.no_headless: __headless_mode = False if not __headless_mode: logging.debug('Mode no-headless') if args.list: __LIST = args.list logging.info(f'Termes recherchés: {__LIST}') # On crée le dossier de téléchargement os.makedirs(__DATA_DIR, exist_ok=True) # On démarre le navigateur webdriver_options = webdriver.ChromeOptions() webdriver_options.add_argument("--no-sandbox") webdriver_options.add_argument("--disable-extensions") webdriver_options.add_argument("--disable-gpu") webdriver_options.add_argument("--disable-dev-shm-usage") webdriver_options.add_argument("--use_subprocess") webdriver_options.add_argument("--disable-blink-features=AutomationControlled") webdriver_options.add_argument(f"--user-agent={__USER_AGENT}") if __headless_mode: webdriver_options.add_argument("--headless") webdriver_options.add_argument("--window-size=1024,768") display = Display(visible=False, size=(1024, 768)) display.start() else: webdriver_options.add_argument("--start-maximized") browser = webdriver.Chrome(options=webdriver_options) # Téléchargement des RAA page_content = get_html(__RAA_PAGE) # On récupère les cookies du navigateur pour les réutiliser lors du téléchargement des PDF session = requests.Session() for cookie in browser.get_cookies(): session.cookies.set(cookie['name'], cookie['value']) session.headers.update({'User-Agent': __USER_AGENT}) # On arrête le navigateur browser.quit() if __headless_mode: display.stop() # On charge le parser soup = BeautifulSoup(page_content, 'html.parser') # Pour chaque balise a, on regarde si c'est un PDF, et si oui on le parse for a in soup.find_all('a', href=True): if a['href'].endswith('.pdf'): if a['href'].startswith('/'): url = 'https://www.prefecturedepolice.interieur.gouv.fr'+a['href'] else: url = a['href'] name = a.find('span').get_text() date = a.find('div', class_="field--type-datetime").get_text() filename = unquote(url.split('/')[-1]) # Si le fichier n'a pas déjà été parsé, on le télécharge et on le parse if not os.path.isfile(get_txt_file(__DATA_DIR+filename)): logging.info(f'Nouveau fichier : {name} ({date}). URL : {url}') download_file(url, __DATA_DIR+filename) ocr(__DATA_DIR+filename,url,True) parse_pdf(filename, name, date)