from Attrap_prefdpt import Attrap_prefdpt


class Attrap_pref73(Attrap_prefdpt):

    # Configuration de la préfecture
    hostname = 'https://www.savoie.gouv.fr'
    raa_page = f'{hostname}/Publications/Recueils-hebdomadaires-et-speciaux-des-actes-administratifs'
    full_name = 'Préfecture de Savoie'
    short_code = 'pref73'
    timezone = 'Europe/Paris'

    # Configuration des widgets à analyser
    Attrap_prefdpt.grey_card['regex']['year'] = '([0-9]{4})'
import os
import datetime
from bs4 import BeautifulSoup
from urllib.parse import unquote
from Attrap import Attrap
class Attrap_pref75(Attrap):
# Les RAA de Paris sont sur le site de la préfecture de région
# Île-de-France. On ne prend en compte que les RAA départementaux.
# Config
hostname = 'https://www.prefectures-regions.gouv.fr'
raa_page = f'{hostname}/ile-de-france/tags/view/Ile-de-France/Documents+et+publications/Recueil+des+actes+administratifs'
user_agent = 'Mozilla/5.0 (X11; Linux x86_64; rv:128.0) Gecko/20100101 Firefox/128.0'
full_name = 'Préfecture de Paris'
short_code = 'pref75'
timezone = 'Europe/Paris'
def __init__(self, data_dir):
super().__init__(data_dir, self.user_agent)
self.enable_tor(10)
self.set_sleep_time(10)
def get_raa(self, keywords):
year_pages_to_parse = []
# On détermine quelles pages d'année parser
page_content = self.get_session(self.raa_page, 'main', 6)
year_pages = self.get_sub_pages(
page_content,
'article.news-list-item header h2.news-list-title a',
self.hostname,
False,
selenium=True
)
for year_page in year_pages:
year_date = Attrap.guess_date(year_page['name'].strip(), '(?:.*Paris.*)([0-9]{4})').replace(day=1, month=1)
if year_date.year >= self.not_before.year and year_date.year < 9999:
year_pages_to_parse.append(year_page['url'])
pages_to_parse = []
for year_page in year_pages_to_parse:
page_content = self.get_session(year_page, 'main', 6)
year = BeautifulSoup(page_content, 'html.parser').select('div.breadcrumb div.container p span.active')[0].get_text().split('-')[-1].strip()
month_pages = self.get_sub_pages(
page_content,
'div.sommaire-bloc div.sommaire-content ol li a',
self.hostname,
False,
selenium=True
)[::-1]
for month_page in month_pages:
month_date = Attrap.guess_date(f"{month_page['name']} {year}", "(.*)").replace(day=1)
if month_date >= self.not_before.replace(day=1):
pages_to_parse.append(month_page['url'])
elements = []
for page in pages_to_parse[::-1]:
page_content = self.get_session(page, 'main', 6)
for element in self.get_raa_elements(page_content):
elements.append(element)
self.parse_raa(elements[::-1], keywords)
self.mailer()
def get_raa_elements(self, page_content):
elements = []
# On charge le parser
soup = BeautifulSoup(page_content, 'html.parser')
# Pour chaque balise a, on regarde si c'est un PDF, et si oui on le parse
for a in soup.select('main div.container.main-container div.col-main article.article div.texte div a.link-download'):
if a.get('href') and a['href'].endswith('.pdf'):
if a['href'].startswith('/'):
url = f"{self.hostname}{a['href']}"
else:
url = a['href']
url = unquote(url)
name = a.find('span').get_text().strip()
# On devine la date du RAA à partir du nom de fichier
guessed = Attrap.guess_date(name, '((?:[0-9]{2}(?:-|\\.)[0-9]{2}(?:-|\\.)20[0-9]{2})|(?:20[0-9]{2}(?:-|\\.)[0-9]{2}(?:-|\\.)[0-9]{2})\\D*^)')
if (guessed == datetime.datetime(9999, 1, 1, 0, 0)):
date = None
else:
date = guessed
raa = Attrap.RAA(url, date, name, timezone=self.timezone)
elements.append(raa)
return elements
from Attrap_prefdpt import Attrap_prefdpt


class Attrap_pref76(Attrap_prefdpt):

    # Configuration de la préfecture
    hostname = 'https://www.seine-maritime.gouv.fr'
    raa_page = f'{hostname}/Publications/Recueils-des-actes-administratifs-RAA'
    full_name = 'Préfecture de la Seine-Maritime'
    short_code = 'pref76'
    timezone = 'Europe/Paris'

    # Configuration des widgets à analyser
    Attrap_prefdpt.grey_card['regex']['year'] = '([0-9]{4})'
    Attrap_prefdpt.grey_card['regex']['month'] = '([A-Za-zéû]* [0-9]{4})'
    Attrap_prefdpt.grey_card['follow_link_on_unrecognised_date'] = False
from Attrap_prefdpt import Attrap_prefdpt


class Attrap_pref77(Attrap_prefdpt):

    # Configuration de la préfecture
    hostname = 'https://www.seine-et-marne.gouv.fr'
    raa_page = f'{hostname}/Publications/RECUEILS-DES-ACTES-ADMINISTRATIFS-RAA'
    full_name = 'Préfecture de Seine-et-Marne'
    short_code = 'pref77'
    timezone = 'Europe/Paris'

    # Configuration des widgets à analyser
    Attrap_prefdpt.white_card['regex']['year'] = 'RAA ([0-9]{4})'

    # On ajoute un widget de menu déroulant
    Attrap_prefdpt.select_widgets.append(
        Attrap_prefdpt.DptSelectWidget(
            'menu_deroulant',
            regex='D77-([0-9]{2}-[0-9]{2}-[0-9]{4})',
            css_path='select#Liste-liste-docs',
            type='year-month-day'
        )
    )
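
# A standalone illustration (not part of the scraper) of what the
# 'menu_deroulant' regex registered above extracts from an <option> title.
# The sample title is hypothetical; Attrap.guess_date() then hands the
# captured group to dateparser.
if __name__ == '__main__':
    import re

    sample_title = 'Recueil des actes administratifs D77-02-05-2024'
    match = re.search('D77-([0-9]{2}-[0-9]{2}-[0-9]{4})', sample_title, re.IGNORECASE)
    if match:
        print(match.group(1))  # '02-05-2024'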
import os
import datetime
import logging
from bs4 import BeautifulSoup
from urllib.parse import unquote
from Attrap import Attrap
logger = logging.getLogger(__name__)
class Attrap_pref80(Attrap):
# Config
hostname = 'https://www.somme.gouv.fr'
raa_page = f'{hostname}/Publications/Recueil-des-actes-administratifs-du-departement-de-la-Somme'
user_agent = 'Mozilla/5.0 (Windows NT 10.0; rv:109.0) Gecko/20100101 Firefox/115.0'
full_name = 'Préfecture de la Somme'
short_code = 'pref80'
timezone = 'Europe/Paris'
def __init__(self, data_dir):
super().__init__(data_dir, self.user_agent)
self.set_sleep_time(30)
def get_raa(self, keywords):
year_pages_to_parse = []
# On détermine quelles pages d'année parser
page_content = self.get_page(self.raa_page, 'get').content
year_pages = self.get_sub_pages(
page_content,
'div.fr-card.fr-card--sm.fr-card--grey.fr-enlarge-link div.fr-card__body div.fr-card__content h2.fr-card__title a',
self.hostname,
False
)
for year_page in year_pages:
year_date = Attrap.guess_date(year_page['name'].strip(), '.*([0-9]{4})').replace(day=1, month=1)
if year_date.year >= self.not_before.year:
year_pages_to_parse.append(year_page['url'])
# Pour chaque page Année, on récupère la liste des RAA
elements = []
for year_page in year_pages_to_parse:
page_content = self.get_page(year_page, 'get').content
for element in self.get_raa_elements(page_content):
elements.append(element)
self.parse_raa(elements, keywords)
self.mailer()
def get_raa_elements(self, page_content):
elements = []
# On charge le parser
soup = BeautifulSoup(page_content, 'html.parser')
# Pour chaque balise a, on regarde si c'est un PDF, et si oui on le
# parse
for a in soup.select('div.fr-text--lead p a.fr-link'):
if a.get('href') and a['href'].endswith('.pdf'):
if a['href'].startswith('/'):
url = f"{self.hostname}{a['href']}"
else:
url = a['href']
url = unquote(url)
# On enlève les espaces insécables, les double-espaces, et le texte « Télécharger » de certains liens
name = a.get_text().replace('Télécharger ', '').strip().replace(u"\u00A0", ' ').replace(' ', ' ')
if name and not name == '':
# Certains RAA de la Somme ont une ligne avec les détails du fichier. Si cette ligne
# est disponible, on la parse, sinon on devine la date à partir du nom
date = None
if a.find('span'):
date = datetime.datetime.strptime(a.find('span').get_text().split(' - ')[-1].strip(), '%d/%m/%Y')
else:
regex = '.* n°.*(?:du)? ([0-9]*(?:er)? [a-zéû]* (?:[0-9]{4}|[0-9]{2}))'
date = Attrap.guess_date(name, regex)
# Parfois, il manque l'année dans le nom du RAA, alors on essaie avec l'année de la page
if date.year == 9999:
page_year = soup.select('nav.fr-breadcrumb div.fr-collapse ol.fr-breadcrumb__list li a.fr-breadcrumb__link.breadcrumb-item-link')[-1].get_text().replace('Année ', '').strip()
date = Attrap.guess_date(f'{name} {page_year}', regex)
# Parfois, c'est que le fichier n'est pas un RAA mais un arrêté seul
if date.year == 9999:
date = Attrap.guess_date(name, '([0-9]*(?:er)? [a-zéû]* [0-9]{4})')
if date.year == 9999:
logger.warning(f'On ignore {name} (URL : {url})')
else:
raa = Attrap.RAA(url, date, name, timezone=self.timezone)
elements.append(raa)
return elements[::-1]
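
# A standalone illustration (not part of the scraper) of the first fallback
# regex used above when a link carries no <span> detail line: it captures a
# French long-form date from the RAA title, which Attrap.guess_date() then
# hands to dateparser. The sample title is hypothetical.
if __name__ == '__main__':
    import re

    sample_name = 'Recueil des actes administratifs n°24 du 5 janvier 2024'
    regex = '.* n°.*(?:du)? ([0-9]*(?:er)? [a-zéû]* (?:[0-9]{4}|[0-9]{2}))'
    match = re.search(regex, sample_name, re.IGNORECASE)
    if match:
        print(match.group(1))  # '5 janvier 2024'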
import os
import datetime
from bs4 import BeautifulSoup
from urllib.parse import unquote
from Attrap import Attrap
class Attrap_pref81(Attrap):
# Config
hostname = 'https://www.tarn.gouv.fr'
raa_page = f'{hostname}/Publications/RAA-Recueil-des-Actes-Administratifs/RAA'
user_agent = 'Mozilla/5.0 (Windows NT 10.0; rv:109.0) Gecko/20100101 Firefox/115.0'
full_name = 'Préfecture du Tarn'
short_code = 'pref81'
timezone = 'Europe/Paris'
def __init__(self, data_dir):
super().__init__(data_dir, self.user_agent)
self.set_sleep_time(5)
def get_raa(self, keywords):
year_pages_to_parse = []
# On détermine quelles pages d'année parser
page_content = self.get_page(self.raa_page, 'get').content
year_pages = self.get_sub_pages(
page_content,
'.fr-card.fr-card--sm.fr-card--grey.fr-enlarge-link div.fr-card__body div.fr-card__content h2.fr-card__title a',
self.hostname,
False
)
for year_page in year_pages:
if int(year_page['name'].replace('Année ', '').strip()) >= self.not_before.year:
year_pages_to_parse.append(year_page['url'])
month_pages_to_parse = []
# Pour chaque année, on cherche les sous-pages de mois
for year_page in year_pages_to_parse:
page_content = self.get_page(year_page, 'get').content
month_pages = self.get_sub_pages(
page_content,
'.fr-card.fr-card--sm.fr-card--grey.fr-enlarge-link div.fr-card__body div.fr-card__content h2.fr-card__title a',
self.hostname,
False
)[::-1]
for month_page in month_pages:
# On filtre les mois ne correspondant pas à la période analysée
guessed_date = Attrap.guess_date(month_page['name'], '(.*)')
if guessed_date.replace(day=1) >= self.not_before.replace(day=1):
month_pages_to_parse.append(month_page['url'])
pages_to_parse = []
# Pour chaque page de mois, on cherche les pages de RAA
for month_page in month_pages_to_parse:
# TODO : il reste à gérer le cas où une page de mois redirige vers un RAA (cela
# arrive quand la préfecture n'a publié qu'un seul RAA pendant le mois)
pages = self.get_sub_pages_with_pager(
month_page,
'div.fr-card.fr-card--horizontal.fr-card--sm.fr-enlarge-link.fr-mb-3w div.fr-card__body div.fr-card__content h2.fr-card__title a.fr-card__link',
'nav.fr-pagination ul.fr-pagination__list li a.fr-pagination__link.fr-pagination__link--next.fr-pagination__link--lg-label',
'div.fr-card.fr-card--horizontal.fr-card--sm.fr-enlarge-link.fr-mb-3w div.fr-card__body div.fr-card__content div.fr-card__end p.fr-card__detail',
self.hostname
)[::-1]
for page in pages:
guessed_date = datetime.datetime.strptime(page['details'].replace('Publié le ', '').strip(), '%d/%m/%Y')
if guessed_date.replace(day=1) >= self.not_before.replace(day=1):
pages_to_parse.append(page['url'])
# On ajoute également la page racine, qui peut contenir des RAA mal catégorisés
pages_to_parse.append(self.raa_page)
elements = []
# On parse les pages contenant des RAA
for page in pages_to_parse:
page_content = self.get_page(page, 'get').content
for element in self.get_raa_elements(page_content):
elements.append(element)
# On parse les RAA
self.parse_raa(elements, keywords)
self.mailer()
def get_raa_elements(self, page_content):
elements = []
# On charge le parser
soup = BeautifulSoup(page_content, 'html.parser')
# On récupère chaque balise a
for a in soup.select('div.fr-grid-row div.fr-downloads-group.fr-downloads-group--bordered ul li a'):
if a.get('href') and a['href'].endswith('.pdf'):
if a['href'].startswith('/'):
url = f"{self.hostname}{a['href']}"
else:
url = a['href']
url = unquote(url)
name = a.find('span').previous_sibling.replace('Télécharger ', '').strip()
date = datetime.datetime.strptime(a.find('span').get_text().split(' - ')[-1].strip(), '%d/%m/%Y')
raa = Attrap.RAA(url, date, name, timezone=self.timezone)
elements.append(raa)
return elements
import os
import datetime
from bs4 import BeautifulSoup
from urllib.parse import unquote
from Attrap import Attrap
class Attrap_pref83(Attrap):
# Config
hostname = 'https://www.var.gouv.fr'
raa_page = f'{hostname}/Publications/Recueil-des-actes-administratifs'
user_agent = 'Mozilla/5.0 (Windows NT 10.0; rv:109.0) Gecko/20100101 Firefox/115.0'
full_name = 'Préfecture du Var'
short_code = 'pref83'
timezone = 'Europe/Paris'
def __init__(self, data_dir):
super().__init__(data_dir, self.user_agent)
self.set_sleep_time(30)
def get_raa(self, keywords):
year_pages_to_parse = []
# On détermine quelles pages d'année parser
page_content = self.get_page(self.raa_page, 'get').content
for year_page in self.get_sub_pages(
page_content,
'div.fr-card__body div.fr-card__content h2.fr-card__title a',
self.hostname,
False
):
year = Attrap.guess_date(year_page['name'].strip(), 'Recueil des actes administratifs ([0-9]{4})').year
if year < 9999 and year >= self.not_before.year:
year_pages_to_parse.append(year_page['url'])
pages_to_parse = []
# Pour chaque année, on cherche les sous-pages de mois
for raa_page in year_pages_to_parse:
pages_to_parse.append(raa_page)
page_content = self.get_page(raa_page, 'get').content
for month_page in self.get_sub_pages(
page_content,
'.fr-card.fr-card--sm.fr-card--grey.fr-enlarge-link div.fr-card__body div.fr-card__content h2.fr-card__title a',
self.hostname,
False
)[::-1]:
card_date = Attrap.guess_date(month_page['name'].strip(), '(.*)').replace(day=1)
if card_date >= self.not_before.replace(day=1):
pages_to_parse.append(month_page['url'])
# On parse les pages contenant des RAA
elements = self.get_raa_with_pager(
pages_to_parse[::-1],
'.fr-pagination__link.fr-pagination__link--next',
self.hostname
)
self.parse_raa(elements, keywords)
self.mailer()
def get_raa_elements(self, page_content):
elements = []
# On charge le parser
soup = BeautifulSoup(page_content, 'html.parser')
# On récupère chaque section contenant un RAA
cards = soup.select('div.fr-card__body div.fr-card__content h2.fr-card__title a.fr-card__link.menu-item-link')
for a in cards:
if a.get('href') and a['href'].endswith('.pdf'):
if a['href'].startswith('/'):
url = f"{self.hostname}{a['href']}"
else:
url = a['href']
url = unquote(url)
name = a.get_text().strip()
date = datetime.datetime.strptime(a['title'].split(' - ')[-1].strip(), '%d/%m/%Y')
raa = Attrap.RAA(url, date, name, timezone=self.timezone)
elements.append(raa)
return elements
from Attrap_prefdpt import Attrap_prefdpt


class Attrap_pref87(Attrap_prefdpt):

    # Configuration de la préfecture
    hostname = 'https://www.haute-vienne.gouv.fr'
    raa_page = f'{hostname}/Publications/Recueil-des-actes-administratifs'
    full_name = 'Préfecture de la Haute-Vienne'
    short_code = 'pref87'
    timezone = 'Europe/Paris'

    # Configuration des widgets à analyser
    Attrap_prefdpt.grey_card['regex']['year'] = '([0-9]{4})'
import os
import datetime
from bs4 import BeautifulSoup
from urllib.parse import unquote
from Attrap import Attrap
class Attrap_pref91(Attrap):
# Config
hostname = 'https://www.essonne.gouv.fr'
raa_page = f'{hostname}/Publications/Recueils-des-actes-administratifs-RAA'
user_agent = 'Mozilla/5.0 (Windows NT 10.0; rv:109.0) Gecko/20100101 Firefox/115.0'
full_name = 'Préfecture de l\'Essonne'
short_code = 'pref91'
timezone = 'Europe/Paris'
def __init__(self, data_dir):
super().__init__(data_dir, self.user_agent)
self.set_sleep_time(30)
def get_raa(self, keywords):
pages_to_parse = []
# On détermine quelles pages d'année parser
year_pages_to_parse = []
page_content = self.get_page(self.raa_page, 'get').content
year_pages = self.get_sub_pages(
page_content,
'.fr-card.fr-card--sm.fr-card--grey.fr-enlarge-link div.fr-card__body div.fr-card__content h2.fr-card__title a',
self.hostname,
False
)
for year_page in year_pages:
year = int(year_page['name'].strip())
if year >= self.not_before.year:
year_pages_to_parse.append(year_page)
# Pour chaque année, on cherche les sous-pages de mois
month_pages_to_parse = []
for year_page in year_pages_to_parse:
year = year_page['name'].strip()
page_content = self.get_page(year_page['url'], 'get').content
month_pages = self.get_sub_pages(
page_content,
'.fr-card.fr-card--sm.fr-card--grey.fr-enlarge-link div.fr-card__body div.fr-card__content h2.fr-card__title a',
self.hostname,
False
)[::-1]
for month_page in month_pages[::-1]:
month = month_page['name'].strip()
guessed_date = Attrap.guess_date(f'{month} {year}', '(.*)')
if guessed_date >= self.not_before.replace(day=1):
pages_to_parse.append(month_page['url'])
# On parse les pages sélectionnées
elements = []
for page_to_parse in pages_to_parse:
page_content = self.get_page(page_to_parse, 'get').content
for element in self.get_raa_elements(page_content):
elements.append(element)
self.parse_raa(elements[::-1], keywords)
self.mailer()
def get_raa_elements(self, page_content):
elements = []
# On charge le parser
soup = BeautifulSoup(page_content, 'html.parser')
# On récupère chaque balise a
for a in soup.select('a.fr-link.fr-link--download'):
if a.get('href') and a['href'].endswith('.pdf'):
if a['href'].startswith('/'):
url = f"{self.hostname}{a['href']}"
else:
url = a['href']
url = unquote(url)
if a.get('title'):
name = a.get_text().strip()
date = datetime.datetime.strptime(a['title'].split(' - ')[-1].strip(), '%d/%m/%Y')
else:
name = a.find('span').previous_sibling.replace('Télécharger ', '').strip()
date = datetime.datetime.strptime(a.find('span').get_text().split(' - ')[-1].strip(), '%d/%m/%Y')
raa = Attrap.RAA(url, date, name, timezone=self.timezone)
elements.append(raa)
return elements
import datetime
import re
from bs4 import BeautifulSoup
from urllib.parse import unquote
from Attrap import Attrap
class Attrap_pref92(Attrap):
# Config
hostname = 'https://www.hauts-de-seine.gouv.fr'
raa_page = f'{hostname}/Publications/Annonces-avis/Le-Recueil-des-actes-administratifs'
user_agent = 'Mozilla/5.0 (Windows NT 10.0; rv:109.0) Gecko/20100101 Firefox/115.0'
full_name = 'Préfecture des Hauts-de-Seine'
short_code = 'pref92'
timezone = 'Europe/Paris'
def __init__(self, data_dir):
super().__init__(data_dir, self.user_agent)
self.set_sleep_time(5)
def get_raa(self, keywords):
# On récupère les pages d'années
year_pages = []
page_content = self.get_page(self.raa_page, 'get').content
for card in self.get_sub_pages(
page_content,
'div.fr-card__body div.fr-card__content h2.fr-card__title a',
self.hostname,
False
):
if Attrap.guess_date(card['name'], '.* ([0-9]{4})').year >= self.not_before.year:
year_pages.append(card['url'])
# On récupère tous les RAA en suivant la navigation
elements = self.get_raa_with_pager(
year_pages,
'a.fr-pagination__link.fr-pagination__link--next',
self.hostname
)
self.parse_raa(elements, keywords)
self.mailer()
def get_raa_elements(self, page_content):
elements = []
# On charge le parser
soup = BeautifulSoup(page_content, 'html.parser')
# Pour chaque balise a, on regarde si c'est un PDF, et si oui on le parse
for a in soup.select('.fr-card__title a.fr-card__link.menu-item-link'):
if a.get('href') and a['href'].endswith('.pdf'):
if a['href'].startswith('/'):
url = f"{self.hostname}{a['href']}"
else:
url = a['href']
url = unquote(url)
name = re.sub(r'([0-9]{4}-[0-9]{2}-[0-9]{2}) ', ' ', a.get_text()).strip()
date = datetime.datetime.strptime(a['title'].split(' - ')[-1].strip(), '%d/%m/%Y')
raa = Attrap.RAA(url, date, name, timezone=self.timezone)
elements.append(raa)
return elements
import os
import re
import datetime
import logging
from bs4 import BeautifulSoup
from urllib.parse import unquote
from Attrap import Attrap

logger = logging.getLogger(__name__)
class Attrap_pref93(Attrap):
# Config
hostname = 'https://www.seine-saint-denis.gouv.fr'
raa_page = f'{hostname}/Publications/Bulletin-d-informations-administratives-Recueil-des-actes-administratifs/'
user_agent = 'Mozilla/5.0 (Windows NT 10.0; rv:109.0) Gecko/20100101 Firefox/115.0'
full_name = 'Préfecture de Seine-Saint-Denis'
short_code = 'pref93'
timezone = 'Europe/Paris'
def __init__(self, data_dir):
super().__init__(data_dir, self.user_agent)
self.set_sleep_time(30)
def get_raa(self, keywords):
pages_to_parse = []
# On récupère les pages d'années
page_content = self.get_page(self.raa_page, 'get').content
year_pages = self.get_sub_pages(
page_content,
'div.fr-card__body div.fr-card__content h2.fr-card__title a',
self.hostname,
False,
)[::-1]
# On filtre par date pour limiter les requêtes
year_pages_to_parse = []
for year_page in year_pages:
year = 9999
try:
year = int(re.search('.*([0-9]{4})', year_page['name'].strip(), re.IGNORECASE).group(1))
if year is None:
year = 9999
except Exception as exc:
logger.warning(f"Impossible de deviner l\'année de la page {year_page['name']}")
year = 9999
if year >= self.not_before.year:
year_pages_to_parse.append(year_page['url'])
# Pour chaque année, on cherche les sous-pages de mois
for year_page in year_pages_to_parse:
page_content = self.get_page(year_page, 'get').content
month_pages = self.get_sub_pages(
page_content,
'.fr-card.fr-card--sm.fr-card--grey.fr-enlarge-link div.fr-card__body div.fr-card__content h2.fr-card__title a',
self.hostname,
False
)[::-1]
# On filtre en fonction de la date demandée
for month_page in month_pages:
guessed_date = Attrap.guess_date(month_page['name'].strip(), '([a-zéû]*).*')
if guessed_date >= self.not_before.replace(day=1):
pages_to_parse.append(month_page['url'])
# On parse les pages contenant des RAA
elements = []
for page in pages_to_parse:
page_content = self.get_page(page, 'get').content
for element in self.get_raa_elements(page_content):
elements.append(element)
self.parse_raa(elements[::-1], keywords)
self.mailer()
def get_raa_elements(self, page_content):
elements = []
soup = BeautifulSoup(page_content, 'html.parser')
for card in soup.select('div.fr-card__body div.fr-card__content'):
a = card.select_one('h2.fr-card__title a.fr-card__link')
if a.get('href') and a['href'].endswith('.pdf'):
if a['href'].startswith('/'):
url = f"{self.hostname}{a['href']}"
else:
url = a['href']
url = unquote(url)
name = a.text.strip()
date = datetime.datetime.strptime(card.select_one('div.fr-card__end p.fr-card__detail').get_text().removeprefix('Publié le ').strip(), '%d/%m/%Y')
raa = Attrap.RAA(url, date, name, timezone=self.timezone)
elements.append(raa)
return elements
import os
import datetime
from bs4 import BeautifulSoup
from urllib.parse import unquote
import logging
from Attrap import Attrap
class Attrap_pref94(Attrap):
# Config
hostname = 'https://www.val-de-marne.gouv.fr'
raa_page = f'{hostname}/Publications/Publications-legales/RAA-Recueil-des-actes-administratifs'
user_agent = 'Mozilla/5.0 (Windows NT 10.0; rv:109.0) Gecko/20100101 Firefox/115.0'
full_name = 'Préfecture du Val-de-Marne'
short_code = 'pref94'
timezone = 'Europe/Paris'
def __init__(self, data_dir):
super().__init__(data_dir, self.user_agent)
self.set_sleep_time(30)
def get_raa(self, keywords):
elements = []
page_content = self.get_page(self.raa_page, 'get').content
for sub_page in self.get_sub_pages(
page_content,
'div.fr-card__body div.fr-card__content h2.fr-card__title a',
self.hostname,
False
):
if Attrap.guess_date(sub_page['name'], '([0-9]{4})$').year >= self.not_before.year:
sub_page_content = self.get_page(sub_page['url'], 'get').content
for element in self.get_raa_elements(sub_page_content):
elements.append(element)
self.parse_raa(elements, keywords)
self.mailer()
def get_raa_elements(self, page_content):
elements = []
# On charge le parser
soup = BeautifulSoup(page_content, 'html.parser')
# Pour chaque balise a, on regarde si c'est un PDF, et si oui on le
# parse
for a in soup.select('a.fr-link.fr-link--download'):
if a.get('href') and a['href'].endswith('.pdf'):
if a['href'].startswith('/'):
url = f"{self.hostname}{a['href']}"
else:
url = a['href']
url = unquote(url)
name = a.find('span').previous_sibling.replace('Télécharger ', '').strip()
date = datetime.datetime.strptime(a.find('span').get_text().split(' - ')[-1].strip(), '%d/%m/%Y')
raa = Attrap.RAA(url, date, name, timezone=self.timezone)
elements.append(raa)
return elements
import os
import datetime
from bs4 import BeautifulSoup
from urllib.parse import unquote
from Attrap import Attrap
class Attrap_pref976(Attrap):
# Config
hostname = 'https://www.mayotte.gouv.fr'
raa_page = f'{hostname}/Publications/Recueil-des-actes-administratifs-R.A.A'
user_agent = 'Mozilla/5.0 (Windows NT 10.0; rv:109.0) Gecko/20100101 Firefox/115.0'
full_name = 'Préfecture de Mayotte'
short_code = 'pref976'
timezone = 'Indian/Mayotte'
def __init__(self, data_dir):
super().__init__(data_dir, self.user_agent)
self.set_sleep_time(5)
def get_raa(self, keywords):
year_pages_to_parse = []
# On récupère les pages d'années
page_content = self.get_page(self.raa_page, 'get').content
for card in self.get_sub_pages(
page_content,
'div.fr-card__body div.fr-card__content h2.fr-card__title a',
self.hostname,
False
):
if Attrap.guess_date(card['name'], '([0-9]{4})').year >= self.not_before.year:
year_pages_to_parse.append(card['url'])
pages_to_parse = [self.raa_page]
# Pour chaque année, on cherche les sous-pages de mois
for raa_page in year_pages_to_parse:
page_content = self.get_page(raa_page, 'get').content
month_pages = self.get_sub_pages(
page_content,
'.fr-card.fr-card--sm.fr-card--grey.fr-enlarge-link div.fr-card__body div.fr-card__content h2.fr-card__title a',
self.hostname,
False
)[::-1]
# On regarde aussi si sur la page de l'année il n'y aurait pas un
# RAA mal catégorisé
for page_to_parse in self.find_raa_card(page_content):
pages_to_parse.append(page_to_parse)
# Pour chaque mois, on cherche les pages des RAA
for month_page in month_pages:
year = Attrap.guess_date(month_page['name'], '(.*)').year
for page_to_parse in self.find_raa_card(
month_page['url'],
year
):
pages_to_parse.append(page_to_parse)
# On parse les pages contenant des RAA
elements = []
for page in pages_to_parse:
page_content = self.get_page(page, 'get').content
for element in self.get_raa_elements(page_content):
elements.append(element)
self.parse_raa(elements, keywords)
self.mailer()
def find_raa_card(self, page, year=None):
pages = []
card_pages = self.get_sub_pages_with_pager(
page,
'div.fr-card__body div.fr-card__content h2.fr-card__title a.fr-card__link',
'ul.fr-pagination__list li a.fr-pagination__link.fr-pagination__link--next',
None,
self.hostname
)[::-1]
for card_page in card_pages:
# On filtre les pages de RAA ne correspondant pas à la période
# analysée
guessed_date = Attrap.guess_date(card_page['name'], 'n°[ 0-9]* du ([0-9]*(?:er)? [a-zéû]* [0-9]*)')
if year:
guessed_date = guessed_date.replace(year=year)
if guessed_date >= self.not_before:
pages.append(card_page['url'])
return pages
def get_raa_elements(self, page_content):
elements = []
# On charge le parser
soup = BeautifulSoup(page_content, 'html.parser')
# On récupère chaque balise a
for a in soup.select('a.fr-link.fr-link--download'):
if a.get('href') and a['href'].endswith('.pdf'):
if a['href'].startswith('/'):
url = f"{self.hostname}{a['href']}"
else:
url = a['href']
url = unquote(url)
name = a.find('span').previous_sibling.replace('Télécharger ', '').strip()
date = datetime.datetime.strptime(a.find('span').get_text().split(' - ')[-1].strip(), '%d/%m/%Y')
raa = Attrap.RAA(url, date, name, timezone=self.timezone)
elements.append(raa)
return elements
import datetime
import time

from bs4 import BeautifulSoup
from urllib.parse import unquote

from Attrap import Attrap


class Attrap_prefbretagne(Attrap):

    # Config
    hostname = 'https://www.prefectures-regions.gouv.fr'
    raa_page = f'{hostname}/bretagne/Documents-publications/Recueils-des-actes-administratifs/Recueil-des-actes-administratifs'
    user_agent = 'Mozilla/5.0 (X11; Linux x86_64; rv:128.0) Gecko/20100101 Firefox/128.0'
    full_name = 'Préfecture de la région Bretagne'
    short_code = 'prefbretagne'
    timezone = 'Europe/Paris'

    def __init__(self, data_dir):
        super().__init__(data_dir, self.user_agent)
        self.enable_tor(10)
        self.set_sleep_time(10)

    def get_raa(self, keywords):
        # page_content = self.get_page(self.raa_page, 'get').content
        page_content = self.get_session(self.raa_page, 'main', 6)
        elements = self.get_raa_elements(page_content)
        time.sleep(10)
        self.parse_raa(elements, keywords)
        self.mailer()

    def get_raa_elements(self, page_content):
        elements = []
        # On charge le parser
        soup = BeautifulSoup(page_content, 'html.parser')
        # Pour chaque balise a, on regarde si c'est un PDF, et si oui on le parse
        for a in soup.select('main div.container.main-container div.col-main article.article div.texte div a.link-download'):
            if a.get('href') and a['href'].endswith('.pdf'):
                if a['href'].startswith('/'):
                    url = f"{self.hostname}{a['href']}"
                else:
                    url = a['href']
                url = unquote(url)
                name = a.find('span').get_text().strip()

                # On devine la date du RAA à partir du nom de fichier
                guessed = Attrap.guess_date(name, '((?:[0-9]{2}|[0-9]{1})(?:er){0,1}[ _](?:[a-zéû]{3,9})[ _](?:[0-9]{4}|[0-9]{2}))')
                if guessed == datetime.datetime(9999, 1, 1, 0, 0):
                    date = None
                else:
                    date = guessed

                raa = Attrap.RAA(url, date, name, timezone=self.timezone)
                elements.append(raa)
        return elements
import datetime
from bs4 import BeautifulSoup
from urllib.parse import unquote
from Attrap import Attrap
class Attrap_prefdpt(Attrap):
user_agent = 'Mozilla/5.0 (X11; Linux x86_64; rv:128.0) Gecko/20100101 Firefox/128.0'
# Paramètres par défaut des cartes grises et blanches. Ils devraient la plupart du temps être surchargés par la classe de préfecture de département
grey_card = {
'regex': {
'year': None,
'month': None
},
'css_path': {
'title': 'div.fr-card.fr-card--sm.fr-card--grey.fr-enlarge-link div.fr-card__body div.fr-card__content h2.fr-card__title a'
},
'link_to_raa': False,
'autodetect_links_to_raa': True,
'follow_link_on_unrecognised_date': True,
'exclude': [],
'add_year_to_months': False
}
white_card = {
'regex': {
'year': None,
'month': None,
},
'css_path': {
'title': 'div.fr-card.fr-card--horizontal.fr-card--sm.fr-enlarge-link div.fr-card__body div.fr-card__content h2.fr-card__title a.fr-card__link', # Chemin CSS du titre des cartes blanches
'details': 'div.fr-card.fr-card--horizontal.fr-card--sm.fr-enlarge-link div.fr-card__body div.fr-card__content div.fr-card__end p.fr-card__detail', # Chemin CSS du détail des cartes blanches
'pager': 'ul.fr-pagination__list li a.fr-pagination__link.fr-pagination__link--next.fr-pagination__link--lg-label' # Chemin CSS du pager des cartes blanches
},
'link_to_raa': False,
'autodetect_links_to_raa': True,
'follow_link_on_unrecognised_date': True,
'exclude': [],
'add_year_to_months': False
}
# Liste des widgets à analyser (sera remplie au moment de l'initialisation, mais peut être surchargée par la classe de préfecture de département)
widgets = []
select_widgets = []
# Est-ce qu'on inclut les widgets des cartes blanches et grises ? Par défaut oui, mais il peut être nécessaire de les désactiver sur certaines préfectures
include_grey_card_widget = True
include_white_card_widget = True
# Chemin CSS vers un RAA
element_css_path = 'div.fr-downloads-group.fr-downloads-group--bordered ul li a,div a.fr-link.fr-link--download'
# Temporisation (en secondes) entre chaque requête (ne devrait pas avoir à être changée)
pref_sleep_time = 5
class DptWidget:
"""Une classe représentant un widget sur le site d'une préfecture de département."""
def __init__(self, name, regex=None, css_path=None, link_to_raa=False, autodetect_links_to_raa=True, follow_link_on_unrecognised_date=True, exclude=[], add_year_to_months=False):
self.name = name
self.regex = regex
self.css_path = css_path
self.link_to_raa = link_to_raa
self.autodetect_links_to_raa = autodetect_links_to_raa
self.follow_link_on_unrecognised_date = follow_link_on_unrecognised_date
self.exclude = exclude
self.add_year_to_months = add_year_to_months
def has_css_path(self, key):
return self.css_path and self.css_path.get(key, None) is not None
def get_css_path(self, key):
if not self.has_css_path(key):
return None
else:
return self.css_path.get(key, None)
def has_regex(self, key):
return self.regex and self.regex.get(key, None) is not None
def get_regex(self, key):
if not self.has_regex(key):
return None
else:
return self.regex.get(key, None)
class DptSelectWidget:
"""Une classe représentant un menu déroulant sur le site d'une préfecture de département."""
def __init__(self, name, regex=None, css_path=None, follow_link_on_unrecognised_date=True, exclude=[], type='year-month-day'):
self.name = name
self.regex = regex
self.css_path = css_path
self.follow_link_on_unrecognised_date = follow_link_on_unrecognised_date
self.exclude = exclude
self.type = type
def add_url(self, url, date=None):
if date and date.year == 9999:
date = None
self.page_urls_to_parse.append([url, date])
def get_urls_to_parse(self):
urls = []
for url in self.page_urls_to_parse:
urls.append(url[0])
return urls
def __init__(self, data_dir):
"""Une classe générique permettant d'analyser les préfectures de département en fonction de certains paramètres."""
super().__init__(data_dir, self.user_agent)
self.set_sleep_time(self.pref_sleep_time)
self.page_urls_to_parse = []
if isinstance(self.raa_page, str):
self.add_url(self.raa_page)
else:
for url in self.raa_page:
self.add_url(url)
self.elements = []
# On ajoute les cartes grises et blanches à la liste des widgets à parser
if self.include_grey_card_widget:
self.widgets.append(
Attrap_prefdpt.DptWidget(
'grey_card',
regex=self.grey_card['regex'],
css_path=self.grey_card['css_path'],
link_to_raa=self.grey_card['link_to_raa'],
autodetect_links_to_raa=self.grey_card['autodetect_links_to_raa'],
follow_link_on_unrecognised_date=self.grey_card['follow_link_on_unrecognised_date'],
exclude=self.grey_card['exclude'],
add_year_to_months=self.grey_card['add_year_to_months']
)
)
if self.include_white_card_widget:
self.widgets.append(
Attrap_prefdpt.DptWidget(
'white_card',
regex=self.white_card['regex'],
css_path=self.white_card['css_path'],
link_to_raa=self.white_card['link_to_raa'],
autodetect_links_to_raa=self.white_card['autodetect_links_to_raa'],
follow_link_on_unrecognised_date=self.white_card['follow_link_on_unrecognised_date'],
exclude=self.white_card['exclude'],
add_year_to_months=self.white_card['add_year_to_months']
)
)
def get_raa(self, keywords):
while not self.page_urls_to_parse == []:
page_url = self.page_urls_to_parse[-1]
page_content = self.get_page(page_url[0], 'get').content # On récupère le HTML de la page
self.parse_widgets(page_url, page_content) # On parse les cartes
self.parse_select_widgets(page_url, page_content) # On parse les menus déroulants
for element in self.get_raa_elements(page_content): # On cherche les RAA
self.elements.append(element)
self.page_urls_to_parse.remove(page_url) # On supprime la page de la liste de celles à parser
self.parse_raa(self.elements[::-1], keywords)
self.mailer()
def parse_widgets(self, page_url, page_content):
# Pour chaque widget paramétré qui n'est pas de type select, on le cherche sur la page
for widget in self.widgets:
cards = []
# On n'appelle pas la même fonction selon que le widget a prévu un pager ou non
if widget.has_css_path('pager'):
cards = self.get_sub_pages_with_pager(
page_content,
widget.get_css_path('title'), # Titre du lien
widget.get_css_path('pager'), # Pager
widget.get_css_path('details'), # Détails
self.hostname
)
else:
cards = self.get_sub_pages(
page_content,
widget.get_css_path('title'),
self.hostname,
False
)
for card in cards:
if card['url'] not in self.get_urls_to_parse() and card['name'].strip() not in widget.exclude:
date = None
date_is_correct = False
# Si un regex d'année est spécifié, on parse le titre avec
if widget.has_regex('year'):
date = Attrap.guess_date(card['name'].strip(), widget.get_regex('year')).replace(day=1, month=1)
# Si une date a été trouvée (l'année n'est pas 9999) et qu'elle n'est pas antérieure à la valeur not_before, on la marque comme correcte
if date >= self.not_before.replace(day=1, month=1) and date.year < 9999:
date_is_correct = True
# Si un regex de mois est spécifié et qu'aucune date correcte n'a été trouvée, on teste avec le regex de mois sur le titre
if widget.has_regex('month') and (not date or date.year == 9999):
# On ajoute l'année au nom du mois à tester si configuré dans le widget
if widget.add_year_to_months and page_url[1]:
month = card['name'].strip() + ' ' + str(page_url[1].year)
else:
month = card['name'].strip()
date = Attrap.guess_date(month, widget.get_regex('month')).replace(day=1)
if date >= self.not_before.replace(day=1) and date.year < 9999:
date_is_correct = True
# Si un chemin CSS vers les détails du widget est spécifié et qu'aucune date correcte n'a été trouvée, on tente de parser la date présente dans les détails
if widget.has_css_path('details') and (not date or date.year == 9999):
try:
date = datetime.datetime.strptime(card['details'].replace('Publié le ', '').strip(), '%d/%m/%Y')
if date >= self.not_before:
date_is_correct = True
except Exception as e:
date = datetime.datetime(9999, 1, 1)
# Si la configuration indique que les liens renvoient vers un RAA, on ajoute le lien à la liste des éléments
if widget.link_to_raa or (widget.autodetect_links_to_raa and card['url'].endswith('.pdf')):
if date and date.year == 9999:
date = None
raa = Attrap.RAA(card['url'], date, card['name'].strip(), timezone=self.timezone)
self.elements.append(raa)
else:
# Si une date a été trouvée, on regarde s'il faut ajouter l'URL à la liste des pages à parser
if date_is_correct or ((date is None or date.year == 9999) and widget.follow_link_on_unrecognised_date):
self.add_url(card['url'], date)
def parse_select_widgets(self, page_url, page_content):
for select_widget in self.select_widgets:
# Les widgets select fonctionnent différemment : chaque valeur option doit être testée pour trouver une date, et si la date correspond
# à la date recherchée la requête POST est envoyée, puis le résultat est analysé par get_raa_elements()
# On charge le parser
soup = BeautifulSoup(page_content, 'html.parser')
# On récupère les select
for select in soup.select(select_widget.css_path):
# On récupère les option de chaque select
for option in select.find_all('option'):
if not option['value'] == "" and option['title'].strip() not in select_widget.exclude:
# On estime la date à partir du nom de fichier
date = Attrap.guess_date(option['title'].strip(), select_widget.regex)
match select_widget.type:
case 'year':
date = date.replace(day=1, month=1)
not_before = self.not_before.replace(day=1, month=1)
case 'year-month':
date = date.replace(day=1)
not_before = self.not_before.replace(day=1)
case _:
not_before = self.not_before
# Si la date estimée correspond à la plage d'analyse ou si follow_link_on_unrecognised_date est à True,
# on demande au serveur les détails du RAA
if (date.year < 9999 and date >= not_before) or (date.year == 9999 and select_widget.follow_link_on_unrecognised_date):
page_content = self.get_page(
page_url[0],
'post',
{
select['id']: option['value']
}
).content
for element in self.get_raa_elements(page_content):
self.elements.append(element)
def get_raa_elements(self, page_content):
elements = []
# On charge le parser
soup = BeautifulSoup(page_content, 'html.parser')
# On récupère chaque balise a
for a in soup.select(self.element_css_path):
if a.get('href'):
if a['href'].startswith('/'):
url = f"{self.hostname}{a['href']}"
else:
url = a['href']
url = unquote(url)
name = a.find('span').previous_sibling.replace('Télécharger ', '').strip()
if not name:
name = url.split('/')[-1].strip()
try:
date = datetime.datetime.strptime(a.find('span').get_text().split(' - ')[-1].strip(), '%d/%m/%Y')
except Exception:
date = None
raa = Attrap.RAA(url, date, name, timezone=self.timezone)
elements.append(raa)
return elements
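
# A minimal sketch of how a department prefecture configures this generic
# parser, distilled from the real subclasses above (pref73, pref76, pref77).
# Every value below (hostname, page path, regexes, CSS selector) is a
# hypothetical placeholder, not an actual prefecture configuration; in its own
# module the class would start with `from Attrap_prefdpt import Attrap_prefdpt`.
class Attrap_prefexample(Attrap_prefdpt):

    # Identity of the (fictional) prefecture
    hostname = 'https://www.example.gouv.fr'
    raa_page = f'{hostname}/Publications/RAA'
    full_name = 'Préfecture d\'Exemple'
    short_code = 'prefexample'
    timezone = 'Europe/Paris'

    # How to read a year out of the grey and white card titles
    Attrap_prefdpt.grey_card['regex']['year'] = '([0-9]{4})'
    Attrap_prefdpt.white_card['regex']['year'] = 'RAA ([0-9]{4})'

    # Optional drop-down menu whose option titles carry a full date
    Attrap_prefdpt.select_widgets.append(
        Attrap_prefdpt.DptSelectWidget(
            'menu_deroulant',
            regex='([0-9]{2}-[0-9]{2}-[0-9]{4})',
            css_path='select#Liste-liste-docs',
            type='year-month-day'
        )
    )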
import os
import datetime
from bs4 import BeautifulSoup
from urllib.parse import unquote
from Attrap import Attrap
class Attrap_prefidf(Attrap):
# Config
hostname = 'https://www.prefectures-regions.gouv.fr'
raa_page = f'{hostname}/ile-de-france/tags/view/Ile-de-France/Documents+et+publications/Recueil+des+actes+administratifs'
user_agent = 'Mozilla/5.0 (X11; Linux x86_64; rv:128.0) Gecko/20100101 Firefox/128.0'
full_name = 'Préfecture de la région Île-de-France'
short_code = 'prefidf'
timezone = 'Europe/Paris'
def __init__(self, data_dir):
super().__init__(data_dir, self.user_agent)
self.enable_tor(10)
self.set_sleep_time(10)
def get_raa(self, keywords):
year_pages_to_parse = []
# On détermine quelles pages d'année parser
page_content = self.get_session(self.raa_page, 'main', 6)
year_pages = self.get_sub_pages(
page_content,
'article.news-list-item header h2.news-list-title a',
self.hostname,
False,
selenium=True
)
for year_page in year_pages:
year_date = Attrap.guess_date(year_page['name'].strip(), '(?:.*[ÎIiî]le-de-[Ff]rance.*)([0-9]{4})').replace(day=1, month=1)
if year_date.year >= self.not_before.year and year_date.year < 9999:
year_pages_to_parse.append(year_page['url'])
pages_to_parse = []
for year_page in year_pages_to_parse:
page_content = self.get_page(year_page, 'get', selenium=True).content
year = BeautifulSoup(page_content, 'html.parser').select('div.breadcrumb div.container p span.active')[0].get_text().split('-')[-1].strip()
month_pages = self.get_sub_pages(
page_content,
'div.sommaire-bloc div.sommaire-content ol li a',
self.hostname,
False,
selenium=True
)[::-1]
for month_page in month_pages:
month_date = Attrap.guess_date(f"{month_page['name']} {year}", "(.*)").replace(day=1)
if month_date >= self.not_before.replace(day=1):
pages_to_parse.append(month_page['url'])
elements = []
for page in pages_to_parse:
page_content = self.get_session(page, 'main', 6)
for element in self.get_raa_elements(page_content):
elements.append(element)
self.parse_raa(elements, keywords)
self.mailer()
def get_raa_elements(self, page_content):
elements = []
# On charge le parser
soup = BeautifulSoup(page_content, 'html.parser')
# Pour chaque balise a, on regarde si c'est un PDF, et si oui on le parse
for a in soup.select('main div.container.main-container div.col-main article.article div.texte div a.link-download'):
if a.get('href') and a['href'].endswith('.pdf'):
if a['href'].startswith('/'):
url = f"{self.hostname}{a['href']}"
else:
url = a['href']
url = unquote(url)
name = a.find('span').get_text().strip()
# On devine la date du RAA à partir du nom de fichier
guessed = Attrap.guess_date(name, '((?:[0-9]{2}(?:-|\\.)[0-9]{2}(?:-|\\.)20[0-9]{2})|(?:20[0-9]{2}(?:-|\\.)[0-9]{2}(?:-|\\.)[0-9]{2})\\D*^)')
if (guessed == datetime.datetime(9999, 1, 1, 0, 0)):
date = None
else:
date = guessed
raa = Attrap.RAA(url, date, name, timezone=self.timezone)
elements.append(raa)
return elements
import os
import datetime
from bs4 import BeautifulSoup
from urllib.parse import unquote
from Attrap import Attrap
class Attrap_prefpaca(Attrap):
# Config
hostname = 'https://www.prefectures-regions.gouv.fr'
raa_page = f'{hostname}/provence-alpes-cote-dazur/Documents-publications'
user_agent = 'Mozilla/5.0 (X11; Linux x86_64; rv:128.0) Gecko/20100101 Firefox/128.0'
full_name = 'Préfecture de la région Provence-Alpes-Côte-d\'Azur'
short_code = 'prefpaca'
timezone = 'Europe/Paris'
def __init__(self, data_dir):
super().__init__(data_dir, self.user_agent)
self.enable_tor(10)
self.set_sleep_time(10)
def get_raa(self, keywords):
# On récupère une session avec Selenium
page_content = self.get_session(self.raa_page, 'main', 6)
# On récupère les pages d'années
year_pages = []
for year_page in self.get_sub_pages_with_pager(
page_content,
'article.news-list-item header h2.news-list-title a',
'article.article div.content-pagination ul.pagination li.next a',
None,
self.hostname,
selenium=True
):
year = Attrap.guess_date(year_page['name'].strip(), 'RAA ([0-9]{4})').year
if year < 9999 and year >= self.not_before.year:
year_pages.append(year_page['url'])
elements = []
for year_page in year_pages:
page_content = self.get_session(year_page, 'main', 6)
for element in self.get_raa_elements(page_content):
elements.append(element)
self.parse_raa(elements, keywords)
self.mailer()
def get_raa_elements(self, page_content):
elements = []
# On charge le parser
soup = BeautifulSoup(page_content, 'html.parser')
# Pour chaque balise a, on regarde si c'est un PDF, et si oui on le parse
for a in soup.select('main div.container.main-container div.col-main article.article div.texte div a.link-download'):
if a.get('href') and a['href'].endswith('.pdf'):
if a['href'].startswith('/'):
url = f"{self.hostname}{a['href']}"
else:
url = a['href']
url = unquote(url)
name = a.find('span').get_text().strip()
# On devine la date du RAA à partir du nom de fichier
guessed = Attrap.guess_date(name, '((?:[0-9]{2}|[0-9]{1})(?:er){0,1}[ _](?:[a-zéû]{3,9})[ _](?:[0-9]{4}|[0-9]{2}))')
if (guessed == datetime.datetime(9999, 1, 1, 0, 0)):
date = None
else:
date = guessed
raa = Attrap.RAA(url, date, name, timezone=self.timezone)
elements.append(raa)
return elements
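
# A minimal usage sketch. The data directory, not_before override and keyword
# list below are made up for illustration; the project's real entry point is
# cli.py, invoked per prefecture by the Makefile targets further down. Only the
# interface visible above is used: __init__(data_dir), the not_before attribute
# and get_raa(keywords).
if __name__ == '__main__':
    import datetime

    pref = Attrap_prefpaca('data/prefpaca/')           # where PDFs and extracted text are stored
    pref.not_before = datetime.datetime(2025, 1, 1)    # only consider RAAs published after this date
    pref.get_raa(['vidéo ?protection', 'drone'])       # keywords are regexes searched in each RAA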
FROM debian:trixie

ENV DEBIAN_FRONTEND="noninteractive"
ENV TZ="Europe/Paris"

RUN apt-get update && \
    apt-get dist-upgrade -y && \
    apt-get install --no-install-recommends -y python3 python3-virtualenv python-is-python3 chromium-driver make xauth xvfb tesseract-ocr tesseract-ocr-eng tesseract-ocr-fra ocrmypdf curl unzip zip tor file ghostscript && \
    apt-get clean && \
    rm -rf /var/lib/apt/lists && \
    sed -i '/^#ControlPort 9051/s/^#//' /etc/tor/torrc
make: ppparis pref01 pref02 pref03 pref04 pref05 pref06 pref09 pref10 pref11 pref13 pref2a pref2b pref25 pref29 pref30 pref31 pref33 pref34 pref35 pref38 pref39 pref42 pref44 pref49 pref50 pref52 pref54 pref55 pref59 pref61 pref62 pref63 pref64 pref65 pref66 pref69 pref73 pref75 pref76 pref77 pref80 pref81 pref83 pref87 pref91 pref92 pref93 pref94 pref976 prefbretagne prefidf prefpaca
ppparis:
bin/python3 cli.py ppparis
pref01:
bin/python3 cli.py pref01
pref02:
bin/python3 cli.py pref02
pref03:
bin/python3 cli.py pref03
pref04:
bin/python3 cli.py pref04
pref05:
bin/python3 cli.py pref05
pref06:
bin/python3 cli.py pref06
pref09:
bin/python3 cli.py pref09
pref10:
bin/python3 cli.py pref10
pref11:
bin/python3 cli.py pref11
pref13:
bin/python3 cli.py pref13
pref2a:
bin/python3 cli.py pref2a
pref2b:
bin/python3 cli.py pref2b
pref25:
bin/python3 cli.py pref25
pref29:
bin/python3 cli.py pref29
pref30:
bin/python3 cli.py pref30
pref31:
bin/python3 cli.py pref31
pref33:
bin/python3 cli.py pref33
pref34:
bin/python3 cli.py pref34
pref35:
bin/python3 cli.py pref35
pref38:
bin/python3 cli.py pref38
pref39:
bin/python3 cli.py pref39
pref42:
bin/python3 cli.py pref42
pref44:
bin/python3 cli.py pref44
pref49:
bin/python3 cli.py pref49
pref50:
bin/python3 cli.py pref50
pref52:
bin/python3 cli.py pref52
pref54:
bin/python3 cli.py pref54
pref55:
bin/python3 cli.py pref55
pref59:
bin/python3 cli.py pref59
pref61:
bin/python3 cli.py pref61
pref62:
bin/python3 cli.py pref62
pref63:
bin/python3 cli.py pref63
pref64:
bin/python3 cli.py pref64
pref65:
bin/python3 cli.py pref65
pref66:
bin/python3 cli.py pref66
pref69:
bin/python3 cli.py pref69
pref73:
bin/python3 cli.py pref73
pref75:
bin/python3 cli.py pref75
pref76:
bin/python3 cli.py pref76
pref77:
bin/python3 cli.py pref77
pref80:
bin/python3 cli.py pref80
pref81:
bin/python3 cli.py pref81
pref83:
bin/python3 cli.py pref83
pref87:
bin/python3 cli.py pref87
pref91:
bin/python3 cli.py pref91
pref92:
bin/python3 cli.py pref92
pref93:
bin/python3 cli.py pref93
pref94:
bin/python3 cli.py pref94
pref976:
bin/python3 cli.py pref976
prefbretagne:
bin/python3 cli.py prefbretagne
prefidf:
bin/python3 cli.py prefidf
prefpaca:
bin/python3 cli.py prefpaca
lint:
bin/pycodestyle --first --show-source --ignore=E402,E501 *.py misc/*.py
import os, re, ssl, sys
import subprocess
import logging
import requests
import time
import datetime
from urllib.parse import quote
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support import expected_conditions
import dateparser
from bs4 import BeautifulSoup
from pyvirtualdisplay import Display
from pdfminer.high_level import extract_text
from stem import Signal
from stem.control import Controller
import hashlib
import smtplib
from email.message import EmailMessage
from mastodon import Mastodon
logger = logging.getLogger(__name__)
class RAAspotter:
class RAA:
url = ""
date = datetime.datetime(1970, 1, 1)
date_str = ""
name = ""
filename = ""
sha256 = ""
def __init__(self, url, date, name, filename):
if not url == "":
self.url = url
if not date == "":
self.date = date
self.date_str = date.strftime("%d/%m/%Y")
if not name == "":
self.name = name
if not filename == "":
self.filename = filename
def get_sha256(self):
if (self.sha256 == ""):
self.sha256 = hashlib.sha256(self.filename.encode('utf-8')).hexdigest()
return self.sha256
def __init__(self, data_dir, user_agent=''):
logger.debug('Initialisation de RAAspotter')
self.session = requests.Session()
self.data_dir = data_dir
self.found = False
self.output_file_path = os.path.dirname(os.path.abspath(__file__))+f'/output_{self.short_code}.log'
self.sleep_time = 0
self.tor_enabled = False
self.tor_max_requests = 0
self.tor_requests = 0
self.not_before = datetime.datetime(2024, 1, 1)
self.smtp_configured = False
self.mastodon = None
self.mastodon_prefix = ''
self.mastodon_suffix = ''
self.update_user_agent(user_agent)
f = open(self.output_file_path,'w')
f.write('')
f.close()
def configure_mastodon(self, access_token, instance, mastodon_prefix, mastodon_suffix):
if access_token and access_token != "" and instance and instance != "":
self.mastodon = Mastodon(
access_token=access_token,
api_base_url=instance
)
self.mastodon_prefix = mastodon_prefix
self.mastodon_suffix = mastodon_suffix
def mastodon_toot(self, content):
if self.mastodon:
toot = content
if not self.mastodon_prefix == '':
toot = f"{self.mastodon_prefix}\n\n{toot}"
if not self.mastodon_suffix == '':
toot = f"{toot}\n\n{self.mastodon_suffix}"
self.mastodon.toot(toot)
def enable_tor(self, max_requests=0):
proxies = {
"http": f"socks5h://127.0.0.1:9050",
"https": f"socks5h://127.0.0.1:9050",
}
self.tor_enabled = True
self.tor_max_requests = max_requests
self.tor_requests = 0
self.session.proxies.update(proxies)
self.tor_get_new_id()
def disable_tor(self):
proxies = {}
self.tor_enabled = False
self.tor_max_requests = 0
self.tor_requests = 0
self.session.proxies.update(proxies)
def tor_get_new_id(self):
logger.info('Changement d\'identité Tor')
try:
controller = Controller.from_port(port = 9051)
controller.authenticate()
controller.signal(Signal.NEWNYM)
time.sleep(5)
self.tor_requests = 0
except:
logger.debug('Impossible de changer d\'identité Tor')
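# Note on tor_get_new_id() above: it talks to tor's control port (9051), which the
# sed command in the Dockerfile enables, while enable_tor() routes HTTP requests
# through the SOCKS proxy on 127.0.0.1:9050.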
def get_sub_pages(self, page_content, element, host, recursive_until_pdf):
soup = BeautifulSoup(page_content, 'html.parser')
sub_pages = []
for a in soup.select(element):
if a.get('href'):
url = f"{host}{a['href']}"
sub_page_content = self.get_page(url, 'get').content
if recursive_until_pdf and not self.has_pdf(sub_page_content):
logger.info(f'{url} ne contient pas de PDF, on récupère ses sous-pages')
for sub_sub_page in self.get_sub_pages(sub_page_content, element, host, recursive_until_pdf):
sub_pages.append(sub_sub_page)
else:
sub_page = {
'url': url,
'name': a.get_text().strip()
}
sub_pages.append(sub_page)
return sub_pages
def get_sub_pages_with_pager(self, page, sub_page_element, pager_element, host):
pages = []
page_content = self.get_page(page, 'get').content
# On initialise le parser
soup = BeautifulSoup(page_content, 'html.parser')
# On recherche les sous-pages
sub_pages = soup.select(sub_page_element)
for sub_page in sub_pages:
if sub_page.get('href'):
page = {
'url': f"{host}{sub_page['href']}",
'name': sub_page.get_text().strip()
}
pages.append(page)
# On recherche un pager, et si on le trouve on le suit
pager = soup.select(pager_element)
if pager and pager[0] and pager[0].get('href'):
for sub_page in self.get_sub_pages_with_pager(f"{host}{pager[0]['href']}", sub_page_element, pager_element, host):
pages.append(sub_page)
return pages
def get_raa_with_pager(self, pages_list, pager_element, host):
elements = []
# On parse chaque page passée en paramètre
for page in pages_list:
page_content = self.get_page(page, 'get').content
# Pour chaque page, on récupère les PDF
for raa in self.get_raa_elements(page_content):
elements.append(raa)
# On regarde également s'il n'y aurait pas un pager
sub_pages = []
for sub_page in self.get_sub_pages(page_content, pager_element, host, True):
sub_pages.append(sub_page['url'])
for sub_raa in self.get_raa_with_pager(sub_pages, pager_element, host):
elements.append(sub_raa)
return elements
def set_sleep_time(self, sleep_time):
self.sleep_time = sleep_time
def has_pdf(self, page_content):
elements = []
soup = BeautifulSoup(page_content, 'html.parser')
for a in soup.find_all('a', href=True):
if a['href'].endswith('.pdf'):
return True
return False
# On démarre le navigateur
def get_session(self, url, wait_element=""):
webdriver_options = webdriver.ChromeOptions()
webdriver_options.add_argument("--no-sandbox")
webdriver_options.add_argument("--disable-extensions")
webdriver_options.add_argument("--disable-gpu")
webdriver_options.add_argument("--disable-dev-shm-usage")
webdriver_options.add_argument("--use_subprocess")
webdriver_options.add_argument("--disable-blink-features=AutomationControlled")
if not self.user_agent == "":
webdriver_options.add_argument(f"--user-agent={self.user_agent}")
webdriver_options.add_argument("--headless")
webdriver_options.add_argument("--window-size=1024,768")
display = Display(visible=False, size=(1024, 768))
display.start()
browser = webdriver.Chrome(options=webdriver_options)
# Téléchargement de l'URL
browser.get(url)
if not wait_element == "":
# On attend que le navigateur ait passé les tests anti-robots et que le contenu s'affiche
WebDriverWait(browser, 120).until(expected_conditions.presence_of_element_located((By.ID, wait_element)))
page_content = browser.page_source
# On récupère les cookies du navigateur pour les réutiliser plus tard
for cookie in browser.get_cookies():
self.session.cookies.set(cookie['name'], cookie['value'])
# On arrête le navigateur
browser.quit()
display.stop()
return page_content
def print_output(self, data):
print(data)
data = data.replace('\033[92m', '')
data = data.replace('\033[0m', '')
data = data.replace('\033[1m', '')
f = open(self.output_file_path,'a')
f.write(data+"\n")
f.close()
def get_page(self, url, method, data={}):
try:
logger.debug(f'Chargement de la page {url}')
if self.sleep_time > 0:
time.sleep(self.sleep_time)
page = None
if method == 'get':
page = self.session.get(url)
if method == 'post':
page = self.session.post(url, data=data)
if page.status_code == 429:
logger.info(f'Erreur 429 Too Many Requests reçue, temporisation...')
self.tor_get_new_id()
time.sleep(55)
return self.get_page(url, method, data)
if self.tor_enabled:
self.tor_requests+=1
if self.tor_max_requests>0 and self.tor_requests>self.tor_max_requests:
self.tor_get_new_id()
return page
except requests.exceptions.ConnectionError as exc:
logger.info(f'Erreur de connexion, temporisation...')
self.tor_get_new_id()
time.sleep(55)
return self.get_page(url, method, data)
def update_user_agent(self, user_agent):
self.user_agent = user_agent
self.session.headers.update({'User-Agent': self.user_agent})
def download_file(self, raa):
try:
os.makedirs(os.path.dirname(f'{self.data_dir}{raa.get_sha256()}.pdf'), exist_ok=True)
file = self.get_page(raa.url, 'get')
f = open(f'{self.data_dir}{raa.get_sha256()}.pdf','wb')
f.write(file.content)
f.close()
except (requests.exceptions.ConnectionError, requests.exceptions.ChunkedEncodingError):
logger.warning(f'ATTENTION: la connexion a été interrompue pendant le téléchargement de {raa.url}, nouvelle tentative...')
self.download_file(raa)
except Exception as exc:
logger.warning(f'ATTENTION: Impossible de télécharger le fichier {raa.url}: {exc}')
def parse_pdf(self, raa, keywords):
if not os.path.isfile(f'{self.data_dir}{raa.get_sha256()}.pdf'):
logger.warning(f'ATTENTION: le fichier {raa.get_sha256()}.pdf n\'existe pas')
else:
text = extract_text(f'{self.data_dir}{raa.get_sha256()}.pdf')
found = False
found_keywords = []
for keyword in keywords:
if re.search(keyword, text, re.IGNORECASE|re.MULTILINE):
if not found:
url = quote(raa.url, safe='/:')
self.print_output(f'\033[92m{raa.name}\033[0m ({raa.date_str})')
self.print_output(f'URL : {url}')
found = True
self.found = True
self.print_output(f' Le terme \033[1m{keyword}\033[0m a été trouvé.')
found_keywords.append(keyword)
# Écrit le texte du PDF dans un fichier texte pour une analyse future, puis supprime le PDF
f = open(f'{self.data_dir}{raa.get_sha256()}.txt','w')
f.write(text)
f.close()
os.remove(f'{self.data_dir}{raa.get_sha256()}.pdf')
if found:
self.print_output('')
url = quote(raa.url, safe='/:')
found_keywords_str = ', '.join([str(x) for x in found_keywords])
self.mastodon_toot(f"{raa.name} ({raa.date_str})\n\nLes termes suivants ont été trouvés : {found_keywords_str}.\n\nURL : {url}")
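# The keywords handed to parse_pdf() (and parse_raa() below) are treated as
# regular expressions and matched against the extracted text with re.IGNORECASE
# and re.MULTILINE, so a pattern such as 'vidéo ?protection' (hypothetical
# example) covers both "vidéoprotection" and "vidéo protection".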
def ocr(self, raa, retry_on_failure=True):
cmd = [
'ocrmypdf',
'-l', 'eng+fra',
'--output-type', 'pdfa',
'--redo-ocr',
'--skip-big', '500',
'--invalidate-digital-signatures',
f'{self.data_dir}{raa.get_sha256()}.pdf',
f'{self.data_dir}{raa.get_sha256()}.pdf'
]
logger.debug(f'Lancement de ocrmypdf: {cmd}')
try:
output = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
except subprocess.CalledProcessError as exc:
if exc.returncode == 2 and retry_on_failure:
logger.warning('ATTENTION : Le fichier n\'est pas un PDF correct, nouvelle tentative de le télécharger')
if self.tor_enabled:
self.tor_get_new_id()
self.download_file(raa)
self.ocr(raa,False)
elif (not exc.returncode == 6) and (not exc.returncode == 10):
logger.warning('ATTENTION : Impossible d\'OCRiser le document', exc.returncode, exc.output)
def parse_raa(self, elements, keywords):
for raa in elements:
# Si le fichier n'a pas déjà été parsé et qu'il est postérieur à la date maximale d'analyse,
# on le télécharge et on le parse
if (raa.date >= self.not_before) and (not os.path.isfile(f'{self.data_dir}{raa.get_sha256()}.txt')):
url = quote(raa.url, safe='/:')
logger.info(f'Nouveau fichier : {raa.name} ({raa.date_str}). URL : {url}')
self.download_file(raa)
self.ocr(raa, True)
self.parse_pdf(raa, keywords)
def get_raa(self, page_content):
logger.error('Cette fonction doit être surchargée')
def configure_mailer(self, smtp_host, smtp_username, smtp_password,
smtp_port, smtp_starttls, smtp_ssl, email_from,
email_to, email_object):
self.smtp_host = smtp_host
self.smtp_username = smtp_username
self.smtp_password = smtp_password
if smtp_port <= 0:
self.smtp_port = 587
else:
self.smtp_port = int(smtp_port)
self.smtp_starttls = smtp_starttls
self.smtp_ssl = smtp_ssl
self.email_from = email_from
self.email_to = email_to
self.email_object = email_object
if smtp_host and smtp_username and smtp_password and email_from and email_to and email_object:
self.smtp_configured = True
def mailer(self):
if self.smtp_configured and self.found:
try:
message = EmailMessage()
message.set_content(open(self.output_file_path).read())
message['Subject'] = self.email_object
message['From'] = self.email_from
context = ssl.create_default_context()
if self.smtp_ssl == True:
for address in self.email_to.split(','):
del message['To']
message['To'] = address
smtp = smtplib.SMTP_SSL(self.smtp_host, self.smtp_port, context=context)
if self.smtp_username:
smtp.login(self.smtp_username, self.smtp_password)
smtp.send_message(message)
smtp.quit()
elif self.smtp_starttls == True:
for address in self.email_to.split(','):
del message['To']
message['To'] = address
smtp = smtplib.SMTP(self.smtp_host)
smtp.starttls(context=context)
if self.smtp_username:
smtp.login(self.smtp_username, self.smtp_password)
smtp.send_message(message)
smtp.quit()
else:
for address in self.email_to.split(','):
del message['To']
message['To'] = address
smtp = smtplib.SMTP(self.smtp_host)
if self.smtp_username:
smtp.login(self.smtp_username, self.smtp_password)
smtp.send_message(message)
smtp.quit()
except Exception as exc:
logger.warning(f'Impossible d\'envoyer le courrier électronique : {exc}')
# Fonction qui essaie de deviner la date d'un RAA à partir de son nom.
# Utile pour limiter les requêtes lors de l'obtention des RAA à scanner.
def guess_date(string, regex):
try:
search = re.search(regex, string, re.IGNORECASE)
guessed_date = dateparser.parse(search.group(1))
if guessed_date == None:
raise Exception('La date est un objet None')
else:
return guessed_date
except Exception as exc:
logger.warning(f"Impossible de deviner la date du terme {string} : {exc}")
return datetime.datetime(9999, 1, 1)
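
# A standalone illustration of guess_date(). The sample strings are made up,
# and the exact parsed value depends on dateparser's handling of French month
# names, so it may vary slightly across dateparser versions. In the current
# codebase the same helper is exposed as Attrap.guess_date().
if __name__ == '__main__':
    # A French long-form date found in an RAA title: the first capture group is
    # handed to dateparser, typically giving datetime(2024, 1, 5, 0, 0).
    print(RAAspotter.guess_date('RAA spécial du 5 janvier 2024', '([0-9]*(?:er)? [a-zéû]* [0-9]{4})'))
    # No match: the sentinel datetime(9999, 1, 1) is returned and callers treat
    # the date as unknown.
    print(RAAspotter.guess_date('Recueil spécial', '([0-9]{4})'))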