Skip to content
Extraits de code Groupes Projets
ppparis.py 6,63 ko
Newer Older
Bastien Le Querrec's avatar
Bastien Le Querrec a validé
#!/usr/bin/env python3

import os, sys, time, re
import subprocess
from bs4 import BeautifulSoup
import argparse
from urllib.parse import unquote
import logging
Bastien Le Querrec's avatar
Bastien Le Querrec a validé

from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support import expected_conditions

Bastien Le Querrec's avatar
Bastien Le Querrec a validé
from pyvirtualdisplay import Display

from pdfminer.high_level import extract_text

# Config
__RAA_PAGE = 'https://www.prefecturedepolice.interieur.gouv.fr/actualites-et-presse/arretes/accueil-arretes'
__USER_AGENT = 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36'
__headless_mode = True
__LIST = os.getenv('LIST') or 'vidéoprotection,caméras,captation,aéronef'
__DATA_DIR = os.path.dirname(os.path.abspath(__file__))+'/data/ppparis/'

# Fonctions
def print_output(data):
  print(data)
  data = data.replace('\033[92m', '')
  data = data.replace('\033[0m', '')
  data = data.replace('\033[1m', '')
  f = open(os.path.dirname(os.path.abspath(__file__))+'/output.log','a')
  f.write(data+"\n")
  f.close()

Bastien Le Querrec's avatar
Bastien Le Querrec a validé
def get_html(url):
  browser.get(url)

  # On attend que le navigateur ait passé les tests anti-robots et que le contenu s'affiche
  element = WebDriverWait(browser, 120).until(expected_conditions.presence_of_element_located((By.ID, "block-decree-list-block")))
Bastien Le Querrec's avatar
Bastien Le Querrec a validé

  return browser.page_source

def download_file(url, dest):
  try:
    os.makedirs(os.path.dirname(dest), exist_ok=True)
    file = session.get(url)
    f = open(dest,'wb')
    f.write(file.content);
    f.close()
  except (requests.exceptions.ConnectionError, requests.exceptions.ChunkedEncodingError):
    logging.warning(f'ATTENTION: la connexion a été interrompue pendant le téléchargement de {url}, nouvelle tentative...')
    download_file(url, dest)
  except Exception as exc:
    logging.warning(f'ATTENTION: Impossible de télécharger le fichier {url}: {exc}')
Bastien Le Querrec's avatar
Bastien Le Querrec a validé

def get_txt_file(filename):
  return re.sub('(\.pdf)$', '.txt', filename)

Bastien Le Querrec's avatar
Bastien Le Querrec a validé
def parse_pdf(filename, name, date):
  if not os.path.isfile(__DATA_DIR+filename):
    logging.warning(f'ATTENTION: le fichier {filename} n\'existe pas')
  else:
    text = extract_text(__DATA_DIR+filename)
    found = False
    for keyword in __LIST.split(','):
      if re.search(keyword, text, re.IGNORECASE|re.MULTILINE):
        if not found:
          print_output(f'\033[92m{name}\033[0m ({date})')
          found = True
        print_output(f'    Le terme \033[1m{keyword}\033[0m a été trouvé.')

    # Écrit le texte du PDF dans un fichier texte pour une analyse future, puis supprime le PDF
    f = open(get_txt_file(__DATA_DIR+filename),'w')
    f.write(text)
Bastien Le Querrec's avatar
Bastien Le Querrec a validé
    f.close()
    os.remove(__DATA_DIR+filename)
Bastien Le Querrec's avatar
Bastien Le Querrec a validé
    if found:
      print_output('')

def ocr(file,url,retry_on_failure=True):
  cmd = ['ocrmypdf', '-l', 'eng+fra', '--output-type', 'pdfa', '--redo-ocr', '--skip-big', '500' , __DATA_DIR+filename, __DATA_DIR+filename]
  logging.debug(f'Lancement de ocrmypdf: {cmd}')
  try:
    output = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
  except subprocess.CalledProcessError as exc:
    if exc.returncode == 2 and retry_on_failure:
      logging.warning('ATTENTION : Le fichier n\'est pas un PDF correct, nouvelle tentative de le télécharger')
      download_file(url,file)
      ocr(file,url,False)
    elif not exc.returncode == 6:
      logging.warning('ATTENTION : Impossible d\'OCRiser le document', exc.returncode, exc.output)

Bastien Le Querrec's avatar
Bastien Le Querrec a validé
# Début du script
parser = argparse.ArgumentParser(prog='ppparis.py', description='Télécharge les RAA de la Préfecture de police de Paris et recherche des mots-clés')
parser.add_argument('-n', '--no-headless', action='store_true', help='ne lance pas le navigateur en mode headless (pratique pour débugguer ou en dehors d\'une CI)')
Bastien Le Querrec's avatar
Bastien Le Querrec a validé
parser.add_argument('-l', '--list', action='store', help='liste des termes recherchés, séparés par une virgule (par défaut : vidéoprotection,caméras,captation,aéronef)')
parser.add_argument('-v', action='store_true', help='relève le niveau de verbosité à INFO')
parser.add_argument('-vv', action='store_true', help='relève le niveau de verbosité à DEBUG')
args = parser.parse_args()

Bastien Le Querrec's avatar
Bastien Le Querrec a validé
  logging.basicConfig(level=logging.INFO)

Bastien Le Querrec's avatar
Bastien Le Querrec a validé
  logging.basicConfig(level=logging.DEBUG)

if args.no_headless:
Bastien Le Querrec's avatar
Bastien Le Querrec a validé
  __headless_mode = False

if not __headless_mode:
  logging.debug('Mode no-headless')
Bastien Le Querrec's avatar
Bastien Le Querrec a validé

if args.list:
  __LIST = args.list

logging.info(f'Termes recherchés: {__LIST}')
Bastien Le Querrec's avatar
Bastien Le Querrec a validé

# On crée le dossier de téléchargement
os.makedirs(__DATA_DIR, exist_ok=True)

# On démarre le navigateur
webdriver_options = webdriver.ChromeOptions()
webdriver_options.add_argument("--no-sandbox")
webdriver_options.add_argument("--disable-extensions")
webdriver_options.add_argument("--disable-gpu")
webdriver_options.add_argument("--disable-dev-shm-usage")
webdriver_options.add_argument("--use_subprocess")
webdriver_options.add_argument("--disable-blink-features=AutomationControlled")
webdriver_options.add_argument(f"--user-agent={__USER_AGENT}")
if __headless_mode:
  webdriver_options.add_argument("--headless")
  webdriver_options.add_argument("--window-size=1024,768")
  display = Display(visible=False, size=(1024, 768))
  display.start()
Bastien Le Querrec's avatar
Bastien Le Querrec a validé
else:
  webdriver_options.add_argument("--start-maximized")

browser = webdriver.Chrome(options=webdriver_options)

# Téléchargement des RAA
page_content = get_html(__RAA_PAGE)

# On récupère les cookies du navigateur pour les réutiliser lors du téléchargement des PDF
session = requests.Session()
for cookie in browser.get_cookies():
  session.cookies.set(cookie['name'], cookie['value'])
session.headers.update({'User-Agent': __USER_AGENT})

# On arrête le navigateur
browser.quit()
if __headless_mode:
  display.stop()

Bastien Le Querrec's avatar
Bastien Le Querrec a validé
# On charge le parser
soup = BeautifulSoup(page_content, 'html.parser')

# Pour chaque balise a, on regarde si c'est un PDF, et si oui on le parse
for a in soup.find_all('a', href=True):
  if a['href'].endswith('.pdf'):
    if a['href'].startswith('/'):
      url = 'https://www.prefecturedepolice.interieur.gouv.fr'+a['href']
    else:
      url = a['href']

    name = a.find('span').get_text()
    date = a.find('div', class_="field--type-datetime").get_text()

    filename = unquote(url.split('/')[-1])

    # Si le fichier n'a pas déjà été parsé, on le télécharge et on le parse
    if not os.path.isfile(get_txt_file(__DATA_DIR+filename)):
Bastien Le Querrec's avatar
Bastien Le Querrec a validé
      logging.info(f'Nouveau fichier : {name} ({date}). URL : {url}')
Bastien Le Querrec's avatar
Bastien Le Querrec a validé

      ocr(__DATA_DIR+filename,url,True)
Bastien Le Querrec's avatar
Bastien Le Querrec a validé

      parse_pdf(filename, name, date)