Newer
Older
#!/usr/bin/env python3
import os, sys, time, re
import subprocess
from bs4 import BeautifulSoup
import argparse
from urllib.parse import unquote
import logging

Bastien Le Querrec
a validé
import requests

Bastien Le Querrec
a validé
from selenium.webdriver.common.by import By
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support import expected_conditions
from pyvirtualdisplay import Display
from pdfminer.high_level import extract_text
# Config
__RAA_PAGE = 'https://www.prefecturedepolice.interieur.gouv.fr/actualites-et-presse/arretes/accueil-arretes'
__USER_AGENT = 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36'
__headless_mode = True
__LIST = os.getenv('LIST') or 'vidéoprotection,caméras,captation,aéronef'
__DATA_DIR = os.path.dirname(os.path.abspath(__file__))+'/data/ppparis/'
# Fonctions
def print_output(data):
print(data)
data = data.replace('\033[92m', '')
data = data.replace('\033[0m', '')
data = data.replace('\033[1m', '')
f = open(os.path.dirname(os.path.abspath(__file__))+'/output.log','a')
f.write(data+"\n")
f.close()

Bastien Le Querrec
a validé
# On charge l'URL avec la liste des fichiers

Bastien Le Querrec
a validé
# On attend que le navigateur ait passé les tests anti-robots et que le contenu s'affiche
element = WebDriverWait(browser, 120).until(expected_conditions.presence_of_element_located((By.ID, "block-decree-list-block")))

Bastien Le Querrec
a validé
return browser.page_source
def download_file(url, dest):
try:
os.makedirs(os.path.dirname(dest), exist_ok=True)
file = session.get(url)
f = open(dest,'wb')
f.write(file.content);
f.close()
except (requests.exceptions.ConnectionError, requests.exceptions.ChunkedEncodingError):
logging.warning(f'ATTENTION: la connexion a été interrompue pendant le téléchargement de {url}, nouvelle tentative...')
download_file(url, dest)

Bastien Le Querrec
a validé
except Exception as exc:
logging.warning(f'ATTENTION: Impossible de télécharger le fichier {url}: {exc}')
def get_txt_file(filename):
return re.sub('(\.pdf)$', '.txt', filename)
def parse_pdf(filename, name, date):
if not os.path.isfile(__DATA_DIR+filename):
logging.warning(f'ATTENTION: le fichier {filename} n\'existe pas')
else:
text = extract_text(__DATA_DIR+filename)
found = False
for keyword in __LIST.split(','):
if re.search(keyword, text, re.IGNORECASE|re.MULTILINE):
if not found:
print_output(f'\033[92m{name}\033[0m ({date})')
found = True
print_output(f' Le terme \033[1m{keyword}\033[0m a été trouvé.')
# Écrit le texte du PDF dans un fichier texte pour une analyse future, puis supprime le PDF
f = open(get_txt_file(__DATA_DIR+filename),'w')
f.write(text)
os.remove(__DATA_DIR+filename)
def ocr(file,url,retry_on_failure=True):
cmd = ['ocrmypdf', '-l', 'eng+fra', '--output-type', 'pdfa', '--redo-ocr', '--skip-big', '500' , __DATA_DIR+filename, __DATA_DIR+filename]
logging.debug(f'Lancement de ocrmypdf: {cmd}')
try:
output = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
except subprocess.CalledProcessError as exc:
if exc.returncode == 2 and retry_on_failure:
logging.warning('ATTENTION : Le fichier n\'est pas un PDF correct, nouvelle tentative de le télécharger')
download_file(url,file)
ocr(file,url,False)
elif not exc.returncode == 6:
logging.warning('ATTENTION : Impossible d\'OCRiser le document', exc.returncode, exc.output)
# Début du script
parser = argparse.ArgumentParser(prog='ppparis.py', description='Télécharge les RAA de la Préfecture de police de Paris et recherche des mots-clés')
parser.add_argument('-n', '--no-headless', action='store_true', help='ne lance pas le navigateur en mode headless (pratique pour débugguer ou en dehors d\'une CI)')
parser.add_argument('-l', '--list', action='store', help='liste des termes recherchés, séparés par une virgule (par défaut : vidéoprotection,caméras,captation,aéronef)')
parser.add_argument('-v', action='store_true', help='relève le niveau de verbosité à INFO')
parser.add_argument('-vv', action='store_true', help='relève le niveau de verbosité à DEBUG')
args = parser.parse_args()

Bastien Le Querrec
a validé
if args.v or os.getenv('VERBOSE'):

Bastien Le Querrec
a validé
if args.vv or os.getenv('VVERBOSE'):
__headless_mode = False
if not __headless_mode:
logging.debug('Mode no-headless')
logging.info(f'Termes recherchés: {__LIST}')
# On crée le dossier de téléchargement
os.makedirs(__DATA_DIR, exist_ok=True)
# On démarre le navigateur
webdriver_options = webdriver.ChromeOptions()
webdriver_options.add_argument("--no-sandbox")
webdriver_options.add_argument("--disable-extensions")
webdriver_options.add_argument("--disable-gpu")
webdriver_options.add_argument("--disable-dev-shm-usage")
webdriver_options.add_argument("--use_subprocess")
webdriver_options.add_argument("--disable-blink-features=AutomationControlled")
webdriver_options.add_argument(f"--user-agent={__USER_AGENT}")
if __headless_mode:
webdriver_options.add_argument("--headless")
webdriver_options.add_argument("--window-size=1024,768")
display = Display(visible=False, size=(1024, 768))
display.start()
else:
webdriver_options.add_argument("--start-maximized")
browser = webdriver.Chrome(options=webdriver_options)
# Téléchargement des RAA
page_content = get_html(__RAA_PAGE)

Bastien Le Querrec
a validé
# On récupère les cookies du navigateur pour les réutiliser lors du téléchargement des PDF
session = requests.Session()
for cookie in browser.get_cookies():
session.cookies.set(cookie['name'], cookie['value'])
session.headers.update({'User-Agent': __USER_AGENT})
# On arrête le navigateur
browser.quit()
if __headless_mode:
display.stop()
# On charge le parser
soup = BeautifulSoup(page_content, 'html.parser')
# Pour chaque balise a, on regarde si c'est un PDF, et si oui on le parse
for a in soup.find_all('a', href=True):
if a['href'].endswith('.pdf'):
if a['href'].startswith('/'):
url = 'https://www.prefecturedepolice.interieur.gouv.fr'+a['href']
else:
url = a['href']
name = a.find('span').get_text()
date = a.find('div', class_="field--type-datetime").get_text()
filename = unquote(url.split('/')[-1])
# Si le fichier n'a pas déjà été parsé, on le télécharge et on le parse
if not os.path.isfile(get_txt_file(__DATA_DIR+filename)):
logging.info(f'Nouveau fichier : {name} ({date}). URL : {url}')

Bastien Le Querrec
a validé
download_file(url, __DATA_DIR+filename)
ocr(__DATA_DIR+filename,url,True)