Skip to content
Extraits de code Groupes Projets
Valider 6f58245b rédigé par Bastien Le Querrec's avatar Bastien Le Querrec
Parcourir les fichiers

bascule du code dans des classes

parent a0d527a0
Aucune branche associée trouvée
Aucune étiquette associée trouvée
Aucune requête de fusion associée trouvée
__pycache__/
bin/
lib/
data/
......
import os, re
import subprocess
import logging
import requests
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support import expected_conditions
from pyvirtualdisplay import Display
from pdfminer.high_level import extract_text
logger = logging.getLogger(__name__)
class RAAspotter:
class RAA:
url = ""
date = ""
name = ""
filename = ""
def __init__(self, url, date, name, filename):
if not url == "":
self.url = url
if not date == "":
self.date = date
if not name == "":
self.name = name
if not filename == "":
self.filename = filename
def __init__(self, data_dir, user_agent=""):
logger.debug('Initialisation de RAAspotter')
self.user_agent = user_agent
self.session = requests.Session()
self.data_dir = data_dir
# On démarre le navigateur
def get_session(self, url, wait_element=""):
webdriver_options = webdriver.ChromeOptions()
webdriver_options.add_argument("--no-sandbox")
webdriver_options.add_argument("--disable-extensions")
webdriver_options.add_argument("--disable-gpu")
webdriver_options.add_argument("--disable-dev-shm-usage")
webdriver_options.add_argument("--use_subprocess")
webdriver_options.add_argument("--disable-blink-features=AutomationControlled")
if not self.user_agent == "":
webdriver_options.add_argument(f"--user-agent={self.user_agent}")
webdriver_options.add_argument("--headless")
webdriver_options.add_argument("--window-size=1024,768")
display = Display(visible=False, size=(1024, 768))
display.start()
browser = webdriver.Chrome(options=webdriver_options)
# Téléchargement de l'URL
browser.get(url)
if not wait_element == "":
# On attend que le navigateur ait passé les tests anti-robots et que le contenu s'affiche
WebDriverWait(browser, 120).until(expected_conditions.presence_of_element_located((By.ID, wait_element)))
page_content = browser.page_source
# On récupère les cookies du navigateur pour les réutiliser plus tard
for cookie in browser.get_cookies():
self.session.cookies.set(cookie['name'], cookie['value'])
self.session.headers.update({'User-Agent': self.user_agent})
# On arrête le navigateur
browser.quit()
display.stop()
return page_content
def print_output(data):
print(data)
data = data.replace('\033[92m', '')
data = data.replace('\033[0m', '')
data = data.replace('\033[1m', '')
f = open(os.path.dirname(os.path.abspath(__file__))+'/output.log','a')
f.write(data+"\n")
f.close()
def download_file(self, raa):
try:
os.makedirs(os.path.dirname(self.data_dir+raa.filename), exist_ok=True)
file = self.session.get(raa.url)
f = open(self.data_dir+raa.filename,'wb')
f.write(file.content);
f.close()
except (requests.exceptions.ConnectionError, requests.exceptions.ChunkedEncodingError):
logger.warning(f'ATTENTION: la connexion a été interrompue pendant le téléchargement de {raa.url}, nouvelle tentative...')
self.download_file(raa)
except Exception as exc:
logger.warning(f'ATTENTION: Impossible de télécharger le fichier {raa.url}: {exc}')
def get_txt_file(filename):
return re.sub('(\.pdf)$', '.txt', filename)
def parse_pdf(self, raa, keywords):
if not os.path.isfile(self.data_dir+raa.filename):
logger.warning(f'ATTENTION: le fichier {raa.filename} n\'existe pas')
else:
text = extract_text(self.data_dir+raa.filename)
found = False
for keyword in keywords:
if re.search(keyword, text, re.IGNORECASE|re.MULTILINE):
if not found:
RAAspotter.print_output(f'\033[92m{raa.name}\033[0m ({raa.date})')
found = True
RAAspotter.print_output(f' Le terme \033[1m{keyword}\033[0m a été trouvé.')
# Écrit le texte du PDF dans un fichier texte pour une analyse future, puis supprime le PDF
f = open(RAAspotter.get_txt_file(self.data_dir+raa.filename),'w')
f.write(text)
f.close()
os.remove(self.data_dir+raa.filename)
if found:
RAAspotter.print_output('')
def ocr(self, raa, retry_on_failure=True):
cmd = ['ocrmypdf', '-l', 'eng+fra', '--output-type', 'pdfa', '--redo-ocr', '--skip-big', '500' , self.data_dir+raa.filename, self.data_dir+raa.filename]
logger.debug(f'Lancement de ocrmypdf: {cmd}')
try:
output = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
except subprocess.CalledProcessError as exc:
if exc.returncode == 2 and retry_on_failure:
logger.warning('ATTENTION : Le fichier n\'est pas un PDF correct, nouvelle tentative de le télécharger')
self.download_file(raa)
self.ocr(raa,False)
elif (not exc.returncode == 6) and (not exc.returncode == 10):
logger.warning('ATTENTION : Impossible d\'OCRiser le document', exc.returncode, exc.output)
def parse_raa(self, elements, keywords):
for raa in elements:
# Si le fichier n'a pas déjà été parsé, on le télécharge et on le parse
if not os.path.isfile(RAAspotter.get_txt_file(self.data_dir+raa.filename)):
logger.info(f'Nouveau fichier : {raa.name} ({raa.date}). URL : {raa.url}')
self.download_file(raa)
self.ocr(raa, True)
self.parse_pdf(raa, keywords)
def get_raa(self, page_content):
logger.error('Cette fonction doit être surchargée')
from bs4 import BeautifulSoup
from urllib.parse import unquote
from RAAspotter import RAAspotter
class RAAspotter_ppparis(RAAspotter):
def get_raa(self, page_content):
elements = []
# On charge le parser
soup = BeautifulSoup(page_content, 'html.parser')
# Pour chaque balise a, on regarde si c'est un PDF, et si oui on le parse
for a in soup.find_all('a', href=True):
if a['href'].endswith('.pdf'):
if a['href'].startswith('/'):
url = 'https://www.prefecturedepolice.interieur.gouv.fr'+a['href']
else:
url = a['href']
name = a.find('span').get_text()
date = a.find('div', class_="field--type-datetime").get_text()
filename = unquote(url.split('/')[-1])
raa = RAAspotter.RAA(url, date, name, filename)
elements.append(raa)
return elements
\ No newline at end of file
#!/usr/bin/env python3
import os, sys, time, re
import subprocess
from bs4 import BeautifulSoup
import os
import argparse
from urllib.parse import unquote
import logging
import requests
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support import expected_conditions
from pyvirtualdisplay import Display
from pdfminer.high_level import extract_text
from RAAspotter import RAAspotter
from RAAspotter_ppparis import RAAspotter_ppparis
# Config
__RAA_PAGE = 'https://www.prefecturedepolice.interieur.gouv.fr/actualites-et-presse/arretes/accueil-arretes'
__USER_AGENT = 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36'
__headless_mode = True
__LIST = os.getenv('LIST') or 'vidéoprotection,caméras,captation,aéronef'
__KEYWORDS = os.getenv('KEYWORDS') or 'vidéoprotection,caméras,captation,aéronef'
__DATA_DIR = os.path.dirname(os.path.abspath(__file__))+'/data/ppparis/'
# Fonctions
def print_output(data):
print(data)
data = data.replace('\033[92m', '')
data = data.replace('\033[0m', '')
data = data.replace('\033[1m', '')
f = open(os.path.dirname(os.path.abspath(__file__))+'/output.log','a')
f.write(data+"\n")
f.close()
# On charge l'URL avec la liste des fichiers
def get_html(url):
browser.get(url)
# On attend que le navigateur ait passé les tests anti-robots et que le contenu s'affiche
element = WebDriverWait(browser, 120).until(expected_conditions.presence_of_element_located((By.ID, "block-decree-list-block")))
return browser.page_source
def download_file(url, dest):
try:
os.makedirs(os.path.dirname(dest), exist_ok=True)
file = session.get(url)
f = open(dest,'wb')
f.write(file.content);
f.close()
except (requests.exceptions.ConnectionError, requests.exceptions.ChunkedEncodingError):
logging.warning(f'ATTENTION: la connexion a été interrompue pendant le téléchargement de {url}, nouvelle tentative...')
download_file(url, dest)
except Exception as exc:
logging.warning(f'ATTENTION: Impossible de télécharger le fichier {url}: {exc}')
def get_txt_file(filename):
return re.sub('(\.pdf)$', '.txt', filename)
def parse_pdf(filename, name, date):
if not os.path.isfile(__DATA_DIR+filename):
logging.warning(f'ATTENTION: le fichier {filename} n\'existe pas')
else:
text = extract_text(__DATA_DIR+filename)
found = False
for keyword in __LIST.split(','):
if re.search(keyword, text, re.IGNORECASE|re.MULTILINE):
if not found:
print_output(f'\033[92m{name}\033[0m ({date})')
found = True
print_output(f' Le terme \033[1m{keyword}\033[0m a été trouvé.')
# Écrit le texte du PDF dans un fichier texte pour une analyse future, puis supprime le PDF
f = open(get_txt_file(__DATA_DIR+filename),'w')
f.write(text)
f.close()
os.remove(__DATA_DIR+filename)
if found:
print_output('')
def ocr(file,url,retry_on_failure=True):
cmd = ['ocrmypdf', '-l', 'eng+fra', '--output-type', 'pdfa', '--redo-ocr', '--skip-big', '500' , __DATA_DIR+filename, __DATA_DIR+filename]
logging.debug(f'Lancement de ocrmypdf: {cmd}')
try:
output = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
except subprocess.CalledProcessError as exc:
if exc.returncode == 2 and retry_on_failure:
logging.warning('ATTENTION : Le fichier n\'est pas un PDF correct, nouvelle tentative de le télécharger')
download_file(url,file)
ocr(file,url,False)
elif not exc.returncode == 6:
logging.warning('ATTENTION : Impossible d\'OCRiser le document', exc.returncode, exc.output)
# Début du script
parser = argparse.ArgumentParser(prog='ppparis.py', description='Télécharge les RAA de la Préfecture de police de Paris et recherche des mots-clés')
parser.add_argument('-n', '--no-headless', action='store_true', help='ne lance pas le navigateur en mode headless (pratique pour débugguer ou en dehors d\'une CI)')
parser.add_argument('-l', '--list', action='store', help='liste des termes recherchés, séparés par une virgule (par défaut : vidéoprotection,caméras,captation,aéronef)')
parser.add_argument('-k', '--keywords', action='store', help='liste des termes recherchés, séparés par une virgule (par défaut : vidéoprotection,caméras,captation,aéronef)')
parser.add_argument('-v', action='store_true', help='relève le niveau de verbosité à INFO')
parser.add_argument('-vv', action='store_true', help='relève le niveau de verbosité à DEBUG')
args = parser.parse_args()
......@@ -107,74 +25,19 @@ if args.v or os.getenv('VERBOSE'):
if args.vv or os.getenv('VVERBOSE'):
logging.basicConfig(level=logging.DEBUG)
if args.no_headless:
__headless_mode = False
if not __headless_mode:
logging.debug('Mode no-headless')
if args.list:
__LIST = args.list
logging.info(f'Termes recherchés: {__LIST}')
if args.keywords:
__KEYWORDS = args.keywords
# On crée le dossier de téléchargement
os.makedirs(__DATA_DIR, exist_ok=True)
# On démarre le navigateur
webdriver_options = webdriver.ChromeOptions()
webdriver_options.add_argument("--no-sandbox")
webdriver_options.add_argument("--disable-extensions")
webdriver_options.add_argument("--disable-gpu")
webdriver_options.add_argument("--disable-dev-shm-usage")
webdriver_options.add_argument("--use_subprocess")
webdriver_options.add_argument("--disable-blink-features=AutomationControlled")
webdriver_options.add_argument(f"--user-agent={__USER_AGENT}")
if __headless_mode:
webdriver_options.add_argument("--headless")
webdriver_options.add_argument("--window-size=1024,768")
display = Display(visible=False, size=(1024, 768))
display.start()
else:
webdriver_options.add_argument("--start-maximized")
browser = webdriver.Chrome(options=webdriver_options)
# Téléchargement des RAA
page_content = get_html(__RAA_PAGE)
# On récupère les cookies du navigateur pour les réutiliser lors du téléchargement des PDF
session = requests.Session()
for cookie in browser.get_cookies():
session.cookies.set(cookie['name'], cookie['value'])
session.headers.update({'User-Agent': __USER_AGENT})
# On arrête le navigateur
browser.quit()
if __headless_mode:
display.stop()
# On charge le parser
soup = BeautifulSoup(page_content, 'html.parser')
# Pour chaque balise a, on regarde si c'est un PDF, et si oui on le parse
for a in soup.find_all('a', href=True):
if a['href'].endswith('.pdf'):
if a['href'].startswith('/'):
url = 'https://www.prefecturedepolice.interieur.gouv.fr'+a['href']
else:
url = a['href']
name = a.find('span').get_text()
date = a.find('div', class_="field--type-datetime").get_text()
raa_spotter = RAAspotter_ppparis(__DATA_DIR, __USER_AGENT)
filename = unquote(url.split('/')[-1])
RAAspotter_ppparis.print_output('RAAspotter_ppparis')
RAAspotter_ppparis.print_output(f'Termes recherchés: {__KEYWORDS}')
# Si le fichier n'a pas déjà été parsé, on le télécharge et on le parse
if not os.path.isfile(get_txt_file(__DATA_DIR+filename)):
logging.info(f'Nouveau fichier : {name} ({date}). URL : {url}')
download_file(url, __DATA_DIR+filename)
page_content = raa_spotter.get_session(__RAA_PAGE, "block-decree-list-block")
ocr(__DATA_DIR+filename,url,True)
raa_elements = raa_spotter.get_raa(page_content)
parse_pdf(filename, name, date)
raa_spotter.parse_raa(raa_elements, __KEYWORDS.split(','))
0% Chargement en cours ou .
You are about to add 0 people to the discussion. Proceed with caution.
Terminez d'abord l'édition de ce message.
Veuillez vous inscrire ou vous pour commenter