Skip to content
Extraits de code Groupes Projets

Comparer les révisions

Les modifications sont affichées comme si la révision source était fusionnée avec la révision cible. En savoir plus sur la comparaison des révisions.

Source

Sélectionner le projet cible
No results found

Cible

Sélectionner le projet cible
  • la-quadrature-du-net/Attrap
  • foggyfrog/Attrap
  • skhwiz/Attrap
  • precambrien/Attrap
  • ketsapiwiq/Attrap
  • Joseki/Attrap
  • kr1p/attrap-pref-12
  • kr1p/attrap-pref-46
  • kr1p/attrap-pi
  • Guinness/Attrap
  • astroidgritty/attrap-pref-84
  • davinov/Attrap
  • maettellite/attrap-pref-01
  • m242/Attrap
  • multi/Attrap
  • mverdeil/Attrap
  • olpo/Attrap
17 résultats
Afficher les modifications
#!/usr/bin/env python3
import os
import argparse
import logging
import datetime
import dateparser
import importlib
from Attrap import Attrap
# Config
__KEYWORDS = os.getenv('KEYWORDS') or ''
__DATA_DIR_ROOT = os.path.dirname(os.path.abspath(__file__)) + '/data/'
__SMTP_HOSTNAME = os.getenv('SMTP_HOSTNAME') or 'localhost'
__SMTP_USERNAME = os.getenv('SMTP_USERNAME') or ''
__SMTP_PASSWORD = os.getenv('SMTP_PASSWORD') or ''
__EMAIL_FROM = os.getenv('EMAIL_FROM')
__EMAIL_TO = os.getenv('EMAIL_TO')
if os.getenv('SMTP_PORT'):
__SMTP_PORT = int(os.getenv('SMTP_PORT'))
else:
__SMTP_PORT = 587
if os.getenv('SMTP_STARTTLS'):
__SMTP_STARTTLS = True
else:
__SMTP_STARTTLS = False
if os.getenv('SMTP_SSL'):
__SMTP_SSL = True
else:
__SMTP_SSL = False
if os.getenv('NOT_BEFORE'):
try:
relative_date = dateparser.parse(os.getenv('NOT_BEFORE'))
__NOT_BEFORE = datetime.datetime(year=relative_date.year, month=relative_date.month, day=relative_date.day)
except Exception as exc:
__NOT_BEFORE = datetime.datetime.strptime(
os.getenv('NOT_BEFORE'), '%Y-%m-%d'
)
else:
__NOT_BEFORE = datetime.datetime(2024, 1, 1)
__MASTODON_ACCESS_TOKEN = os.getenv('MASTODON_ACCESS_TOKEN')
__MASTODON_INSTANCE = os.getenv('MASTODON_INSTANCE')
# Liste des administrations supportées
available_administrations = [
'ppparis',
'pref01',
'pref02',
'pref03',
'pref04',
'pref05',
'pref06',
'pref09',
'pref10',
'pref11',
'pref13',
'pref2a',
'pref2b',
'pref25',
'pref29',
'pref30',
'pref31',
'pref33',
'pref34',
'pref35',
'pref38',
'pref39',
'pref42',
'pref44',
'pref49',
'pref50',
'pref52',
'pref54',
'pref55',
'pref59',
'pref61',
'pref62',
'pref63',
'pref64',
'pref65',
'pref66',
'pref69',
'pref73',
'pref75',
'pref76',
'pref77',
'pref80',
'pref81',
'pref83',
'pref87',
'pref91',
'pref92',
'pref93',
'pref94',
'pref976',
'prefbretagne',
'prefidf',
'prefpaca'
]
# Début du script
parser = argparse.ArgumentParser(
prog='cli.py',
description='Télécharge les RAA d\'une administration donnée et recherche des mots-clés'
)
parser.add_argument(
'administration',
action='store',
help='identifiant de l\'administration',
choices=available_administrations
)
parser.add_argument(
'-k',
'--keywords',
action='store',
help='liste des termes recherchés, séparés par une virgule (aucun par défaut)'
)
parser.add_argument(
'--not-before',
action='store',
help='n\'analyse pas les RAA datant d\'avant la date indiquée, au format YYYY-MM-DD (par défaut : 2024-01-01)'
)
parser.add_argument(
'--smtp-hostname',
action='store',
help='nom d\'hôte SMTP (par défaut : localhost)'
)
parser.add_argument(
'--smtp-username',
action='store',
help='nom d\'utilisateur SMTP (par défaut : vide)'
)
parser.add_argument(
'--smtp-password',
action='store',
help='mot de passe SMTP (par défaut : vide)'
)
parser.add_argument(
'--smtp-port',
action='store',
help='port SMTP (par défaut : 587)'
)
parser.add_argument(
'--smtp-starttls',
action='store_true',
help='connexion SMTP avec STARTTLS'
)
parser.add_argument(
'--smtp-ssl',
action='store_true',
help='connexion SMTP avec SSL'
)
parser.add_argument(
'-f',
'--email-from',
action='store',
help='adresse de courrier électronique expéditrice des notifications'
)
parser.add_argument(
'-t',
'--email-to',
action='store',
help='adresses de courriers électroniques destinataires des notifications (séparées par une virgule)'
)
for administration in available_administrations:
parser.add_argument(
f'--{administration}-email-to',
action='store',
help=f'adresses de courrier électronique destinataires des notifications (séparées par une virgule) uniquement si l\'analyse concerne {administration} (s\'ajoute à celles précisées dans --email-to)'
)
parser.add_argument(
'--mastodon-access-token',
action='store',
help='jeton d\'accès pour publier sur Mastodon (par défaut : vide)'
)
parser.add_argument(
'--mastodon-instance',
action='store',
help='URL de l\'instance (doit inclure "http://" ou "https://" ; par défaut : vide)'
)
parser.add_argument(
'-v',
action='store_true',
help='relève le niveau de verbosité à INFO'
)
parser.add_argument(
'-vv',
action='store_true',
help='relève le niveau de verbosité à DEBUG'
)
args = parser.parse_args()
if (args.v or os.getenv('VERBOSE')) and not args.vv and not os.getenv('VVERBOSE'):
logging.basicConfig(level=logging.INFO)
logging.getLogger("stem").setLevel(logging.WARNING)
if args.vv or os.getenv('VVERBOSE'):
logging.basicConfig(level=logging.DEBUG)
logging.getLogger("stem").setLevel(logging.WARNING)
if args.keywords:
__KEYWORDS = args.keywords
if args.not_before:
try:
relative_date = dateparser.parse(args.not_before)
__NOT_BEFORE = datetime.datetime(year=relative_date.year, month=relative_date.month, day=relative_date.day)
except Exception as exc:
__NOT_BEFORE = datetime.datetime.strptime(args.not_before, '%Y-%m-%d')
if args.smtp_hostname:
__SMTP_HOSTNAME = args.smtp_hostname
if args.smtp_username:
__SMTP_USERNAME = args.smtp_username
if args.smtp_password:
__SMTP_PASSWORD = args.smtp_password
if args.smtp_port:
__SMTP_PORT = int(args.smtp_port)
if args.smtp_starttls:
__SMTP_STARTTLS = True
if args.smtp_ssl:
__SMTP_SSL = True
if args.email_from:
__EMAIL_FROM = args.email_from
if args.email_to:
__EMAIL_TO = args.email_to
if args.mastodon_access_token:
__MASTODON_ACCESS_TOKEN = args.mastodon_access_token
if args.mastodon_instance:
__MASTODON_INSTANCE = args.mastodon_instance
__DATA_DIR = f'{__DATA_DIR_ROOT}{args.administration}/'
# On calcule la liste des mails à notifier (liste générale EMAIL_TO + liste
# administration EMAIL_TO_ADMINISTRATION**)
__ADMINISTRATION_EMAIL_TO = ''
administration_var_name = f'{args.administration}_EMAIL_TO'.upper()
if os.getenv(administration_var_name):
__ADMINISTRATION_EMAIL_TO = os.getenv(administration_var_name)
else:
for arg in vars(args).items():
if arg[0] == f'{args.administration}_email_to':
__ADMINISTRATION_EMAIL_TO = arg[1]
if __ADMINISTRATION_EMAIL_TO and not __ADMINISTRATION_EMAIL_TO == '':
if __EMAIL_TO:
__EMAIL_TO = f'{__EMAIL_TO},{__ADMINISTRATION_EMAIL_TO}'
else:
__EMAIL_TO = __ADMINISTRATION_EMAIL_TO
module = importlib.import_module(f'Attrap_{args.administration}')
attrap = getattr(module, f'Attrap_{args.administration}')(__DATA_DIR)
attrap.not_before = __NOT_BEFORE
attrap.configure_mailer(__SMTP_HOSTNAME, __SMTP_USERNAME, __SMTP_PASSWORD, __SMTP_PORT, __SMTP_STARTTLS, __SMTP_SSL,
__EMAIL_FROM, __EMAIL_TO, f'[Attrap] [{attrap.full_name}] Nouveaux éléments trouvés')
attrap.configure_mastodon(__MASTODON_ACCESS_TOKEN, __MASTODON_INSTANCE, f'[{attrap.full_name}]', f'#{attrap.short_code}')
attrap.get_raa(__KEYWORDS)
#!/usr/bin/env bash
set -e
s3_key="${1}"
s3_secret="${2}"
s3_host="${3}"
s3_bucket="${4}"
dest="${5}"
root_path=$(dirname $(realpath "${BASH_SOURCE[0]}"))
administrations="ppparis
pref01
pref02
pref03
pref04
pref05
pref06
pref07
pref08
pref09
pref10
pref11
pref12
pref13
pref14
pref15
pref16
pref17
pref18
pref19
pref2a
pref2b
pref21
pref22
pref23
pref24
pref25
pref26
pref27
pref28
pref29
pref30
pref31
pref32
pref33
pref34
pref35
pref36
pref37
pref38
pref39
pref40
pref41
pref42
pref43
pref44
pref45
pref46
pref47
pref49
pref49
pref50
pref51
pref52
pref53
pref54
pref55
pref56
pref57
pref58
pref59
pref60
pref61
pref62
pref63
pref64
pref65
pref66
pref67
pref68
pref69
pref70
pref71
pref72
pref73
pref74
pref75
pref76
pref77
pref78
pref79
pref80
pref81
pref82
pref83
pref84
pref85
pref86
pref87
pref88
pref89
pref90
pref91
pref92
pref93
pref94
pref95
pref971
pref972
pref973
pref974
pref976
prefidf
prefpaca"
if test -z "$s3_key" || test -z "$s3_secret" || test -z "$s3_host" || test -z "$s3_bucket" || test -z "$dest"; then
echo "Usage: ${0} <s3_key> <s3_secret> <s3_host> <s3_bucket> <dest>"
exit 1
fi
for i in $administrations; do
${root_path}/download-from-s3.sh "${i}" "$s3_key" "$s3_secret" "$s3_host" "$s3_bucket" "$dest" || true
rm "${dest}/${i}.zip" || true
done
#!/usr/bin/env bash
set -e
pref="${1}"
s3_key="${2}"
s3_secret="${3}"
s3_host="${4}"
s3_bucket="${5}"
dest="${6}"
if test -z "$pref" || test -z "$s3_key" || test -z "$s3_secret" || test -z "$s3_host" || test -z "$s3_bucket" || test -z "$dest"; then
echo "Usage: ${0} <pref> <s3_key> <s3_secret> <s3_host> <s3_bucket> <dest>"
exit 1
fi
dest=$(realpath "${dest}")
mkdir -p "${dest}/"
cd "${dest}/"
file="${pref}.zip"
echo "Downloading ${pref}..."
ressource="/${s3_bucket}/${file}"
content_type="application/octet-stream"
date=$(date --utc -R)
signature=$(echo -en "GET\n\n${content_type}\n${date}\n${ressource}" | openssl sha1 -hmac "${s3_secret}" -binary | base64)
curl -X GET \
--silent \
-H "Date: ${date}" \
-H "Content-Type: ${content_type}" \
-H "Authorization: AWS ${s3_key}:${signature}" \
"${s3_host}${ressource}" \
-o "${file}"
unzip -o "${file}" > /dev/null
rm "${file}"
#!/usr/bin/env python3
import argparse
import json
import os
import re
from urllib.parse import unquote
import hashlib
parser = argparse.ArgumentParser(
prog='./misc/fix-pref75-prefidf-url.py',
description='Met à jour les URL des RAA de Paris et d\'Idf'
)
parser.add_argument(
'--data-dir',
action='store',
help='dossier de données (par défaut: data/)'
)
parser.add_argument(
'--dry-run',
action='store_true',
help='ne modifie aucun fichier, affiche seulement les modifications nécessaires (par défaut: false)'
)
args = parser.parse_args()
if args.data_dir:
data_dir = args.data_dir
else:
data_dir = 'data/'
dry_run = args.dry_run
if data_dir.startswith('/'):
data_dir = os.path.abspath(data_dir)
else:
data_dir = os.path.abspath(os.path.dirname(os.path.abspath(__file__)) + '/../' + data_dir)
for administration in os.listdir(data_dir):
# On ne cherche que les dossiers prefidf et pref75
if administration == 'prefidf' or administration == 'pref75':
administration_path = os.path.abspath(data_dir + '/' + administration + '/raa/')
for raa in os.listdir(administration_path):
if raa.endswith('.json'):
raa_id = re.sub('\\.json$', '', raa)
fixed = False
raa_path = os.path.abspath(administration_path + '/' + raa)
txt_path = re.sub('\\.json$', '.txt', raa_path)
raa_file_read = open(raa_path, 'r')
raa_json = json.load(raa_file_read)
raa_file_read.close()
url = raa_json.get('url')
if url.startswith('https://www.prefectures-regions.gouv.fr/ile-de-france/ile-de-france/ile-de-france/irecontenu/telechargement/'):
raa_json['url'] = url.replace('https://www.prefectures-regions.gouv.fr/ile-de-france/ile-de-france/ile-de-france/irecontenu/telechargement/', 'https://www.prefectures-regions.gouv.fr/ile-de-france/irecontenu/telechargement/')
fixed_raa_json = {}
for key in raa_json:
fixed_raa_json[key] = raa_json[key]
fixed_raa_id = hashlib.sha256(unquote(raa_json['url']).encode('utf-8')).hexdigest()
fixed_raa_path = raa_path.replace(raa_id, fixed_raa_id)
fixed_txt_path = txt_path.replace(raa_id, fixed_raa_id)
print(f'{raa_id} -> {fixed_raa_id}:')
print(f" {raa_json['url']}")
print('')
if not dry_run:
raa_file_write = open(fixed_raa_path, 'w')
raa_file_write.write(json.dumps(fixed_raa_json))
raa_file_write.close()
os.remove(raa_path)
os.rename(txt_path, fixed_txt_path)
#!/usr/bin/env python3
import argparse
import datetime
import io
import json
import os
import pytz
import sys
import re
import requests
sys.path.append(os.path.dirname(__file__) + '/../')
from Attrap_pref976 import Attrap_pref976
from Attrap import Attrap
from pypdf import PdfReader
tz_paris = pytz.timezone('Europe/Paris')
session = requests.Session()
def fix_raa_date_v0(raa_json):
try:
raa_json['date'] = tz_paris.localize(datetime.datetime.strptime(raa_json['date'], '%d/%m/%Y')).strftime('%Y-%m-%d')
return raa_json
except Exception:
print(f"\033[91m{exc=}, {type(exc)=}\033[0m")
sys.exit(1)
return raa_json
def fix_pdf_date_v0(raa_json, json_key):
if not raa_json[json_key]:
return raa_json
try:
# On tente de parser avec fuseau horaire
raa_json[json_key] = datetime.datetime.strptime(raa_json[json_key], '%d/%m/%Y %H:%M:%S%z').astimezone(pytz.utc).isoformat(timespec="seconds")
return raa_json
except Exception:
try:
# Sinon on tente de parser sans fuseau horaire et on retourne une date avec le fuseau de Paris
raa_json[json_key] = tz_paris.localize(datetime.datetime.strptime(raa_json[json_key], '%d/%m/%Y %H:%M:%S')).astimezone(pytz.utc).isoformat(timespec="seconds")
return raa_json
except Exception as exc:
print(f"\033[91m{exc=}, {type(exc)=}\033[0m")
sys.exit(1)
return raa_json
def fix_json(raa_json, raa_id, administration):
version = raa_json.get('version')
match version:
# Si le fichier de métadonnées n'a pas de version, il a été généré avant le 14/11/2024 et doit être corrigé
# v0 -> v1 : les dates sont au format YYYY-MM-DD et heure locale
# les heures sont au format YYYY-MM-DD HH:mm:ss±ZZ:ZZ et heure UTC
case None:
print(f"{raa_id}: v0 -> v1")
date = raa_json['date']
first_seen_on = raa_json['first_seen_on']
pdf_creation_date = raa_json['pdf_creation_date']
pdf_modification_date = raa_json['pdf_modification_date']
print(f"{administration}: {raa_json['name']} ({raa_id}):")
fixed_raa_json = fix_raa_date_v0(raa_json)
if not date == fixed_raa_json['date']:
print(f" date: {date} -> {fixed_raa_json['date']}")
fixed_raa_json = fix_pdf_date_v0(fixed_raa_json, 'first_seen_on')
if not raa_json == fixed_raa_json:
print(f" first_seen_on: {first_seen_on} -> {fixed_raa_json['first_seen_on']}")
fixed_raa_json = fix_pdf_date_v0(fixed_raa_json, 'pdf_creation_date')
if not pdf_creation_date == fixed_raa_json['pdf_creation_date']:
print(f" pdf_creation_date: {pdf_creation_date} -> {fixed_raa_json['pdf_creation_date']}")
fixed_raa_json = fix_pdf_date_v0(fixed_raa_json, 'pdf_modification_date')
if not pdf_modification_date == fixed_raa_json['pdf_modification_date']:
print(f" pdf_modification_date: {pdf_modification_date} -> {fixed_raa_json['pdf_modification_date']}")
ordered_fixed_raa_json = {'version': 1}
for key in fixed_raa_json:
ordered_fixed_raa_json[key] = fixed_raa_json[key]
fixed_raa_json = ordered_fixed_raa_json
return fix_json(fixed_raa_json, raa_id, administration)
# Si le fichier de métadonnées est en version 1, il ne contient pas le fuseau horaire de l'administration.
# v1 -> v2: ajout du fuseau horaire (Europe/Paris sauf Mayotte Indian/Mayotte)
case 1:
print(f"{raa_id}: v1 -> v2")
if administration == 'pref976':
print(' Téléchargement du RAA pour recalculer les dates avec le fuseau horaire de Mayotte')
# Il faut retélécharger le RAA pour vérifier que les heures sont dans le bon fuseau
session.headers.update({'User-Agent': Attrap_pref976.user_agent})
pdf_resource = session.get(raa_json['url'], timeout=(10, 120), stream=True)
pdf = io.BytesIO(pdf_resource.content)
reader = PdfReader(pdf)
pdf_metadata = reader.metadata
if pdf_metadata:
if pdf_metadata.creation_date:
pdf_creation_date = Attrap.get_aware_datetime(pdf_metadata.creation_date, timezone=Attrap_pref976.timezone)
raa_json['pdf_creation_date'] = pdf_creation_date.astimezone(pytz.utc).isoformat(timespec="seconds")
if pdf_metadata.modification_date:
pdf_modification_date = Attrap.get_aware_datetime(pdf_metadata.modification_date, timezone=Attrap_pref976.timezone)
raa_json['pdf_modification_date'] = pdf_modification_date.astimezone(pytz.utc).isoformat(timespec="seconds")
print(' Ajout du fuseau horaire')
if administration == 'pref976':
raa_json['timezone'] = Attrap_pref976.timezone
else:
raa_json['timezone'] = 'Europe/Paris'
raa_json['version'] = 2
return fix_json(raa_json, raa_id, administration)
case 2:
return raa_json
case _:
print(f'Version inconnue : {version}')
sys.exit(1)
parser = argparse.ArgumentParser(
prog='./misc/update_metadata_format.py',
description='Met à jour le format des fichiers de métadonnées'
)
parser.add_argument(
'--data-dir',
action='store',
help='dossier de données (par défaut: data/)'
)
parser.add_argument(
'--dry-run',
action='store_true',
help='ne modifie aucun fichier, affiche seulement les modifications nécessaires (par défaut: false)'
)
args = parser.parse_args()
if args.data_dir:
data_dir = args.data_dir
else:
data_dir = 'data/'
dry_run = args.dry_run
if data_dir.startswith('/'):
data_dir = os.path.abspath(data_dir)
else:
data_dir = os.path.abspath(os.path.dirname(os.path.abspath(__file__)) + '/../' + data_dir)
for administration in os.listdir(data_dir):
# On ne cherche que les dossiers ppparis et pref*
if administration.startswith('pref') or administration == 'ppparis':
administration_path = os.path.abspath(data_dir + '/' + administration + '/raa/')
for raa in os.listdir(administration_path):
if raa.endswith('.json'):
raa_id = re.sub('\\.json$', '', raa)
fixed = False
raa_path = os.path.abspath(administration_path + '/' + raa)
raa_file_read = open(raa_path, 'r')
raa_json = json.load(raa_file_read)
raa_file_read.close()
version = raa_json.get('version')
if version != 2:
fixed_raa_json = fix_json(raa_json, raa_id, administration)
if not dry_run:
raa_file_write = open(raa_path, 'w')
raa_file_write.write(json.dumps(fixed_raa_json))
raa_file_write.close()
else:
print(f'On ignore {administration}...')
#!/usr/bin/env bash
set -e
s3_key="${1}"
s3_secret="${2}"
s3_host="${3}"
s3_bucket="${4}"
data="${5}"
root_path=$(dirname $(realpath "${BASH_SOURCE[0]}"))
administrations="ppparis
pref01
pref02
pref03
pref04
pref05
pref06
pref07
pref08
pref09
pref10
pref11
pref12
pref13
pref14
pref15
pref16
pref17
pref18
pref19
pref2a
pref2b
pref21
pref22
pref23
pref24
pref25
pref26
pref27
pref28
pref29
pref30
pref31
pref32
pref33
pref34
pref35
pref36
pref37
pref38
pref39
pref40
pref41
pref42
pref43
pref44
pref45
pref46
pref47
pref49
pref49
pref50
pref51
pref52
pref53
pref54
pref55
pref56
pref57
pref58
pref59
pref60
pref61
pref62
pref63
pref64
pref65
pref66
pref67
pref68
pref69
pref70
pref71
pref72
pref73
pref74
pref75
pref76
pref77
pref78
pref79
pref80
pref81
pref82
pref83
pref84
pref85
pref86
pref87
pref88
pref89
pref90
pref91
pref92
pref93
pref94
pref95
pref971
pref972
pref973
pref974
pref976
prefidf
prefpaca"
if test -z "$s3_key" || test -z "$s3_secret" || test -z "$s3_host" || test -z "$s3_bucket" || test -z "$data"; then
echo "Usage: ${0} <s3_key> <s3_secret> <s3_host> <s3_bucket> <data>"
exit 1
fi
for i in $administrations; do
${root_path}/upload-to-s3.sh "${i}" "$s3_key" "$s3_secret" "$s3_host" "$s3_bucket" "$data" || true
done
#!/usr/bin/env bash
set -e
pref="${1}"
s3_key="${2}"
s3_secret="${3}"
s3_host="${4}"
s3_bucket="${5}"
data="${6}"
if test -z "$pref" || test -z "$s3_key" || test -z "$s3_secret" || test -z "$s3_host" || test -z "$s3_bucket" || test -z "$data"; then
echo "Usage: ${0} <pref> <s3_key> <s3_secret> <s3_host> <s3_bucket> <data>"
exit 1
fi
data=$(realpath "${data}")
cd "${data}/${pref}/raa"
cd "../../"
file="${pref}.zip"
find . -name "${file}" -type f -delete
zip "${file}" "${pref}/raa/"*.txt "${pref}/raa/"*.json > /dev/null
echo "Uploading ${file}..."
ressource="/${s3_bucket}/${file}"
content_type=$(file --mime-type "${file}")
date=$(date --utc -R)
signature=$(echo -en "PUT\n\n${content_type}\n${date}\n${ressource}" | openssl sha1 -hmac "${s3_secret}" -binary | base64)
curl -X PUT \
-T "${file}" \
-H "Date: ${date}" \
-H "Content-Type: ${content_type}" \
-H "Authorization: AWS ${s3_key}:${signature}" \
"${s3_host}${ressource}"
rm "${file}"
#!/usr/bin/env python3
import os, sys, time, re
import subprocess
from bs4 import BeautifulSoup
import argparse
from urllib.parse import unquote
import logging
from selenium import webdriver
from pyvirtualdisplay import Display
from pdfminer.high_level import extract_text
# Config
__RAA_PAGE = 'https://www.prefecturedepolice.interieur.gouv.fr/actualites-et-presse/arretes/accueil-arretes'
__USER_AGENT = 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36'
__headless_mode = True
__WAITING_TIME = int(os.getenv('WAITING_TIME') or 5)
__LIST = os.getenv('LIST') or 'vidéoprotection,caméras,captation,aéronef'
__DATA_DIR = os.path.dirname(os.path.abspath(__file__))+'/data/ppparis/'
# Fonctions
def print_output(data):
print(data)
data = data.replace('\033[92m', '')
data = data.replace('\033[0m', '')
data = data.replace('\033[1m', '')
f = open(os.path.dirname(os.path.abspath(__file__))+'/output.log','a')
f.write(data+"\n")
f.close()
def get_html(url):
browser.get(url)
time.sleep(int(__WAITING_TIME*10))
page_content = browser.page_source
return page_content
def download_file(url):
browser.get(url)
time.sleep(__WAITING_TIME)
def parse_pdf(filename, name, date):
if not os.path.isfile(__DATA_DIR+filename):
logging.warning(f'ATTENTION: le fichier {filename} n\'existe pas')
else:
text = extract_text(__DATA_DIR+filename)
found = False
for keyword in __LIST.split(','):
if re.search(keyword, text, re.IGNORECASE|re.MULTILINE):
if not found:
print_output(f'\033[92m{name}\033[0m ({date})')
found = True
print_output(f' Le terme \033[1m{keyword}\033[0m a été trouvé.')
# Remplace le PDF par un fichier vide, afin de savoir la prochaine fois qu'il a déjà été analysé
f = open(__DATA_DIR+filename,'w')
f.write('')
f.close()
if found:
print_output('')
# Début du script
parser = argparse.ArgumentParser(prog='ppparis.py', description='Télécharge les RAA de la Préfecture de police de Paris et recherche des mots-clés')
parser.add_argument('-n', '--noheadless', action='store_true', help='ne lance pas le navigateur en mode headless (pratique pour débugguer ou en dehors d\'une CI)')
parser.add_argument('-w', '--waiting-time', type=int, action='store', help='délai (en secondes) d\'attente de chargement d\'une page, la durée du premier chargement étant cette valeur multipliée par 10 (par défaut : 5)')
parser.add_argument('-l', '--list', action='store', help='liste des termes recherchés, séparés par une virgule (par défaut : vidéoprotection,caméras,captation,aéronef)')
parser.add_argument('-v', action='store_true', help='relève le niveau de verbosité à INFO')
parser.add_argument('-vv', action='store_true', help='relève le niveau de verbosité à DEBUG')
args = parser.parse_args()
if args.v or os.getenv('VERBOSE'):
logging.basicConfig(level=logging.INFO)
if args.vv or os.getenv('VVERBOSE'):
logging.basicConfig(level=logging.DEBUG)
if args.noheadless:
__headless_mode = False
if not __headless_mode:
logging.debug('Mode noheadless')
if args.waiting_time:
__WAITING_TIME = args.waiting_time
logging.debug(f'WAITING_TIME: {__WAITING_TIME}')
if args.list:
__LIST = args.list
logging.info(f'Termes recherchés: {__LIST}')
# On crée le dossier de téléchargement
os.makedirs(__DATA_DIR, exist_ok=True)
# On démarre le navigateur
webdriver_options = webdriver.ChromeOptions()
webdriver_options.add_argument("--no-sandbox")
webdriver_options.add_argument("--disable-extensions")
webdriver_options.add_argument("--disable-gpu")
webdriver_options.add_argument("--disable-dev-shm-usage")
webdriver_options.add_argument("--use_subprocess")
webdriver_options.add_argument("--disable-blink-features=AutomationControlled")
webdriver_options.add_argument(f"--user-agent={__USER_AGENT}")
prefs = {
"download.default_directory":__DATA_DIR,
"download.prompt_for_download":False,
"plugins.always_open_pdf_externally":True
}
webdriver_options.add_experimental_option("prefs",prefs)
if __headless_mode:
webdriver_options.add_argument("--headless")
webdriver_options.add_argument("--window-size=1024,768")
display = Display(visible=False, size=(1024, 768))
display.start()
else:
webdriver_options.add_argument("--start-maximized")
browser = webdriver.Chrome(options=webdriver_options)
# Téléchargement des RAA
page_content = get_html(__RAA_PAGE)
# On charge le parser
soup = BeautifulSoup(page_content, 'html.parser')
# Pour chaque balise a, on regarde si c'est un PDF, et si oui on le parse
for a in soup.find_all('a', href=True):
if a['href'].endswith('.pdf'):
if a['href'].startswith('/'):
url = 'https://www.prefecturedepolice.interieur.gouv.fr'+a['href']
else:
url = a['href']
name = a.find('span').get_text()
date = a.find('div', class_="field--type-datetime").get_text()
filename = unquote(url.split('/')[-1])
# Si le fichier n'a pas été téléchargé, on le télécharge et on le parse
if not os.path.isfile(__DATA_DIR+filename):
logging.info(f'Nouveau fichier : {name} ({date}). URL : {url}')
try:
download_file(url)
except Exception:
logging.warning('ATTENTION: Impossible de télécharger le fichier '+url)
cmd = ['ocrmypdf', '-l', 'eng+fra', '--output-type', 'pdfa', '--redo-ocr', __DATA_DIR+filename, __DATA_DIR+filename]
logging.debug(f'Lancement de ocrmypdf: {cmd}')
try:
output = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
except subprocess.CalledProcessError as exc:
if not exc.returncode == 6:
logging.warning('ATTENTION : Impossible d\'OCRiser le document', exc.returncode, exc.output)
parse_pdf(filename, name, date)
# On arrête le navigateur
browser.quit()
if __headless_mode:
display.stop()