Skip to content
Extraits de code Groupes Projets

Comparer les révisions

Les modifications sont affichées comme si la révision source était fusionnée avec la révision cible. En savoir plus sur la comparaison des révisions.

Source

Sélectionner le projet cible
No results found

Cible

Sélectionner le projet cible
  • la-quadrature-du-net/Attrap
  • foggyfrog/Attrap
  • skhwiz/Attrap
  • precambrien/Attrap
  • ketsapiwiq/Attrap
  • Joseki/Attrap
  • kr1p/attrap-pref-12
  • kr1p/attrap-pref-46
  • kr1p/attrap-pi
  • Guinness/Attrap
  • astroidgritty/attrap-pref-84
  • davinov/Attrap
  • maettellite/attrap-pref-01
  • m242/Attrap
  • multi/Attrap
  • mverdeil/Attrap
  • olpo/Attrap
17 résultats
Afficher les modifications
Validations sur la source (178)
__pycache__/ __pycache__/
bin/ bin/
lib/ lib/
lib64
data/ data/
pyvenv.cfg pyvenv.cfg
output_*.log output_*.log
*.patch
CACHEDIR.TAG
...@@ -74,10 +74,11 @@ pep8: ...@@ -74,10 +74,11 @@ pep8:
- unprivileged - unprivileged
needs: [install] needs: [install]
script: script:
- s3/download-from-s3.sh "${PREF}" "${S3_KEY}" "${S3_SECRET}" "${S3_HOST}" "${S3_BUCKET}" || true - misc/download-from-s3.sh "${PREF}" "${S3_KEY}" "${S3_SECRET}" "${S3_HOST}" "${S3_BUCKET}" data/ || true
- source bin/activate
- /etc/init.d/tor start - /etc/init.d/tor start
- make "${PREF}" - make "${PREF}"
- s3/upload-to-s3.sh "${PREF}" "${S3_KEY}" "${S3_SECRET}" "${S3_HOST}" "${S3_BUCKET}" || true - misc/upload-to-s3.sh "${PREF}" "${S3_KEY}" "${S3_SECRET}" "${S3_HOST}" "${S3_BUCKET}" data/ || true
cache: cache:
key: $CI_COMMIT_REF_SLUG key: $CI_COMMIT_REF_SLUG
paths: paths:
...@@ -98,6 +99,21 @@ test_ppparis: ...@@ -98,6 +99,21 @@ test_ppparis:
PREF: "ppparis" PREF: "ppparis"
extends: .default_pref extends: .default_pref
test_pref01:
variables:
PREF: "pref01"
extends: .default_pref
test_pref02:
variables:
PREF: "pref02"
extends: .default_pref
test_pref03:
variables:
PREF: "pref03"
extends: .default_pref
test_pref04: test_pref04:
variables: variables:
PREF: "pref04" PREF: "pref04"
...@@ -118,11 +134,46 @@ test_pref09: ...@@ -118,11 +134,46 @@ test_pref09:
PREF: "pref09" PREF: "pref09"
extends: .default_pref extends: .default_pref
test_pref10:
variables:
PREF: "pref10"
extends: .default_pref
test_pref11:
variables:
PREF: "pref11"
extends: .default_pref
test_pref13: test_pref13:
variables: variables:
PREF: "pref13" PREF: "pref13"
extends: .default_pref extends: .default_pref
test_pref2a:
variables:
PREF: "pref2a"
extends: .default_pref
test_pref2b:
variables:
PREF: "pref2b"
extends: .default_pref
test_pref25:
variables:
PREF: "pref25"
extends: .default_pref
test_pref29:
variables:
PREF: "pref29"
extends: .default_pref
test_pref30:
variables:
PREF: "pref30"
extends: .default_pref
test_pref31: test_pref31:
variables: variables:
PREF: "pref31" PREF: "pref31"
...@@ -148,6 +199,11 @@ test_pref38: ...@@ -148,6 +199,11 @@ test_pref38:
PREF: "pref38" PREF: "pref38"
extends: .default_pref extends: .default_pref
test_pref39:
variables:
PREF: "pref39"
extends: .default_pref
test_pref42: test_pref42:
variables: variables:
PREF: "pref42" PREF: "pref42"
...@@ -158,11 +214,41 @@ test_pref44: ...@@ -158,11 +214,41 @@ test_pref44:
PREF: "pref44" PREF: "pref44"
extends: .default_pref extends: .default_pref
test_pref49:
variables:
PREF: "pref49"
extends: .default_pref
test_pref50:
variables:
PREF: "pref50"
extends: .default_pref
test_pref52:
variables:
PREF: "pref52"
extends: .default_pref
test_pref54:
variables:
PREF: "pref54"
extends: .default_pref
test_pref55:
variables:
PREF: "pref55"
extends: .default_pref
test_pref59: test_pref59:
variables: variables:
PREF: "pref59" PREF: "pref59"
extends: .default_pref extends: .default_pref
test_pref61:
variables:
PREF: "pref61"
extends: .default_pref
test_pref62: test_pref62:
variables: variables:
PREF: "pref62" PREF: "pref62"
...@@ -193,6 +279,26 @@ test_pref69: ...@@ -193,6 +279,26 @@ test_pref69:
PREF: "pref69" PREF: "pref69"
extends: .default_pref extends: .default_pref
test_pref73:
variables:
PREF: "pref73"
extends: .default_pref
test_pref75:
variables:
PREF: "pref75"
extends: .default_pref
test_pref76:
variables:
PREF: "pref76"
extends: .default_pref
test_pref77:
variables:
PREF: "pref77"
extends: .default_pref
test_pref80: test_pref80:
variables: variables:
PREF: "pref80" PREF: "pref80"
...@@ -213,7 +319,42 @@ test_pref87: ...@@ -213,7 +319,42 @@ test_pref87:
PREF: "pref87" PREF: "pref87"
extends: .default_pref extends: .default_pref
test_pref91:
variables:
PREF: "pref91"
extends: .default_pref
test_pref92:
variables:
PREF: "pref92"
extends: .default_pref
test_pref93:
variables:
PREF: "pref93"
extends: .default_pref
test_pref94:
variables:
PREF: "pref94"
extends: .default_pref
test_pref976: test_pref976:
variables: variables:
PREF: "pref976" PREF: "pref976"
extends: .default_pref extends: .default_pref
test_prefbretagne:
variables:
PREF: "prefbretagne"
extends: .default_pref
test_prefidf:
variables:
PREF: "prefidf"
extends: .default_pref
test_prefpaca:
variables:
PREF: "prefpaca"
extends: .default_pref
import os import os
import sys
import re import re
import random
import ssl import ssl
import subprocess import subprocess
import shutil import shutil
import string
import logging import logging
import requests import requests
import time import time
from types import SimpleNamespace
import datetime import datetime
import json import json
from urllib.parse import quote from urllib.parse import quote
...@@ -16,7 +20,9 @@ from selenium.webdriver.common.by import By ...@@ -16,7 +20,9 @@ from selenium.webdriver.common.by import By
from selenium.webdriver.support.wait import WebDriverWait from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support import expected_conditions from selenium.webdriver.support import expected_conditions
import pytz
import dateparser import dateparser
import urllib3
from bs4 import BeautifulSoup from bs4 import BeautifulSoup
from pyvirtualdisplay import Display from pyvirtualdisplay import Display
...@@ -24,9 +30,8 @@ from pyvirtualdisplay import Display ...@@ -24,9 +30,8 @@ from pyvirtualdisplay import Display
from pypdf import PdfReader from pypdf import PdfReader
from pypdf import PdfWriter from pypdf import PdfWriter
from pypdf.generic import NameObject, NumberObject from pypdf.generic import NameObject, NumberObject
from pypdf.errors import PdfStreamError
from stem import Signal from pypdf.errors import EmptyFileError
from stem.control import Controller
import hashlib import hashlib
import smtplib import smtplib
...@@ -41,29 +46,32 @@ logger = logging.getLogger(__name__) ...@@ -41,29 +46,32 @@ logger = logging.getLogger(__name__)
class Attrap: class Attrap:
class RAA: class RAA:
"""La classe représentant un Recueil des actes administratifs. La plupart du temps, il s'agit d'un PDF avec plusieurs arrêtés."""
url = "" url = ""
date = datetime.datetime(1970, 1, 1) date = None
date_str = ""
name = "" name = ""
sha256 = "" sha256 = ""
pdf_creation_date = None pdf_creation_date = None
pdf_modification_date = None pdf_modification_date = None
def __init__(self, url, date, name): def __init__(self, url, date, name, timezone='Europe/Paris'):
self.timezone = timezone
if not url == "": if not url == "":
self.url = url self.url = url
if not date == "": if date is not None:
self.date = date self.date = Attrap.get_aware_datetime(date, timezone=timezone)
self.date_str = date.strftime("%d/%m/%Y")
if not name == "": if not name == "":
self.name = name self.name = name
def get_sha256(self): def get_sha256(self):
"""Calcule et met en cache le hash sha256 de l'URL du PDF, pour servir d'identifiant unique."""
if (self.sha256 == ""): if (self.sha256 == ""):
self.sha256 = hashlib.sha256(self.url.encode('utf-8')).hexdigest() self.sha256 = hashlib.sha256(self.url.encode('utf-8')).hexdigest()
return self.sha256 return self.sha256
def get_pdf_dates(self, data_dir): def get_pdf_dates(self, data_dir):
"""Extrait les dates des PDF pour les ajouter à l'objet du RAA."""
raa_data_dir = f'{data_dir}/raa/' raa_data_dir = f'{data_dir}/raa/'
reader = PdfReader(f'{raa_data_dir}{self.get_sha256()}.pdf') reader = PdfReader(f'{raa_data_dir}{self.get_sha256()}.pdf')
...@@ -71,12 +79,17 @@ class Attrap: ...@@ -71,12 +79,17 @@ class Attrap:
if pdf_metadata: if pdf_metadata:
if pdf_metadata.creation_date: if pdf_metadata.creation_date:
self.pdf_creation_date = pdf_metadata.creation_date self.pdf_creation_date = Attrap.get_aware_datetime(pdf_metadata.creation_date, timezone=self.timezone)
if self.date is None:
self.date = Attrap.get_aware_datetime(pdf_metadata.creation_date, timezone=self.timezone)
if pdf_metadata.modification_date: if pdf_metadata.modification_date:
self.pdf_modification_date = pdf_metadata.modification_date self.pdf_modification_date = Attrap.get_aware_datetime(pdf_metadata.modification_date, timezone=self.timezone)
if self.date is None:
self.date = Attrap.get_aware_datetime(pdf_metadata.modification_date, timezone=self.timezone)
def extract_content(self, data_dir): def extract_content(self, data_dir):
"""Extrait le contenu du PDF OCRisé pour l'écrire dans le fichier qui servira à faire la recherche de mots-clés. Supprime tous les PDF à la fin."""
raa_data_dir = f'{data_dir}/raa/' raa_data_dir = f'{data_dir}/raa/'
text = "" text = ""
...@@ -100,33 +113,42 @@ class Attrap: ...@@ -100,33 +113,42 @@ class Attrap:
os.remove(f'{raa_data_dir}{self.get_sha256()}.flat.pdf') os.remove(f'{raa_data_dir}{self.get_sha256()}.flat.pdf')
def write_properties(self, data_dir): def write_properties(self, data_dir):
"""Écris les propriétés du RAA dans un fichier JSON."""
raa_data_dir = f'{data_dir}/raa/' raa_data_dir = f'{data_dir}/raa/'
pdf_creation_date_json = None pdf_creation_date_json = None
pdf_modification_date_json = None pdf_modification_date_json = None
if self.pdf_creation_date: if self.pdf_creation_date:
pdf_creation_date_json = self.pdf_creation_date.strftime("%d/%m/%Y %H:%M:%S") pdf_creation_date_json = self.pdf_creation_date.astimezone(pytz.utc).isoformat(timespec="seconds")
if self.pdf_modification_date: if self.pdf_modification_date:
pdf_modification_date_json = self.pdf_modification_date.strftime("%d/%m/%Y %H:%M:%S") pdf_modification_date_json = self.pdf_modification_date.astimezone(pytz.utc).isoformat(timespec="seconds")
properties = { properties = {
'version': 2,
'name': self.name, 'name': self.name,
'date': self.date_str, 'date': self.date.strftime("%Y-%m-%d"),
'url': quote(self.url, safe='/:'), 'url': quote(self.url, safe='/:'),
'first_saw_on': datetime.datetime.today().strftime("%d/%m/%Y %H:%M:%S"), 'first_seen_on': datetime.datetime.now(pytz.utc).isoformat(timespec="seconds"),
'pdf_creation_date': pdf_creation_date_json, 'pdf_creation_date': pdf_creation_date_json,
'pdf_modification_date': pdf_modification_date_json 'pdf_modification_date': pdf_modification_date_json,
'timezone': self.timezone
} }
f = open(f'{raa_data_dir}{self.get_sha256()}.json', 'w') f = open(f'{raa_data_dir}{self.get_sha256()}.json', 'w')
f.write(json.dumps(properties)) f.write(json.dumps(properties))
f.close() f.close()
def parse_metadata(self, data_dir): def parse_metadata(self, data_dir):
"""Lance l'extraction des dates du PDF puis l'écriture de ses propriétés dans un fichier JSON."""
self.get_pdf_dates(data_dir) self.get_pdf_dates(data_dir)
self.write_properties(data_dir) self.write_properties(data_dir)
def __init__(self, data_dir, user_agent=''): def __init__(self, data_dir, user_agent=''):
"""
Initialise Attrap et le dossier de données.
data_dir -- le dossier où sont situées les données
user_agent -- le user_agent utilisé pour les requêtes
"""
logger.debug('Initialisation de Attrap') logger.debug('Initialisation de Attrap')
# On crée le dossier de téléchargement # On crée le dossier de téléchargement
...@@ -137,14 +159,18 @@ class Attrap: ...@@ -137,14 +159,18 @@ class Attrap:
self.found = False self.found = False
self.output_file_path = os.path.dirname(os.path.abspath(__file__)) + f'/output_{self.short_code}.log' self.output_file_path = os.path.dirname(os.path.abspath(__file__)) + f'/output_{self.short_code}.log'
self.sleep_time = 0 self.sleep_time = 0
self.last_http_request = 0
self.tor_enabled = False self.tor_enabled = False
self.tor_max_requests = 0 self.tor_max_requests = 0
self.tor_requests = 0 self.tor_requests = 0
self.tor_socks5_key = None
self.not_before = datetime.datetime(2024, 1, 1) self.not_before = datetime.datetime(2024, 1, 1)
self.smtp_configured = False self.smtp_configured = False
self.mastodon = None self.mastodon = None
self.mastodon_prefix = '' self.mastodon_prefix = ''
self.mastodon_suffix = '' self.mastodon_suffix = ''
self.safe_mode = False
self.timezone = datetime.datetime.now(datetime.timezone.utc).astimezone().tzname()
self.update_user_agent(user_agent) self.update_user_agent(user_agent)
...@@ -154,7 +180,14 @@ class Attrap: ...@@ -154,7 +180,14 @@ class Attrap:
self.print_output(str(self.__class__.__name__)) self.print_output(str(self.__class__.__name__))
# Si le safe mode est activé, on configure un long délai entre chaque requête
if os.getenv('SAFE_MODE'):
self.safe_mode = True
logger.warning('ATTENTION: le safe mode est activé, configuration d\'un délai entre chaque requête')
self.sleep_time = 30
def configure_mastodon(self, access_token, instance, mastodon_prefix, mastodon_suffix): def configure_mastodon(self, access_token, instance, mastodon_prefix, mastodon_suffix):
"""Configuration de Mastodon afin de publier un toot à chaque RAA détecté."""
if access_token and access_token != "" and instance and instance != "": if access_token and access_token != "" and instance and instance != "":
self.mastodon = Mastodon( self.mastodon = Mastodon(
access_token=access_token, access_token=access_token,
...@@ -164,6 +197,7 @@ class Attrap: ...@@ -164,6 +197,7 @@ class Attrap:
self.mastodon_suffix = mastodon_suffix self.mastodon_suffix = mastodon_suffix
def mastodon_toot(self, content): def mastodon_toot(self, content):
"""Publie le toot en ajoutant le header et footer configurés."""
if self.mastodon: if self.mastodon:
toot = content toot = content
if not self.mastodon_prefix == '': if not self.mastodon_prefix == '':
...@@ -173,17 +207,16 @@ class Attrap: ...@@ -173,17 +207,16 @@ class Attrap:
self.mastodon.toot(toot) self.mastodon.toot(toot)
def enable_tor(self, max_requests=0): def enable_tor(self, max_requests=0):
proxies = { """Active l'utilisation de Tor pour effectuer les requêtes."""
"http": f"socks5h://127.0.0.1:9050", if not self.safe_mode:
"https": f"socks5h://127.0.0.1:9050", self.tor_enabled = True
} self.tor_max_requests = max_requests
self.tor_enabled = True self.tor_get_new_id()
self.tor_max_requests = max_requests else:
self.tor_requests = 0 logger.warning('ATTENTION: le safe mode est activé, Tor n\'a pas été activé')
self.session.proxies.update(proxies)
self.tor_get_new_id()
def disable_tor(self): def disable_tor(self):
"""Désactive l'utilisation de Tor."""
proxies = {} proxies = {}
self.tor_enabled = False self.tor_enabled = False
self.tor_max_requests = 0 self.tor_max_requests = 0
...@@ -191,19 +224,26 @@ class Attrap: ...@@ -191,19 +224,26 @@ class Attrap:
self.session.proxies.update(proxies) self.session.proxies.update(proxies)
def tor_get_new_id(self): def tor_get_new_id(self):
"""Change de circuit Tor. Cela permet de changer de noeud de sortie donc d'IP."""
if self.tor_enabled: if self.tor_enabled:
logger.info('Changement d\'identité Tor') self.tor_socks5_key = 'attrap_' + ''.join(random.choices(string.ascii_lowercase, k=20))
try: proxies = {
self.session.close() "http": f"socks5h://attrap:{self.tor_socks5_key}@127.0.0.1:9050",
controller = Controller.from_port(port=9051) "https": f"socks5h://attrap:{self.tor_socks5_key}@127.0.0.1:9050",
controller.authenticate() }
controller.signal(Signal.NEWNYM) self.session.proxies.update(proxies)
time.sleep(5) self.tor_requests = 0
self.tor_requests = 0
except Exception as exc: def get_sub_pages(self, page_content, element, host, recursive_until_pdf, selenium=False):
logger.debug(f'Impossible de changer d\'identité Tor: {exc}') """
Récupère, à partir d'un chemin CSS, les sous-pages d'une page.
def get_sub_pages(self, page_content, element, host, recursive_until_pdf):
page_content -- Un contenu HTML à analyser
element -- Le chemin CSS vers l'objet renvoyant vers la sous-page recherchée
host -- Le nom d'hôte du site
recursive_until_pdf -- Un booléen pour savoir s'il faut rechercher un fichier PDF dans le chemin CSS. Le cas échéant, relance la recherche sur la sous-page si le lien n'est pas un PDF.
selenium -- lance un navigateur avec Selenium pour contourner les protections anti-robots
"""
soup = BeautifulSoup(page_content, 'html.parser') soup = BeautifulSoup(page_content, 'html.parser')
sub_pages = [] sub_pages = []
for a in soup.select(element): for a in soup.select(element):
...@@ -219,7 +259,8 @@ class Attrap: ...@@ -219,7 +259,8 @@ class Attrap:
sub_page_content, sub_page_content,
element, element,
host, host,
recursive_until_pdf recursive_until_pdf,
selenium=selenium
): ):
sub_pages.append(sub_sub_page) sub_pages.append(sub_sub_page)
else: else:
...@@ -236,9 +277,24 @@ class Attrap: ...@@ -236,9 +277,24 @@ class Attrap:
sub_pages.append(sub_page) sub_pages.append(sub_page)
return sub_pages return sub_pages
def get_sub_pages_with_pager(self, page, sub_page_element, pager_element, details_element, host): def get_sub_pages_with_pager(self, page, sub_page_element, pager_element, details_element, host, selenium=False):
"""
Récupère, à partir d'un chemin CSS, les sous-pages d'une page contenant un pager.
page -- L'URL ou le contenu HTML de la page à analyser
sub_page_element -- Le chemin CSS vers l'objet renvoyant vers la sous-page recherchée
pager_element -- Le chemin CSS vers le lien de page suivante du pager
details_element -- Le chemin CSS vers l'objet contenant les détails de la sous-page recherchée
host -- Le nom d'hôte du site
selenium -- lance un navigateur avec Selenium pour contourner les protections anti-robots
"""
pages = [] pages = []
page_content = self.get_page(page, 'get').content if isinstance(page, bytes):
page = page.decode('utf-8')
if page.startswith('https://') or page.startswith('http://'):
page_content = self.get_page(page, 'get', selenium=selenium).content
else:
page_content = page
# On initialise le parser # On initialise le parser
soup = BeautifulSoup(page_content, 'html.parser') soup = BeautifulSoup(page_content, 'html.parser')
...@@ -269,43 +325,64 @@ class Attrap: ...@@ -269,43 +325,64 @@ class Attrap:
sub_page_element, sub_page_element,
pager_element, pager_element,
details_element, details_element,
host host,
selenium=selenium
): ):
pages.append(sub_page) pages.append(sub_page)
return pages return pages
def get_raa_with_pager(self, pages_list, pager_element, host): def get_raa_with_pager(self, pages_list, pager_element, host, filter_from_last_element_date=False, selenium=False):
"""
Récupère et analyse les RAA d'une page contenant un pager.
pages_list -- Un tableau contenant la liste des pages
pager_element -- Le chemin CSS vers le lien de page suivante du pager
host -- Le nom d'hôte du site
filter_from_last_element_date -- (Optionnel) Si la date du dernier élément de la dernière page parsée
n'est pas dans la plage temporelle voulue, ne charge pas les pages suivantes. Par défaut à False. Ne doit
être activé que si l'ordre des éléments est chronologique.
selenium -- lance un navigateur avec Selenium pour contourner les protections anti-robots
"""
elements = [] elements = []
# On parse chaque page passée en paramètre # On parse chaque page passée en paramètre
for page in pages_list: for page in pages_list:
page_content = self.get_page(page, 'get').content page_content = self.get_page(page, 'get', selenium=selenium).content
# Pour chaque page, on récupère les PDF # Pour chaque page, on récupère les PDF
for raa in self.get_raa_elements(page_content): for raa in self.get_raa_elements(page_content):
elements.append(raa) elements.append(raa)
# On regarde également s'il n'y aurait pas un pager # Si la date du dernier RAA est dans la plage temporelle voulue,
sub_pages = [] # on regarde également s'il n'y aurait pas un pager
for sub_page in self.get_sub_pages( if not filter_from_last_element_date or (filter_from_last_element_date and (elements[-1].date >= Attrap.get_aware_datetime(self.not_before, timezone=self.timezone))):
page_content, sub_pages = []
pager_element, for sub_page in self.get_sub_pages(
host, page_content,
True pager_element,
): host,
sub_pages.append(sub_page['url']) True
for sub_raa in self.get_raa_with_pager( ):
sub_pages, sub_pages.append(sub_page['url'])
pager_element, for sub_raa in self.get_raa_with_pager(
host sub_pages,
): pager_element,
elements.append(sub_raa) host,
filter_from_last_element_date=filter_from_last_element_date
):
elements.append(sub_raa)
return elements return elements
def set_sleep_time(self, sleep_time): def set_sleep_time(self, sleep_time):
"""Configure le temps de temporisation"""
self.sleep_time = sleep_time self.sleep_time = sleep_time
def has_pdf(self, page_content): def has_pdf(self, page_content):
"""
Renvoie un booléen Vrai si la page contient un lien vers un PDF
page_content -- Un contenu HTML à analyser
"""
elements = [] elements = []
soup = BeautifulSoup(page_content, 'html.parser') soup = BeautifulSoup(page_content, 'html.parser')
for a in soup.find_all('a', href=True): for a in soup.find_all('a', href=True):
...@@ -315,6 +392,13 @@ class Attrap: ...@@ -315,6 +392,13 @@ class Attrap:
# On démarre le navigateur # On démarre le navigateur
def get_session(self, url, wait_element, remaining_retries=0): def get_session(self, url, wait_element, remaining_retries=0):
"""
Lance un navigateur avec Selenium.
url -- URL à interroger
wait_element -- Élement (désigné par son identifiant CSS) qui indique que la page est chargée
remaining_retries -- Nombre d'échecs autorisé avant de soulever une erreur
"""
webdriver_options = webdriver.ChromeOptions() webdriver_options = webdriver.ChromeOptions()
webdriver_options.add_argument("--no-sandbox") webdriver_options.add_argument("--no-sandbox")
webdriver_options.add_argument("--disable-extensions") webdriver_options.add_argument("--disable-extensions")
...@@ -322,12 +406,17 @@ class Attrap: ...@@ -322,12 +406,17 @@ class Attrap:
webdriver_options.add_argument("--disable-dev-shm-usage") webdriver_options.add_argument("--disable-dev-shm-usage")
webdriver_options.add_argument("--use_subprocess") webdriver_options.add_argument("--use_subprocess")
webdriver_options.add_argument("--disable-blink-features=AutomationControlled") webdriver_options.add_argument("--disable-blink-features=AutomationControlled")
webdriver_options.add_experimental_option("excludeSwitches", ["enable-automation"])
webdriver_options.add_experimental_option('useAutomationExtension', False)
if not self.user_agent == "": if not self.user_agent == "":
webdriver_options.add_argument(f"--user-agent={self.user_agent}") webdriver_options.add_argument(f"--user-agent={self.user_agent}")
webdriver_options.add_argument("--headless") if self.tor_enabled:
webdriver_options.add_argument("--window-size=1024,768") webdriver_options.add_argument(f'--proxy-server=socks5://127.0.0.1:9050')
webdriver_options.add_argument("--headless=new")
webdriver_options.add_argument("--start-maximized")
display = Display(visible=False, size=(1024, 768)) display = Display(visible=False, size=(1024, 768))
display.start() display.start()
...@@ -352,6 +441,8 @@ class Attrap: ...@@ -352,6 +441,8 @@ class Attrap:
logger.warning(f'TimeoutException: {exc}') logger.warning(f'TimeoutException: {exc}')
if remaining_retries > 0: if remaining_retries > 0:
time.sleep(5) time.sleep(5)
if self.tor_enabled:
self.tor_get_new_id()
return self.get_session(url, wait_element, (remaining_retries - 1)) return self.get_session(url, wait_element, (remaining_retries - 1))
else: else:
raise TimeoutException(exc) raise TimeoutException(exc)
...@@ -369,6 +460,7 @@ class Attrap: ...@@ -369,6 +460,7 @@ class Attrap:
return page_content return page_content
def print_output(self, data): def print_output(self, data):
"""Affiche dans le terminal et dans le fichier de log un texte"""
print(data) print(data)
data = data.replace('\033[92m', '') data = data.replace('\033[92m', '')
data = data.replace('\033[0m', '') data = data.replace('\033[0m', '')
...@@ -377,22 +469,42 @@ class Attrap: ...@@ -377,22 +469,42 @@ class Attrap:
f.write(data + "\n") f.write(data + "\n")
f.close() f.close()
def get_page(self, url, method, data={}): def get_page(self, url, method, data={}, selenium=False):
"""
Récupère le contenu HTML d'une page web
url -- L'URL de la page demandée
method -- 'post' ou 'get', selon le type de requête
data -- Un dictionnaire contenant les données à envoyer au site
selenium -- lance un navigateur avec Selenium pour contourner les protections anti-robots
"""
try: try:
logger.debug(f'Chargement de la page {url}') logger.debug(f'Chargement de la page {url}')
# Si un délai a été configuré, on vérifie qu'il n'est pas trop tôt pour lancer la requête
if self.sleep_time > 0: if self.sleep_time > 0:
time.sleep(self.sleep_time) current_time = int(time.mktime(datetime.datetime.today().timetuple()))
remaining_sleep_time = self.last_http_request + self.sleep_time - current_time
if remaining_sleep_time > 0:
time.sleep(remaining_sleep_time)
self.last_http_request = int(time.mktime(datetime.datetime.today().timetuple()))
page = None page = None
if method == 'get': if selenium and method == 'get':
page = self.session.get(url, timeout=(10, 120)) page_content = self.get_session(url, None, 6)
if method == 'post': page = {'content': page_content, 'status_code': 200}
page = self.session.post(url, data=data, timeout=(10, 120)) page = SimpleNamespace(**page)
else:
if method == 'get':
page = self.session.get(url, timeout=(10, 120))
if method == 'post':
page = self.session.post(url, data=data, timeout=(10, 120))
if page.status_code == 429: if page.status_code == 429:
logger.warning('Erreur 429 Too Many Requests reçue, temporisation...') logger.warning('Erreur 429 Too Many Requests reçue, temporisation...')
self.tor_get_new_id() self.tor_get_new_id()
time.sleep(55) time.sleep(1)
return self.get_page(url, method, data) return self.get_page(url, method, data)
if self.tor_enabled: if self.tor_enabled:
...@@ -405,17 +517,22 @@ class Attrap: ...@@ -405,17 +517,22 @@ class Attrap:
except requests.exceptions.ConnectionError: except requests.exceptions.ConnectionError:
logger.warning(f'Erreur de connexion, temporisation...') logger.warning(f'Erreur de connexion, temporisation...')
self.tor_get_new_id() self.tor_get_new_id()
time.sleep(55) time.sleep(30)
return self.get_page(url, method, data) return self.get_page(url, method, data)
except requests.exceptions.Timeout: except requests.exceptions.Timeout:
logger.warning(f'Timeout, on relance la requête...') logger.warning(f'Timeout, on relance la requête...')
return self.get_page(url, method, data) return self.get_page(url, method, data)
except urllib3.exceptions.ProtocolError:
logger.warning(f'Erreur de connexion, on relance la requête...')
return self.get_page(url, method, data)
def update_user_agent(self, user_agent): def update_user_agent(self, user_agent):
"""Change la valeur du user-agent"""
self.user_agent = user_agent self.user_agent = user_agent
self.session.headers.update({'User-Agent': self.user_agent}) self.session.headers.update({'User-Agent': self.user_agent})
def download_file(self, raa): def download_file(self, raa):
"""Télécharge un RAA"""
try: try:
os.makedirs( os.makedirs(
os.path.dirname(f'{self.data_dir}/raa/{raa.get_sha256()}.pdf'), os.path.dirname(f'{self.data_dir}/raa/{raa.get_sha256()}.pdf'),
...@@ -433,8 +550,10 @@ class Attrap: ...@@ -433,8 +550,10 @@ class Attrap:
logger.warning(f'ATTENTION: Impossible de télécharger le fichier {raa.url}: {exc}') logger.warning(f'ATTENTION: Impossible de télécharger le fichier {raa.url}: {exc}')
def ocr(self, raa, retry_on_failure=True): def ocr(self, raa, retry_on_failure=True):
"""OCRise un RAA"""
cmd = [ cmd = [
'ocrmypdf', 'python3',
'bin/ocrmypdf',
'-l', 'eng+fra', '-l', 'eng+fra',
'--output-type', 'pdf', '--output-type', 'pdf',
'--redo-ocr', '--redo-ocr',
...@@ -460,7 +579,7 @@ class Attrap: ...@@ -460,7 +579,7 @@ class Attrap:
shutil.copy(f'{self.data_dir}/raa/{raa.get_sha256()}.pdf', f'{self.data_dir}/raa/{raa.get_sha256()}.ocr.pdf') shutil.copy(f'{self.data_dir}/raa/{raa.get_sha256()}.pdf', f'{self.data_dir}/raa/{raa.get_sha256()}.ocr.pdf')
def flatten_pdf(self, raa): def flatten_pdf(self, raa):
# OCRmyPDF ne sait pas gérer les formulaires, donc on les enlève avant OCRisation """Supprime les formulaires d'un PDF pour pouvoir les OCRiser après dans OCRmyPDF."""
reader = PdfReader(f'{self.data_dir}/raa/{raa.get_sha256()}.pdf') reader = PdfReader(f'{self.data_dir}/raa/{raa.get_sha256()}.pdf')
writer = PdfWriter() writer = PdfWriter()
...@@ -475,8 +594,10 @@ class Attrap: ...@@ -475,8 +594,10 @@ class Attrap:
writer.write(f'{self.data_dir}/raa/{raa.get_sha256()}.flat.pdf') writer.write(f'{self.data_dir}/raa/{raa.get_sha256()}.flat.pdf')
def search_keywords(self, raa, keywords): def search_keywords(self, raa, keywords):
"""Recherche des mots-clés dans le texte extrait du PDF"""
if keywords and not keywords == '': if keywords and not keywords == '':
text = open(f'{self.data_dir}/raa/{raa.get_sha256()}.txt').read() text = open(f'{self.data_dir}/raa/{raa.get_sha256()}.txt').read()
date_str = raa.date.strftime("%d/%m/%Y")
found = False found = False
found_keywords = [] found_keywords = []
...@@ -484,7 +605,7 @@ class Attrap: ...@@ -484,7 +605,7 @@ class Attrap:
if re.search(keyword, text, re.IGNORECASE | re.MULTILINE): if re.search(keyword, text, re.IGNORECASE | re.MULTILINE):
if not found: if not found:
url = quote(raa.url, safe='/:') url = quote(raa.url, safe='/:')
self.print_output(f'\033[92m{raa.name}\033[0m ({raa.date_str})') self.print_output(f'\033[92m{raa.name}\033[0m ({date_str})')
self.print_output(f'URL : {url}') self.print_output(f'URL : {url}')
found = True found = True
self.found = True self.found = True
...@@ -498,26 +619,50 @@ class Attrap: ...@@ -498,26 +619,50 @@ class Attrap:
[str(x) for x in found_keywords] [str(x) for x in found_keywords]
) )
self.mastodon_toot( self.mastodon_toot(
f'{raa.name} ({raa.date_str})\n\nLes termes suivants ont ' f'{raa.name} ({date_str})\n\nLes termes suivants ont '
f'été trouvés : {found_keywords_str}.\n\nURL : {url}' f'été trouvés : {found_keywords_str}.\n\nURL : {url}'
) )
def parse_raa(self, elements, keywords): def parse_raa(self, elements, keywords):
"""
Démarre l'analyse des RAA.
elements -- Un tableau contenant les RAA à analyser
keywords -- Les mots-clés à rechercher dans chaque RAA
"""
self.print_output(f'Termes recherchés: {keywords}') self.print_output(f'Termes recherchés: {keywords}')
self.print_output('') self.print_output('')
for raa in elements: for raa in elements:
# Si le fichier n'a pas déjà été parsé et qu'il est postérieur à la # Si le fichier n'a pas déjà été parsé et qu'il est postérieur à la
# date maximale d'analyse, on le télécharge et on le parse # date maximale d'analyse, on le télécharge et on le parse
if raa.date >= self.not_before and not os.path.isfile(f'{self.data_dir}/raa/{raa.get_sha256()}.txt'): if not os.path.isfile(f'{self.data_dir}/raa/{raa.get_sha256()}.txt') and (not raa.date or (raa.date >= Attrap.get_aware_datetime(self.not_before, timezone=self.timezone))):
url = quote(raa.url, safe='/:') url = quote(raa.url, safe='/:')
logger.info(f'Nouveau fichier : {raa.name} ({raa.date_str}). URL : {url}')
self.download_file(raa) self.download_file(raa)
raa.parse_metadata(self.data_dir) try:
self.flatten_pdf(raa) raa.parse_metadata(self.data_dir)
self.ocr(raa, True) # Lorsque la date du RAA n'est pas connue, on a dû télécharger le PDF pour récupérer la date de ses métadonnées.
raa.extract_content(self.data_dir) # Donc on vérifie à nouveau ici si la date correspond à ce qu'on veut analyser
self.search_keywords(raa, keywords) if not raa.date:
os.remove(f'{self.data_dir}/raa/{raa.get_sha256()}.pdf')
os.remove(f'{self.data_dir}/raa/{raa.get_sha256()}.json')
logger.error(f'ERREUR: le RAA {raa.name} n\'a pas de date !')
sys.exit(1)
if raa.date >= Attrap.get_aware_datetime(self.not_before, timezone=self.timezone):
date_str = raa.date.strftime("%d/%m/%Y")
logger.info(f'Nouveau fichier : {raa.name} ({date_str}). URL : {url}')
self.flatten_pdf(raa)
self.ocr(raa, True)
raa.extract_content(self.data_dir)
self.search_keywords(raa, keywords)
else:
# On supprime le fichier de metadonnées puisqu'on ne le parsera pas
os.remove(f'{self.data_dir}/raa/{raa.get_sha256()}.pdf')
os.remove(f'{self.data_dir}/raa/{raa.get_sha256()}.json')
except PdfStreamError as exc:
logger.warning(f'ATTENTION: le RAA à l\'adresse {raa.url} n\'est pas valide ! On l\'ignore...')
except EmptyFileError as exc:
logger.warning(f'ATTENTION: le RAA à l\'adresse {raa.url} est vide ! On l\'ignore...')
def get_raa(self, page_content): def get_raa(self, page_content):
logger.error('Cette fonction doit être surchargée') logger.error('Cette fonction doit être surchargée')
...@@ -525,6 +670,19 @@ class Attrap: ...@@ -525,6 +670,19 @@ class Attrap:
def configure_mailer(self, smtp_host, smtp_username, smtp_password, def configure_mailer(self, smtp_host, smtp_username, smtp_password,
smtp_port, smtp_starttls, smtp_ssl, email_from, smtp_port, smtp_starttls, smtp_ssl, email_from,
email_to, email_object): email_to, email_object):
"""
Configure les courriers électroniques.
smtp_host -- Nom d'hôte du serveur SMTP
smtp_username -- Nom d'utilisateur pour se connecter au serveur SMTP
smtp_password -- Mot de passe pour se connecter au serveur SMTP
smtp_port -- Port de connexion au serveur SMTP
smtp_starttls -- Booléen. Si vrai, se connecte avec STARTTLS
smtp_ssl -- Booléen. Si vrai, se connecte avec SSL/TLS
email_from -- Adresse électronique de l'expéditeur des notifications
email_to -- Tableau contenant les adresses électroniques à notifier
email_object -- Objet du courrier électronique de notification
"""
self.smtp_host = smtp_host self.smtp_host = smtp_host
self.smtp_username = smtp_username self.smtp_username = smtp_username
self.smtp_password = smtp_password self.smtp_password = smtp_password
...@@ -585,12 +743,14 @@ class Attrap: ...@@ -585,12 +743,14 @@ class Attrap:
except Exception as exc: except Exception as exc:
logger.warning(f'Impossible d\'envoyer le courrier électronique : {exc}') logger.warning(f'Impossible d\'envoyer le courrier électronique : {exc}')
# Fonction qui essaie de deviner la date d'un RAA à partir de son nom.
# Utile pour limiter les requêtes lors de l'obtention des RAA à scanner.
def guess_date(string, regex): def guess_date(string, regex):
"""
Essaie de deviner la date d'un RAA à partir de son nom.
Utile pour limiter les requêtes lors de l'obtention des RAA à scanner.
"""
try: try:
search = re.search(regex, string, re.IGNORECASE) search = re.search(regex, string, re.IGNORECASE)
guessed_date = dateparser.parse(search.group(1)) guessed_date = dateparser.parse(search.group(1), languages=['fr'], settings={'PREFER_DAY_OF_MONTH': 'last', 'PREFER_MONTH_OF_YEAR': 'last'})
if guessed_date is None: if guessed_date is None:
raise Exception('La date est un objet None') raise Exception('La date est un objet None')
else: else:
...@@ -598,3 +758,15 @@ class Attrap: ...@@ -598,3 +758,15 @@ class Attrap:
except Exception as exc: except Exception as exc:
logger.warning(f'Impossible de deviner la date du terme {string} : {exc}') logger.warning(f'Impossible de deviner la date du terme {string} : {exc}')
return datetime.datetime(9999, 1, 1) return datetime.datetime(9999, 1, 1)
def get_aware_datetime(unknown_datetime, timezone='Europe/Paris'):
"""
Retourne un objet datetime avisé.
datetime - L'objet datetime à aviser. Utilise le fuseau indiqué si datetime est naïf.
"""
if unknown_datetime.tzinfo is not None and unknown_datetime.tzinfo.utcoffset(unknown_datetime) is not None:
return unknown_datetime
else:
return pytz.timezone(timezone).localize(unknown_datetime)
...@@ -9,18 +9,19 @@ from Attrap import Attrap ...@@ -9,18 +9,19 @@ from Attrap import Attrap
class Attrap_ppparis(Attrap): class Attrap_ppparis(Attrap):
# Config # Config
__HOST = 'https://www.prefecturedepolice.interieur.gouv.fr' hostname = 'https://www.prefecturedepolice.interieur.gouv.fr'
__RAA_PAGE = f'{__HOST}/actualites-et-presse/arretes/accueil-arretes' raa_page = f'{hostname}/actualites-et-presse/arretes/accueil-arretes'
__WAIT_ELEMENT = 'block-decree-list-block' __WAIT_ELEMENT = 'block-decree-list-block'
__USER_AGENT = 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36' user_agent = 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36'
full_name = 'Préfecture de police de Paris' full_name = 'Préfecture de police de Paris'
short_code = 'ppparis' short_code = 'ppparis'
timezone = 'Europe/Paris'
def __init__(self, data_dir): def __init__(self, data_dir):
super().__init__(data_dir, self.__USER_AGENT) super().__init__(data_dir, self.user_agent)
def get_raa(self, keywords): def get_raa(self, keywords):
page_content = self.get_session(self.__RAA_PAGE, self.__WAIT_ELEMENT, 6) page_content = self.get_session(self.raa_page, self.__WAIT_ELEMENT, 6)
raa_elements = self.get_raa_elements(page_content) raa_elements = self.get_raa_elements(page_content)
self.parse_raa(raa_elements, keywords) self.parse_raa(raa_elements, keywords)
self.mailer() self.mailer()
...@@ -43,6 +44,6 @@ class Attrap_ppparis(Attrap): ...@@ -43,6 +44,6 @@ class Attrap_ppparis(Attrap):
name = a.find('span').get_text() name = a.find('span').get_text()
date = datetime.datetime.strptime(a.find('div', class_="field--type-datetime").get_text().strip(), '%d/%m/%Y') date = datetime.datetime.strptime(a.find('div', class_="field--type-datetime").get_text().strip(), '%d/%m/%Y')
raa = Attrap.RAA(url, date, name) raa = Attrap.RAA(url, date, name, timezone=self.timezone)
elements.append(raa) elements.append(raa)
return elements return elements
from Attrap_prefdpt import Attrap_prefdpt
class Attrap_pref01(Attrap_prefdpt):
# Configuration de la préfecture
hostname = 'https://www.ain.gouv.fr'
raa_page = f'{hostname}/Publications/Recueil-des-actes-administratifs-RAA'
full_name = 'Préfecture de l\'Ain'
short_code = 'pref01'
timezone = 'Europe/Paris'
# Configuration des widgets à analyser
Attrap_prefdpt.grey_card['regex']['year'] = '(?:Recueil|Recueils) (?:des actes administratifs)(?:[ -])*([0-9]{4})'
from Attrap_prefdpt import Attrap_prefdpt
class Attrap_pref02(Attrap_prefdpt):
# Configuration de la préfecture
hostname = 'https://www.aisne.gouv.fr'
raa_page = f'{hostname}/Publications/Recueil-des-Actes-Administratifs'
full_name = 'Préfecture de l\'Aisne'
short_code = 'pref02'
timezone = 'Europe/Paris'
# Configuration des widgets à analyser
Attrap_prefdpt.grey_card['regex']['year'] = 'RAA [Aa]nnée ([0-9]{4})'
from Attrap_prefdpt import Attrap_prefdpt
class Attrap_pref03(Attrap_prefdpt):
# Configuration de la préfecture
hostname = 'https://www.allier.gouv.fr'
raa_page = f'{hostname}/Publications/Recueil-des-actes-administratifs-arretes'
full_name = 'Préfecture de l\'Allier'
short_code = 'pref03'
timezone = 'Europe/Paris'
# Configuration des widgets à analyser
Attrap_prefdpt.grey_card['regex']['year'] = '([0-9]{4})'
import os from Attrap_prefdpt import Attrap_prefdpt
import datetime
from bs4 import BeautifulSoup
from urllib.parse import unquote
from Attrap import Attrap class Attrap_pref04(Attrap_prefdpt):
# Configuration de la préfecture
class Attrap_pref04(Attrap): hostname = 'https://www.alpes-de-haute-provence.gouv.fr'
raa_page = f'{hostname}/Publications/Publications-administratives-et-legales/Recueil-des-Actes-Administratifs'
# Config
__HOST = 'https://www.alpes-de-haute-provence.gouv.fr'
__RAA_PAGE = f'{__HOST}/Publications/Publications-administratives-et-legales/Recueil-des-Actes-Administratifs'
__USER_AGENT = 'Mozilla/5.0 (X11; Linux x86_64; rv:109.0) Gecko/20100101 Firefox/115.0'
full_name = 'Préfecture des Alpes-de-Haute-Provence' full_name = 'Préfecture des Alpes-de-Haute-Provence'
short_code = 'pref04' short_code = 'pref04'
timezone = 'Europe/Paris'
def __init__(self, data_dir): # Configuration des widgets à analyser
super().__init__(data_dir, self.__USER_AGENT) Attrap_prefdpt.grey_card['regex']['year'] = '([0-9]{4})'
self.enable_tor(10)
def get_raa(self, keywords):
elements = []
page_content = self.get_page(self.__RAA_PAGE, 'get').content
for sub_page in self.get_sub_pages(
page_content,
'div.fr-card__body div.fr-card__content h2.fr-card__title a',
self.__HOST,
False
):
if Attrap.guess_date(sub_page['name'], '([0-9]{4}).*').year >= self.not_before.year:
sub_page_content = self.get_page(sub_page['url'], 'get').content
for element in self.get_raa_elements(sub_page_content):
elements.append(element)
self.parse_raa(elements, keywords)
self.mailer()
def get_raa_elements(self, page_content):
elements = []
# On charge le parser
soup = BeautifulSoup(page_content, 'html.parser')
# Pour chaque balise a, on regarde si c'est un PDF, et si oui on le
# parse
for a in soup.select('a.fr-link.fr-link--download'):
if a.get('href') and a['href'].endswith('.pdf'):
if a['href'].startswith('/'):
url = f"{self.__HOST}{a['href']}"
else:
url = a['href']
url = unquote(url)
name = a.find('span').previous_sibling.replace('Télécharger ', '').strip()
date = datetime.datetime.strptime(a.find('span').get_text().split(' - ')[-1].strip(), '%d/%m/%Y')
raa = Attrap.RAA(url, date, name)
elements.append(raa)
return elements
import os from Attrap_prefdpt import Attrap_prefdpt
import datetime
from bs4 import BeautifulSoup
from urllib.parse import unquote
from Attrap import Attrap class Attrap_pref05(Attrap_prefdpt):
# Configuration de la préfecture
class Attrap_pref05(Attrap): hostname = 'https://www.hautes-alpes.gouv.fr'
raa_page = f'{hostname}/Publications/Recueil-des-actes-administratifs'
# Config
__HOST = 'https://www.hautes-alpes.gouv.fr'
__RAA_PAGE = f'{__HOST}/Publications/Recueil-des-actes-administratifs'
__USER_AGENT = 'Mozilla/5.0 (X11; Linux x86_64; rv:109.0) Gecko/20100101 Firefox/115.0'
full_name = 'Préfecture des Hautes-Alpes' full_name = 'Préfecture des Hautes-Alpes'
short_code = 'pref05' short_code = 'pref05'
timezone = 'Europe/Paris'
def __init__(self, data_dir): # Configuration des widgets à analyser
super().__init__(data_dir, self.__USER_AGENT) Attrap_prefdpt.grey_card['regex']['year'] = 'Année *([0-9]{4})'
self.enable_tor(10) Attrap_prefdpt.grey_card['regex']['month'] = '([A-Za-zéû]* *[0-9]{4})'
def get_raa(self, keywords):
year_pages_to_parse = []
# On détermine quelles pages d'année parser
page_content = self.get_page(self.__RAA_PAGE, 'get').content
year_pages = self.get_sub_pages(
page_content,
'.fr-card.fr-card--sm.fr-card--grey.fr-enlarge-link div.fr-card__body div.fr-card__content h2.fr-card__title a',
self.__HOST,
False
)
for year_page in year_pages:
if int(year_page['name'].replace('Année ', '').strip()) >= self.not_before.year:
year_pages_to_parse.append(year_page['url'])
month_pages_to_parse = []
# Pour chaque année, on cherche les sous-pages de mois
for year_page in year_pages_to_parse:
page_content = self.get_page(year_page, 'get').content
month_pages = self.get_sub_pages(
page_content,
'.fr-card.fr-card--sm.fr-card--grey.fr-enlarge-link div.fr-card__body div.fr-card__content h2.fr-card__title a',
self.__HOST,
False
)[::-1]
for month_page in month_pages:
# On filtre les mois ne correspondant pas à la période analysée
guessed_date = Attrap.guess_date(month_page['name'], '.*([a-zéû]* [0-9]{4})')
if guessed_date.replace(day=1) >= self.not_before.replace(day=1):
month_pages_to_parse.append(month_page['url'])
pages_to_parse = []
# Pour chaque page de mois, on cherche les pages de RAA
for month_page in month_pages_to_parse:
pages = self.get_sub_pages_with_pager(
month_page,
'div.fr-card.fr-card--horizontal.fr-card--sm.fr-enlarge-link.fr-mb-3w div.fr-card__body div.fr-card__content h2.fr-card__title a.fr-card__link',
'nav.fr-pagination ul.fr-pagination__list li a.fr-pagination__link.fr-pagination__link--next.fr-pagination__link--lg-label',
'div.fr-card.fr-card--horizontal.fr-card--sm.fr-enlarge-link.fr-mb-3w div.fr-card__body div.fr-card__content div.fr-card__end p.fr-card__detail',
self.__HOST
)[::-1]
for page in pages:
guessed_date = datetime.datetime.strptime(page['details'].replace('Publié le ', '').strip(), '%d/%m/%Y')
if guessed_date.replace(day=1) >= self.not_before.replace(day=1):
pages_to_parse.append(page['url'])
elements = []
# On parse les pages contenant des RAA
for page in pages_to_parse:
page_content = self.get_page(page, 'get').content
for element in self.get_raa_elements(page_content):
elements.append(element)
# On parse les RAA
self.parse_raa(elements, keywords)
self.mailer()
def get_raa_elements(self, page_content):
elements = []
# On charge le parser
soup = BeautifulSoup(page_content, 'html.parser')
# On récupère chaque balise a
for a in soup.select('div.fr-grid-row div.fr-downloads-group.fr-downloads-group--bordered ul li a'):
if a.get('href') and a['href'].endswith('.pdf'):
if a['href'].startswith('/'):
url = f"{self.__HOST}{a['href']}"
else:
url = a['href']
url = unquote(url)
name = a.find('span').previous_sibling.replace('Télécharger ', '').strip()
date = datetime.datetime.strptime(a.find('span').get_text().split(' - ')[-1].strip(), '%d/%m/%Y')
raa = Attrap.RAA(url, date, name)
elements.append(raa)
return elements
import os from Attrap_prefdpt import Attrap_prefdpt
import datetime
from bs4 import BeautifulSoup
from urllib.parse import unquote
from Attrap import Attrap class Attrap_pref06(Attrap_prefdpt):
# Configuration de la préfecture
class Attrap_pref06(Attrap): hostname = 'https://www.alpes-maritimes.gouv.fr'
raa_page = f'{hostname}/Publications/Recueil-des-actes-administratifs-RAA'
# Config
__HOST = 'https://www.alpes-maritimes.gouv.fr'
__RAA_PAGE = {
'2024': [
f'{__HOST}/Publications/Recueil-des-actes-administratifs-RAA/Annee-2024/Recueils-mensuels',
f'{__HOST}/Publications/Recueil-des-actes-administratifs-RAA/Annee-2024/Recueils-speciaux',
f'{__HOST}/Publications/Recueil-des-actes-administratifs-RAA/Annee-2024/Recueils-specifiques'
],
'2023': [
f'{__HOST}/Publications/Recueil-des-actes-administratifs-RAA/Annee-2023/Recueils-mensuels',
f'{__HOST}/Publications/Recueil-des-actes-administratifs-RAA/Annee-2023/Recueils-speciaux',
f'{__HOST}/Publications/Recueil-des-actes-administratifs-RAA/Annee-2023/Recueils-specifiques'
],
'2022': [
f'{__HOST}/Publications/Recueil-des-actes-administratifs-RAA/Annee-2022/Recueils-mensuels',
f'{__HOST}/Publications/Recueil-des-actes-administratifs-RAA/Annee-2022/Recueils-speciaux',
f'{__HOST}/Publications/Recueil-des-actes-administratifs-RAA/Annee-2022/Recueils-specifiques'
],
'2021': [
f'{__HOST}/Publications/Recueil-des-actes-administratifs-RAA/Annee-2021/Recueils-mensuels',
f'{__HOST}/Publications/Recueil-des-actes-administratifs-RAA/Annee-2021/Recueils-speciaux',
f'{__HOST}/Publications/Recueil-des-actes-administratifs-RAA/Annee-2021/Recueils-specifiques'
],
'2020': [
f'{__HOST}/Publications/Recueil-des-actes-administratifs-RAA/Annee-2020/Recueils-mensuels',
f'{__HOST}/Publications/Recueil-des-actes-administratifs-RAA/Annee-2020/Recueils-speciaux',
f'{__HOST}/Publications/Recueil-des-actes-administratifs-RAA/Annee-2020/Recueils-specifiques'
],
'2019': [
f'{__HOST}/Publications/Recueil-des-actes-administratifs-RAA/Annee-2019/Recueils-mensuels',
f'{__HOST}/Publications/Recueil-des-actes-administratifs-RAA/Annee-2019/Recueils-speciaux',
f'{__HOST}/Publications/Recueil-des-actes-administratifs-RAA/Annee-2019/Recueils-specifiques'
]
}
__USER_AGENT = 'Mozilla/5.0 (X11; Linux x86_64; rv:109.0) Gecko/20100101 Firefox/115.0'
full_name = 'Préfecture des Alpes-Maritimes' full_name = 'Préfecture des Alpes-Maritimes'
short_code = 'pref06' short_code = 'pref06'
timezone = 'Europe/Paris'
def __init__(self, data_dir): # Configuration des widgets à analyser
super().__init__(data_dir, self.__USER_AGENT) Attrap_prefdpt.grey_card['regex']['year'] = 'Année *([0-9]{4})'
self.enable_tor(20)
def get_raa(self, keywords):
pages_to_parse = []
if self.not_before.year <= 2024:
for page in self.__RAA_PAGE['2024']:
pages_to_parse.append(page)
if self.not_before.year <= 2023:
for page in self.__RAA_PAGE['2023']:
pages_to_parse.append(page)
if self.not_before.year <= 2022:
for page in self.__RAA_PAGE['2022']:
pages_to_parse.append(page)
if self.not_before.year <= 2021:
for page in self.__RAA_PAGE['2021']:
pages_to_parse.append(page)
if self.not_before.year <= 2020:
for page in self.__RAA_PAGE['2020']:
pages_to_parse.append(page)
if self.not_before.year <= 2019:
for page in self.__RAA_PAGE['2019']:
pages_to_parse.append(page)
elements = self.get_raa_with_pager(
pages_to_parse,
".fr-pagination__link.fr-pagination__link--next",
self.__HOST
)
self.parse_raa(elements, keywords)
self.mailer()
def get_raa_elements(self, page_content):
elements = []
# On charge le parser
soup = BeautifulSoup(page_content, 'html.parser')
# Pour chaque élément fr-card__content, on cherche sa balise a, et si
# c'est un PDF on le parse
cards = soup.find_all('div', class_='fr-card__content')
for card in cards:
a = card.find('a')
if a['href'].endswith('.pdf'):
if a['href'].startswith('/'):
url = f"{self.__HOST}{a['href']}"
else:
url = a['href']
url = unquote(url)
name = a.get_text().strip()
date = datetime.datetime.strptime(card.find('p', class_='fr-card__detail').get_text().replace('Publié le ', '').strip(), '%d/%m/%Y')
raa = Attrap.RAA(url, date, name)
elements.append(raa)
return elements
import os from Attrap_prefdpt import Attrap_prefdpt
import datetime
from bs4 import BeautifulSoup
from urllib.parse import unquote
from Attrap import Attrap class Attrap_pref09(Attrap_prefdpt):
# Configuration de la préfecture
class Attrap_pref09(Attrap): hostname = 'https://www.ariege.gouv.fr'
raa_page = f'{hostname}/Publications/Recueil-des-actes-administratifs/Recueils-des-Actes-Administratifs-de-l-Ariege-a-partir-du-28-avril-2015'
# Config
__HOST = 'https://www.ariege.gouv.fr'
__RAA_PAGE = f'{__HOST}/Publications/Recueil-des-actes-administratifs/Recueils-des-Actes-Administratifs-de-l-Ariege-a-partir-du-28-avril-2015'
__USER_AGENT = 'Mozilla/5.0 (X11; Linux x86_64; rv:109.0) Gecko/20100101 Firefox/115.0'
full_name = 'Préfecture de l\'Ariège' full_name = 'Préfecture de l\'Ariège'
short_code = 'pref09' short_code = 'pref09'
timezone = 'Europe/Paris'
def __init__(self, data_dir):
super().__init__(data_dir, self.__USER_AGENT)
self.enable_tor(10)
def get_raa(self, keywords):
pages_to_parse = []
# Les RAA de l'Ariège sont éparpillés sur des sous-pages par mois.
# Donc on parse la page principale à la recherche des sous-pages.
sub_pages = self.get_sub_pages_with_pager(
self.__RAA_PAGE,
'div.fr-card__body div.fr-card__content h2.fr-card__title a.fr-card__link',
'ul.fr-pagination__list li a.fr-pagination__link.fr-pagination__link--next',
'div.fr-card__body div.fr-card__content div.fr-card__end p.fr-card__detail',
self.__HOST
)[::-1]
# On filtre par date les sous-pages pour limiter les requêtes
for sub_page in sub_pages:
guessed_date = datetime.datetime.strptime(sub_page['details'].replace('Publié le ', '').strip(), '%d/%m/%Y')
guessed_date.replace(day=1)
if guessed_date >= self.not_before:
pages_to_parse.append(sub_page['url'])
# On parse les pages contenant des RAA
elements = []
for page in pages_to_parse:
page_content = self.get_page(page, 'get').content
for element in self.get_raa_elements(page_content):
elements.append(element)
self.parse_raa(elements, keywords)
self.mailer()
def get_raa_elements(self, page_content):
elements = []
# On charge le parser
soup = BeautifulSoup(page_content, 'html.parser')
# On récupère chaque balise a
for a in soup.select('div.fr-downloads-group.fr-downloads-group--bordered ul li a'):
if a.get('href') and a['href'].endswith('.pdf'):
if a['href'].startswith('/'):
url = f"{self.__HOST}{a['href']}"
else:
url = a['href']
url = unquote(url)
name = a.find('span').previous_sibling.replace('Télécharger ', '').strip()
date = datetime.datetime.strptime(a.find('span').get_text().split(' - ')[-1].strip(), '%d/%m/%Y')
raa = Attrap.RAA(url, date, name)
elements.append(raa)
return elements
from Attrap_prefdpt import Attrap_prefdpt
class Attrap_pref10(Attrap_prefdpt):
# Configuration de la préfecture
hostname = 'https://www.aube.gouv.fr'
raa_page = [
f'{hostname}/Publications/RAA-Recueil-des-Actes-Administratifs',
f'{hostname}/Publications/RAA-Recueil-des-Actes-Administratifs/RAA-Archives'
]
full_name = 'Préfecture de l\'Aube'
short_code = 'pref10'
timezone = 'Europe/Paris'
# Configuration des widgets à analyser
Attrap_prefdpt.grey_card['regex']['year'] = 'RAA *([0-9]{4})'
# On ajoute un widget custom représentant les liens sur la page d'accueil
Attrap_prefdpt.widgets.append(
Attrap_prefdpt.DptWidget(
'homepage_links',
regex={'year': 'Année *([0-9]{4})'},
css_path={'title': 'div.fr-text--lead p a.fr-link'}
)
)
from Attrap_prefdpt import Attrap_prefdpt
class Attrap_pref11(Attrap_prefdpt):
# Configuration de la préfecture
hostname = 'https://www.aude.gouv.fr'
raa_page = f'{hostname}/Publications/Recueil-des-Actes-Administratifs-RAA'
full_name = 'Préfecture de l\'Aude'
short_code = 'pref11'
timezone = 'Europe/Paris'
# Configuration des widgets à analyser
Attrap_prefdpt.grey_card['regex']['year'] = 'Année *([0-9]{4})'
import os from Attrap_prefdpt import Attrap_prefdpt
import datetime
from bs4 import BeautifulSoup
from urllib.parse import unquote
from Attrap import Attrap class Attrap_pref13(Attrap_prefdpt):
# Configuration de la préfecture
class Attrap_pref13(Attrap): hostname = 'https://www.bouches-du-rhone.gouv.fr'
raa_page = [
# Config f'{hostname}/Publications/RAA-et-Archives',
__HOST = 'https://www.bouches-du-rhone.gouv.fr' f'{hostname}/Publications/RAA-et-Archives/Archives-RAA-des-Bouches-du-Rhone'
__RAA_PAGE = [
f'{__HOST}/Publications/RAA-et-Archives/RAA-2024',
f'{__HOST}/Publications/RAA-et-Archives/RAA-2023',
f'{__HOST}/Publications/RAA-et-Archives/Archives-RAA-des-Bouches-du-Rhone/RAA-2022',
f'{__HOST}/Publications/RAA-et-Archives/Archives-RAA-des-Bouches-du-Rhone/RAA-2021',
f'{__HOST}/Publications/RAA-et-Archives/Archives-RAA-des-Bouches-du-Rhone/RAA-2020',
f'{__HOST}/Publications/RAA-et-Archives/Archives-RAA-des-Bouches-du-Rhone/RAA-2019'
] ]
__USER_AGENT = 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36'
full_name = 'Préfecture des Bouches-du-Rhône' full_name = 'Préfecture des Bouches-du-Rhône'
short_code = 'pref13' short_code = 'pref13'
timezone = 'Europe/Paris'
def __init__(self, data_dir): # Configuration des widgets à analyser
super().__init__(data_dir, self.__USER_AGENT) Attrap_prefdpt.grey_card['regex']['year'] = 'RAA[- ]*([0-9]{4})'
self.enable_tor(10) Attrap_prefdpt.grey_card['follow_link_on_unrecognised_date'] = False
def get_raa(self, keywords):
elements = []
for raa_page in self.__RAA_PAGE:
page_content = self.get_page(raa_page, 'get').content
for element in self.get_raa_elements(page_content):
elements.append(element)
self.parse_raa(elements, keywords)
self.mailer()
def get_raa_elements(self, page_content):
elements = []
# On charge le parser
soup = BeautifulSoup(page_content, 'html.parser')
# Pour chaque balise a, on regarde si c'est un PDF, et si oui on le parse
for a in soup.select('a.fr-link.fr-link--download'):
if a.get('href') and a['href'].endswith('.pdf'):
if a['href'].startswith('/'):
url = f"{self.__HOST}{a['href']}"
else:
url = a['href']
url = unquote(url)
name = a.find('span').previous_sibling.replace('Télécharger ', '').strip()
date = datetime.datetime.strptime(a.find('span').get_text().split(' - ')[-1].strip(), '%d/%m/%Y')
raa = Attrap.RAA(url, date, name)
elements.append(raa)
return elements
from Attrap_prefdpt import Attrap_prefdpt
class Attrap_pref25(Attrap_prefdpt):
# Configuration de la préfecture
hostname = 'https://www.doubs.gouv.fr'
raa_page = f'{hostname}/Publications/Publications-Legales/Recueil-des-Actes-Administratifs-RAA'
full_name = 'Préfecture du Doubs'
short_code = 'pref25'
timezone = 'Europe/Paris'
# Configuration des widgets à analyser
Attrap_prefdpt.grey_card['regex']['year'] = '([0-9]{4})'
Attrap_prefdpt.grey_card['follow_link_on_unrecognised_date'] = False
from Attrap_prefdpt import Attrap_prefdpt
class Attrap_pref29(Attrap_prefdpt):
# Configuration de la préfecture
hostname = 'https://www.finistere.gouv.fr'
raa_page = f'{hostname}/Publications/Recueil-des-actes-administratifs'
full_name = 'Préfecture du Finistère'
short_code = 'pref29'
timezone = 'Europe/Paris'
# Configuration des widgets à analyser
Attrap_prefdpt.grey_card['regex']['year'] = '(?:Recueils publiés en ).*([0-9]{4})'
from Attrap_prefdpt import Attrap_prefdpt
class Attrap_pref2a(Attrap_prefdpt):
# Configuration de la préfecture
hostname = 'https://www.corse-du-sud.gouv.fr'
raa_page = f'{hostname}/Publications/Recueil-des-actes-administratifs/Recueil-des-actes-administratifs-de-la-prefecture-de-la-Corse-du-Sud'
full_name = 'Préfecture de la Corse-du-Sud'
short_code = 'pref2a'
timezone = 'Europe/Paris'
# Configuration des widgets à analyser
Attrap_prefdpt.white_card['regex']['year'] = '([0-9]{4})'
from Attrap_prefdpt import Attrap_prefdpt
class Attrap_pref2b(Attrap_prefdpt):
# Configuration de la préfecture
hostname = 'https://www.haute-corse.gouv.fr'
raa_page = f'{hostname}/Publications/Publications-administratives-et-legales/Recueils-des-actes-administratifs'
full_name = 'Préfecture de Haute-Corse'
short_code = 'pref2b'
timezone = 'Europe/Paris'
# Configuration des widgets à analyser
Attrap_prefdpt.grey_card['regex']['year'] = 'Recueils des actes administratifs ([0-9]{4})'
Attrap_prefdpt.white_card['regex']['month'] = '([A-Za-zéû]* [0-9]{4})'
from Attrap_prefdpt import Attrap_prefdpt
class Attrap_pref30(Attrap_prefdpt):
# Configuration de la préfecture
hostname = 'https://www.gard.gouv.fr'
raa_page = f'{hostname}/Publications/Recueil-des-Actes-Administratifs'
full_name = 'Préfecture du Gard'
short_code = 'pref30'
timezone = 'Europe/Paris'
# Configuration des widgets à analyser
Attrap_prefdpt.grey_card['regex']['year'] = '([0-9]{4})'
import os from Attrap_prefdpt import Attrap_prefdpt
import datetime
from bs4 import BeautifulSoup
from urllib.parse import unquote
from Attrap import Attrap class Attrap_pref31(Attrap_prefdpt):
# Configuration de la préfecture
class Attrap_pref31(Attrap): hostname = 'https://www.haute-garonne.gouv.fr'
raa_page = f'{hostname}/Publications/Recueil-des-Actes-Administratifs/Recueil-des-Actes-Administratifs-Haute-Garonne'
# Config
__HOST = 'https://www.haute-garonne.gouv.fr'
__RAA_PAGE = f'{__HOST}/Publications/Recueil-des-Actes-Administratifs/Recueil-des-Actes-Administratifs-Haute-Garonne'
__USER_AGENT = 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36'
full_name = 'Préfecture de la Haute-Garonne' full_name = 'Préfecture de la Haute-Garonne'
short_code = 'pref31' short_code = 'pref31'
timezone = 'Europe/Paris'
def __init__(self, data_dir): # Configuration des widgets à analyser
super().__init__(data_dir, self.__USER_AGENT) Attrap_prefdpt.grey_card['regex']['month'] = '([A-Za-zéû]* [0-9]{4})'
def get_raa(self, keywords):
# On cherche les pages de chaque mois
page_content = self.get_page(self.__RAA_PAGE, 'get').content
month_pages = self.get_sub_pages(
page_content,
'.fr-card.fr-card--sm.fr-card--grey.fr-enlarge-link div.fr-card__body div.fr-card__content h2.fr-card__title a',
self.__HOST,
False
)[::-1]
pages_to_parse = []
# On filtre les pages de mois pour limiter le nombre de requêtes
for month_page in month_pages:
guessed_date = Attrap.guess_date(month_page['name'], '([a-zéû]* [0-9]{4})')
if guessed_date >= self.not_before.replace(day=1):
pages_to_parse.append(month_page['url'])
elements = []
# On parse les pages des mois qu'on veut analyser
for element in self.get_raa_with_pager(
pages_to_parse,
".fr-pagination__link.fr-pagination__link--next",
self.__HOST
):
elements.append(element)
self.parse_raa(elements, keywords)
self.mailer()
def get_raa_elements(self, page_content):
elements = []
# On charge le parser
soup = BeautifulSoup(page_content, 'html.parser')
# On récupère chaque balise a
for a in soup.select('div.fr-card__body div.fr-card__content h2.fr-card__title a.fr-card__link.menu-item-link'):
if a.get('href') and a['href'].endswith('.pdf'):
if a['href'].startswith('/'):
url = f"{self.__HOST}{a['href']}"
else:
url = a['href']
url = unquote(url)
name = a.get_text().strip().capitalize()
date = datetime.datetime.strptime(a['title'].split(' - ')[-1].strip(), '%d/%m/%Y')
raa = Attrap.RAA(url, date, name)
elements.append(raa)
return elements