Newer
Older
import random
import string
import logging
import requests
import time
import datetime
import json
from urllib.parse import quote
from selenium import webdriver
from selenium.common.exceptions import TimeoutException
from selenium.webdriver.common.by import By
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support import expected_conditions
import dateparser
from bs4 import BeautifulSoup
from pyvirtualdisplay import Display
from pypdf import PdfReader
from pypdf import PdfWriter
from pypdf.generic import NameObject, NumberObject
from pypdf.errors import PdfStreamError
import hashlib
import smtplib
import email
from mastodon import Mastodon
import ftfy
logger = logging.getLogger(__name__)
class Attrap:
class RAA:
"""La classe représentant un Recueil des actes administratifs. La plupart du temps, il s'agit d'un PDF avec plusieurs arrêtés."""
date = None
date_str = ""
name = ""
sha256 = ""
pdf_creation_date = None
pdf_modification_date = None
def __init__(self, url, date, name):
if not url == "":
self.url = url
if date is not None:
self.date = Attrap.get_aware_datetime(date)
self.date_str = date.strftime("%d/%m/%Y")
if not name == "":
self.name = name
def get_sha256(self):
"""Calcule et met en cache le hash sha256 de l'URL du PDF, pour servir d'identifiant unique."""
if (self.sha256 == ""):
self.sha256 = hashlib.sha256(self.url.encode('utf-8')).hexdigest()
return self.sha256
def get_pdf_dates(self, data_dir):
"""Extrait les dates des PDF pour les ajouter à l'objet du RAA."""
raa_data_dir = f'{data_dir}/raa/'
reader = PdfReader(f'{raa_data_dir}{self.get_sha256()}.pdf')
pdf_metadata = reader.metadata
if pdf_metadata:
if pdf_metadata.creation_date:
self.pdf_creation_date = Attrap.get_aware_datetime(pdf_metadata.creation_date)
if self.date is None:
self.date = Attrap.get_aware_datetime(pdf_metadata.creation_date)
self.date_str = self.date.strftime("%d/%m/%Y")
if pdf_metadata.modification_date:
self.pdf_modification_date = Attrap.get_aware_datetime(pdf_metadata.modification_date)
if self.date is None:
self.date = Attrap.get_aware_datetime(pdf_metadata.modification_date)
self.date_str = self.date.strftime("%d/%m/%Y")
"""Extrait le contenu du PDF OCRisé pour l'écrire dans le fichier qui servira à faire la recherche de mots-clés. Supprime tous les PDF à la fin."""
raa_data_dir = f'{data_dir}/raa/'
text = ""
reader = PdfReader(f'{raa_data_dir}{self.get_sha256()}.ocr.pdf')
ftfy_config = ftfy.TextFixerConfig(unescape_html=False, explain=False)
text = text + "\n" + ftfy.fix_text(page.extract_text(), config=ftfy_config)
except Exception as exc:
logger.warning(f'ATTENTION: Impossible d\'extraire le texte du fichier {self.get_sha256()}.pdf : {exc}')
# Écrit le texte du PDF dans un fichier texte pour une analyse future
f = open(f'{raa_data_dir}{self.get_sha256()}.txt', 'w')
f.write(text)
f.close()
# Supprime le PDF d'origine et la version OCRisée
os.remove(f'{raa_data_dir}{self.get_sha256()}.pdf')
os.remove(f'{raa_data_dir}{self.get_sha256()}.ocr.pdf')
os.remove(f'{raa_data_dir}{self.get_sha256()}.flat.pdf')
def write_properties(self, data_dir):
"""Écris les propriétés du RAA dans un fichier JSON."""
raa_data_dir = f'{data_dir}/raa/'
pdf_creation_date_json = None
pdf_modification_date_json = None
if self.pdf_creation_date:
pdf_creation_date_json = self.pdf_creation_date.strftime("%d/%m/%Y %H:%M:%S%z")
pdf_modification_date_json = self.pdf_modification_date.strftime("%d/%m/%Y %H:%M:%S%z")
properties = {
'name': self.name,
'date': self.date_str,
'url': quote(self.url, safe='/:'),
'first_saw_on': datetime.datetime.now(datetime.timezone.utc).strftime("%d/%m/%Y %H:%M:%S%z"),
'pdf_creation_date': pdf_creation_date_json,
'pdf_modification_date': pdf_modification_date_json
}
f = open(f'{raa_data_dir}{self.get_sha256()}.json', 'w')
f.write(json.dumps(properties))
f.close()
def parse_metadata(self, data_dir):
"""Lance l'extraction des dates du PDF puis l'écriture de ses propriétés dans un fichier JSON."""
self.get_pdf_dates(data_dir)
self.write_properties(data_dir)
def __init__(self, data_dir, user_agent=''):
logger.debug('Initialisation de Attrap')
# On crée le dossier de téléchargement
os.makedirs(data_dir, exist_ok=True)
self.session = requests.Session()
self.data_dir = data_dir
self.found = False
self.output_file_path = os.path.dirname(os.path.abspath(__file__)) + f'/output_{self.short_code}.log'
self.sleep_time = 0
self.tor_enabled = False
self.tor_max_requests = 0
self.tor_requests = 0
self.tor_socks5_key = None
self.not_before = datetime.datetime(2024, 1, 1)
self.smtp_configured = False
self.mastodon = None
self.mastodon_prefix = ''
self.mastodon_suffix = ''
self.update_user_agent(user_agent)
f = open(self.output_file_path, 'w')
f.write('')
f.close()
self.print_output(str(self.__class__.__name__))
# Si le safe mode est activé, on configure un long délai entre chaque requête
if os.getenv('SAFE_MODE'):
self.safe_mode = True
logger.warning('ATTENTION: le safe mode est activé, configuration d\'un délai entre chaque requête')
self.sleep_time = 30
def configure_mastodon(self, access_token, instance, mastodon_prefix, mastodon_suffix):
"""Configuration de Mastodon afin de publier un toot à chaque RAA détecté."""
if access_token and access_token != "" and instance and instance != "":
self.mastodon = Mastodon(
access_token=access_token,
api_base_url=instance
)
self.mastodon_prefix = mastodon_prefix
self.mastodon_suffix = mastodon_suffix
def mastodon_toot(self, content):
"""Publie le toot en ajoutant le header et footer configurés."""
if self.mastodon:
toot = content
if not self.mastodon_prefix == '':
toot = f"{self.mastodon_prefix}\n\n{toot}"
if not self.mastodon_suffix == '':
toot = f"{toot}\n\n{self.mastodon_suffix}"
self.mastodon.toot(toot)
def enable_tor(self, max_requests=0):
"""Active l'utilisation de Tor pour effectuer les requêtes."""
if not self.safe_mode:
self.tor_enabled = True
self.tor_max_requests = max_requests
self.tor_get_new_id()
else:
logger.warning('ATTENTION: le safe mode est activé, Tor n\'a pas été activé')
"""Désactive l'utilisation de Tor."""
proxies = {}
self.tor_enabled = False
self.tor_max_requests = 0
self.tor_requests = 0
self.session.proxies.update(proxies)
def tor_get_new_id(self):
"""Change de circuit Tor. Cela permet de changer de noeud de sortie donc d'IP."""
self.tor_socks5_key = 'attrap_' + ''.join(random.choices(string.ascii_lowercase, k=20))
proxies = {
"http": f"socks5h://attrap:{self.tor_socks5_key}@127.0.0.1:9050",
"https": f"socks5h://attrap:{self.tor_socks5_key}@127.0.0.1:9050",
}
self.session.proxies.update(proxies)
self.tor_requests = 0
def get_sub_pages(self, page_content, element, host, recursive_until_pdf):
"""
Récupère, à partir d'un chemin CSS, les sous-pages d'une page.
page_content -- Un contenu HTML à analyser
element -- Le chemin CSS vers l'objet renvoyant vers la sous-page recherchée
host -- Le nom d'hôte du site
recursive_until_pdf -- Un booléen pour savoir s'il faut rechercher un fichier PDF dans le chemin CSS. Le cas échéant, relance la recherche sur la sous-page si le lien n'est pas un PDF.
"""
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
soup = BeautifulSoup(page_content, 'html.parser')
sub_pages = []
for a in soup.select(element):
if a.get('href'):
url = f"{host}{a['href']}"
if recursive_until_pdf:
sub_page_content = self.get_page(url, 'get').content
if not self.has_pdf(sub_page_content):
logger.info(
f'{url} ne contient pas de PDF, on récupère ses sous-pages'
)
for sub_sub_page in self.get_sub_pages(
sub_page_content,
element,
host,
recursive_until_pdf
):
sub_pages.append(sub_sub_page)
else:
sub_page = {
'url': url,
'name': a.get_text().strip()
}
sub_pages.append(sub_page)
else:
sub_page = {
'url': url,
'name': a.get_text().strip()
}
sub_pages.append(sub_page)
return sub_pages
def get_sub_pages_with_pager(self, page, sub_page_element, pager_element, details_element, host):
"""
Récupère, à partir d'un chemin CSS, les sous-pages d'une page contenant un pager.
page -- L'URL de la page à analyser
sub_page_element -- Le chemin CSS vers l'objet renvoyant vers la sous-page recherchée
pager_element -- Le chemin CSS vers le lien de page suivante du pager
details_element -- Le chemin CSS vers l'objet contenant les détails de la sous-page recherchée
host -- Le nom d'hôte du site
"""
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
pages = []
page_content = self.get_page(page, 'get').content
# On initialise le parser
soup = BeautifulSoup(page_content, 'html.parser')
# On recherche les sous-pages
sub_pages = soup.select(sub_page_element)
sub_pages_details = None
if details_element is not None:
sub_pages_details = soup.select(details_element)
i = 0
for sub_page in sub_pages:
if sub_page.get('href'):
page = {
'url': f"{host}{sub_page['href']}",
'name': sub_page.get_text().strip(),
'details': ''
}
if details_element is not None:
page['details'] = sub_pages_details[i].get_text().strip()
pages.append(page)
i = i + 1
# On recherche un pager, et si on le trouve on le suit
pager = soup.select(pager_element)
if pager and pager[0] and pager[0].get('href'):
for sub_page in self.get_sub_pages_with_pager(
f"{host}{pager[0]['href']}",
sub_page_element,
pager_element,
details_element,
host
):
pages.append(sub_page)
return pages

Bastien Le Querrec
a validé
def get_raa_with_pager(self, pages_list, pager_element, host, filter_from_last_element_date=False):
"""
Récupère et analyse les RAA d'une page contenant un pager.
pages_list -- Un tableau contenant la liste des pages
pager_element -- Le chemin CSS vers le lien de page suivante du pager
host -- Le nom d'hôte du site

Bastien Le Querrec
a validé
filter_from_last_element_date -- (Optionnel) Si la date du dernier élément de la dernière page parsée
n'est pas dans la plage temporelle voulue, ne charge pas les pages suivantes. Par défaut à False. Ne doit
être activé que si l'ordre des éléments est chronologique.
elements = []
# On parse chaque page passée en paramètre
for page in pages_list:
page_content = self.get_page(page, 'get').content
# Pour chaque page, on récupère les PDF
for raa in self.get_raa_elements(page_content):
elements.append(raa)

Bastien Le Querrec
a validé
# Si la date du dernier RAA est dans la plage temporelle voulue,
# on regarde également s'il n'y aurait pas un pager
if not filter_from_last_element_date or (filter_from_last_element_date and (elements[-1].date >= Attrap.get_aware_datetime(self.not_before))):
sub_pages = []
for sub_page in self.get_sub_pages(
page_content,
pager_element,
host,
True
):
sub_pages.append(sub_page['url'])
for sub_raa in self.get_raa_with_pager(
sub_pages,
pager_element,

Bastien Le Querrec
a validé
host,
filter_from_last_element_date=filter_from_last_element_date

Bastien Le Querrec
a validé
):
elements.append(sub_raa)
return elements
def set_sleep_time(self, sleep_time):
"""Configure le temps de temporisation"""
self.sleep_time = sleep_time
def has_pdf(self, page_content):
"""
Renvoie un booléen Vrai si la page contient un lien vers un PDF
page_content -- Un contenu HTML à analyser
"""
elements = []
soup = BeautifulSoup(page_content, 'html.parser')
for a in soup.find_all('a', href=True):
if a['href'].endswith('.pdf'):
return True
return False
# On démarre le navigateur
def get_session(self, url, wait_element, remaining_retries=0):
"""
Lance un navigateur avec Selenium.
url -- URL à interroger
wait_element -- Élement (désigné par son identifiant CSS) qui indique que la page est chargée
remaining_retries -- Nombre d'échecs autorisé avant de soulever une erreur
"""
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
webdriver_options = webdriver.ChromeOptions()
webdriver_options.add_argument("--no-sandbox")
webdriver_options.add_argument("--disable-extensions")
webdriver_options.add_argument("--disable-gpu")
webdriver_options.add_argument("--disable-dev-shm-usage")
webdriver_options.add_argument("--use_subprocess")
webdriver_options.add_argument("--disable-blink-features=AutomationControlled")
if not self.user_agent == "":
webdriver_options.add_argument(f"--user-agent={self.user_agent}")
webdriver_options.add_argument("--headless")
webdriver_options.add_argument("--window-size=1024,768")
display = Display(visible=False, size=(1024, 768))
display.start()
browser = webdriver.Chrome(options=webdriver_options)
# Téléchargement de l'URL
browser.get(url)
if wait_element is not None:
# On attend que le navigateur ait passé les tests anti-robots et
# que le contenu s'affiche
try:
WebDriverWait(browser, 60).until(
expected_conditions.presence_of_element_located(
(
By.ID,
wait_element
)
)
)
except TimeoutException as exc:
logger.warning(f'TimeoutException: {exc}')
if remaining_retries > 0:
time.sleep(5)
return self.get_session(url, wait_element, (remaining_retries - 1))
else:
raise TimeoutException(exc)
page_content = browser.page_source
# On récupère les cookies du navigateur pour les réutiliser plus tard
for cookie in browser.get_cookies():
self.session.cookies.set(cookie['name'], cookie['value'])
# On arrête le navigateur
browser.quit()
display.stop()
return page_content
def print_output(self, data):
"""Affiche dans le terminal et dans le fichier de log un texte"""
print(data)
data = data.replace('\033[92m', '')
data = data.replace('\033[0m', '')
data = data.replace('\033[1m', '')
f = open(self.output_file_path, 'a')
f.write(data + "\n")
f.close()
def get_page(self, url, method, data={}):
"""
Récupère le contenu HTML d'une page web
url -- L'URL de la page demandée
method -- 'post' ou 'get', selon le type de requête
data -- Un dictionnaire contenant les données à envoyer au site
"""
try:
logger.debug(f'Chargement de la page {url}')
if self.sleep_time > 0:
time.sleep(self.sleep_time)
page = None
if method == 'get':
page = self.session.get(url, timeout=(10, 120))
if method == 'post':
page = self.session.post(url, data=data, timeout=(10, 120))
if page.status_code == 429:
logger.warning('Erreur 429 Too Many Requests reçue, temporisation...')
self.tor_get_new_id()
time.sleep(1)
return self.get_page(url, method, data)
if self.tor_enabled:
self.tor_requests += 1
if self.tor_max_requests > 0 and \
self.tor_requests > self.tor_max_requests:
self.tor_get_new_id()
return page
except requests.exceptions.ConnectionError:
logger.warning(f'Erreur de connexion, temporisation...')
self.tor_get_new_id()
time.sleep(30)
return self.get_page(url, method, data)
except requests.exceptions.Timeout:
logger.warning(f'Timeout, on relance la requête...')
return self.get_page(url, method, data)
def update_user_agent(self, user_agent):
"""Change la valeur du user-agent"""
self.user_agent = user_agent
self.session.headers.update({'User-Agent': self.user_agent})
def download_file(self, raa):
try:
os.makedirs(
os.path.dirname(f'{self.data_dir}/raa/{raa.get_sha256()}.pdf'),
exist_ok=True
)
file = self.get_page(raa.url, 'get')
f = open(f'{self.data_dir}/raa/{raa.get_sha256()}.pdf', 'wb')
f.write(file.content)
f.close()
except (requests.exceptions.ConnectionError,
requests.exceptions.ChunkedEncodingError):
logger.warning(f'ATTENTION: la connexion a été interrompue pendant le téléchargement de {raa.url}, nouvelle tentative...')
self.download_file(raa)
except Exception as exc:
logger.warning(f'ATTENTION: Impossible de télécharger le fichier {raa.url}: {exc}')
def ocr(self, raa, retry_on_failure=True):
'-l', 'eng+fra',
'--output-type', 'pdf',
'--redo-ocr',

Bastien Le Querrec
a validé
'--skip-big', '250',
'--max-image-mpixels', '250',
'--invalidate-digital-signatures',
'--optimize', '0',
f'{self.data_dir}/raa/{raa.get_sha256()}.flat.pdf',
f'{self.data_dir}/raa/{raa.get_sha256()}.ocr.pdf'
]
logger.debug(f'Lancement de ocrmypdf: {cmd}')
try:
output = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
except subprocess.CalledProcessError as exc:
if exc.returncode == 2 and retry_on_failure:
logger.warning('ATTENTION : Le fichier n\'est pas un PDF correct, nouvelle tentative de le télécharger')
if self.tor_enabled:
self.tor_get_new_id()
self.download_file(raa)
self.ocr(raa, False)
elif (not exc.returncode == 6) and (not exc.returncode == 10) and (not exc.returncode == 4):
logger.warning('ATTENTION : Impossible d\'OCRiser le document', exc.returncode, exc.output)
shutil.copy(f'{self.data_dir}/raa/{raa.get_sha256()}.pdf', f'{self.data_dir}/raa/{raa.get_sha256()}.ocr.pdf')
def flatten_pdf(self, raa):
"""Supprime les formulaires d'un PDF pour pouvoir les OCRiser après dans OCRmyPDF."""
reader = PdfReader(f'{self.data_dir}/raa/{raa.get_sha256()}.pdf')
writer = PdfWriter()
for page in reader.pages:
if page.get('/Annots'):
for annot in page.get('/Annots'):
writer_annot = annot.get_object()
writer_annot.update({
NameObject("/Ff"): NumberObject(1)
})
writer.add_page(page)
writer.write(f'{self.data_dir}/raa/{raa.get_sha256()}.flat.pdf')
def search_keywords(self, raa, keywords):
"""Recherche des mots-clés dans le texte extrait du PDF"""
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
if keywords and not keywords == '':
text = open(f'{self.data_dir}/raa/{raa.get_sha256()}.txt').read()
found = False
found_keywords = []
for keyword in keywords.split(','):
if re.search(keyword, text, re.IGNORECASE | re.MULTILINE):
if not found:
url = quote(raa.url, safe='/:')
self.print_output(f'\033[92m{raa.name}\033[0m ({raa.date_str})')
self.print_output(f'URL : {url}')
found = True
self.found = True
self.print_output(f' Le terme \033[1m{keyword}\033[0m a été trouvé.')
found_keywords.append(keyword)
if found:
self.print_output('')
url = quote(raa.url, safe='/:')
found_keywords_str = ', '.join(
[str(x) for x in found_keywords]
)
self.mastodon_toot(
f'{raa.name} ({raa.date_str})\n\nLes termes suivants ont '
f'été trouvés : {found_keywords_str}.\n\nURL : {url}'
)
def parse_raa(self, elements, keywords):
"""
Démarre l'analyse des RAA.
elements -- Un tableau contenant les RAA à analyser
keywords -- Les mots-clés à rechercher dans chaque RAA
"""
self.print_output(f'Termes recherchés: {keywords}')
self.print_output('')
for raa in elements:
# Si le fichier n'a pas déjà été parsé et qu'il est postérieur à la
# date maximale d'analyse, on le télécharge et on le parse
if not os.path.isfile(f'{self.data_dir}/raa/{raa.get_sha256()}.txt') and (not raa.date or (raa.date >= Attrap.get_aware_datetime(self.not_before))):
url = quote(raa.url, safe='/:')
self.download_file(raa)
try:
raa.parse_metadata(self.data_dir)
# Lorsque la date du RAA n'est pas connue, on a dû télécharger le PDF pour récupérer la date de ses métadonnées.
# Donc on vérifie à nouveau ici si la date correspond à ce qu'on veut analyser
if (raa.date and raa.date >= Attrap.get_aware_datetime(self.not_before)):
logger.info(f'Nouveau fichier : {raa.name} ({raa.date_str}). URL : {url}')
self.flatten_pdf(raa)
self.ocr(raa, True)
raa.extract_content(self.data_dir)
self.search_keywords(raa, keywords)
else:
os.remove(f'{self.data_dir}/raa/{raa.get_sha256()}.pdf')
logger.error(f'ERREUR: le RAA {raa.name} n\'a pas de date !')
sys.exit(1)
except PdfStreamError as exc:
logger.warning(f'ATTENTION: le RAA à l\'adresse {raa.url} n\'est pas valide ! On l\'ignore...')
def get_raa(self, page_content):
logger.error('Cette fonction doit être surchargée')
def configure_mailer(self, smtp_host, smtp_username, smtp_password,
smtp_port, smtp_starttls, smtp_ssl, email_from,
email_to, email_object):
"""
Configure les courriers électroniques.
smtp_host -- Nom d'hôte du serveur SMTP
smtp_username -- Nom d'utilisateur pour se connecter au serveur SMTP
smtp_password -- Mot de passe pour se connecter au serveur SMTP
smtp_port -- Port de connexion au serveur SMTP
smtp_starttls -- Booléen. Si vrai, se connecte avec STARTTLS
smtp_ssl -- Booléen. Si vrai, se connecte avec SSL/TLS
email_from -- Adresse électronique de l'expéditeur des notifications
email_to -- Tableau contenant les adresses électroniques à notifier
email_object -- Objet du courrier électronique de notification
"""
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
self.smtp_host = smtp_host
self.smtp_username = smtp_username
self.smtp_password = smtp_password
if smtp_port <= 0:
self.smtp_port = 587
else:
self.smtp_port = int(smtp_port)
self.smtp_starttls = smtp_starttls
self.smtp_ssl = smtp_ssl
self.email_from = email_from
self.email_to = email_to
self.email_object = email_object
if smtp_host and smtp_username and smtp_password and email_from and email_to and email_object:
self.smtp_configured = True
def mailer(self):
if self.smtp_configured and self.found:
try:
message = email.message.EmailMessage()
message.set_content(open(self.output_file_path).read())
message['Subject'] = self.email_object
message['From'] = self.email_from
message['Message-ID'] = email.utils.make_msgid(domain=self.email_from.split('@')[-1])
message['Date'] = email.utils.formatdate()
context = ssl.create_default_context()
if self.smtp_ssl is True:
for address in self.email_to.split(','):
del message['To']
message['To'] = address
smtp = smtplib.SMTP_SSL(self.smtp_host, port, context=context)
if self.smtp_username:
smtp.login(self.smtp_username, self.smtp_password)
smtp.send_message(message)
smtp.quit()
elif self.smtp_starttls is True:
for address in self.email_to.split(','):
del message['To']
message['To'] = address
smtp = smtplib.SMTP(self.smtp_host)
smtp.starttls(context=context)
if self.smtp_username:
smtp.login(self.smtp_username, self.smtp_password)
smtp.send_message(message)
smtp.quit()
else:
for address in self.email_to.split(','):
del message['To']
message['To'] = address
smtp = smtplib.SMTP(self.smtp_host)
if self.smtp_username:
smtp.login(self.smtp_username, self.smtp_password)
smtp.send_message(message)
smtp.quit()
except Exception as exc:
logger.warning(f'Impossible d\'envoyer le courrier électronique : {exc}')
def guess_date(string, regex):
"""
Essaie de deviner la date d'un RAA à partir de son nom.
Utile pour limiter les requêtes lors de l'obtention des RAA à scanner.
"""
try:
search = re.search(regex, string, re.IGNORECASE)
guessed_date = dateparser.parse(search.group(1), languages=['fr'])
if guessed_date is None:
raise Exception('La date est un objet None')
else:
return guessed_date
except Exception as exc:
logger.warning(f'Impossible de deviner la date du terme {string} : {exc}')
return datetime.datetime(9999, 1, 1)
def get_aware_datetime(unknown_datetime):
"""
Retourne un objet datetime avisé.
datetime - L'objet datetime à aviser. Utilise le fuseau horaire du système si datetime est naïf.
"""
if unknown_datetime.tzinfo is not None and unknown_datetime.tzinfo.utcoffset(unknown_datetime) is not None:
return unknown_datetime
else:
return unknown_datetime.replace(tzinfo=datetime.datetime.now(datetime.timezone.utc).astimezone().tzinfo)