Commit 6582ec6d authored by Mindiell's avatar Mindiell

All specific functions have been moved into specific modules. Code is clearer now.

parent b4eac9cc
#encoding: utf-8
"""
Monitoring methods.
"""
import re
import sqlite3
import config
from logs import Log
from messages import messages
from utils import get_cursor, get_url, is_moderator
class Admin():
def admin(self, user, msg):
"""
Manage moderation.
A sub-command should be behind the !~admin command.
"""
Log.debug("admin command")
# Searching for a command after admin keyword
command = re.search("[~!]admin (list|add|del|timer)", msg)
if command:
command = command.group(1)
if command == "list":
return self.admin_list(user)
elif command == "add":
return self.admin_add(user, msg)
elif command == "del":
return self.admin_del(user, msg)
elif command == "timer":
return self.admin_timer(user)
def admin_list(self, user):
"""
List actual moderators.
"""
Log.debug("admin_list command")
if is_moderator(user):
connection = sqlite3.connect(config.sqlite_db)
names = []
for row in connection.execute("SELECT name FROM moderator"):
names.append(row[0].encode("utf-8"))
return messages["admin_list"] % ", ".join(sorted(names))
else:
return messages["not_moderator"]
def admin_add(self, user, msg):
"""
Add some new moderators if not existing yet.
"""
Log.debug("admin_add command")
if is_moderator(user):
try:
names = []
connection = sqlite3.connect(config.sqlite_db)
result = re.search("[!~]admin add (([^,]+, ?)+)?(.*)", msg)
if result.group(1):
names = [name.strip() for name in result.group(1).split(",") if name.strip() != ""]
names.append(result.group(3))
# Do not add actual moderators
moderators = []
for row in connection.execute("SELECT name FROM moderator"):
moderators.append(row[0].encode("utf-8"))
names = list(set([name for name in names if name not in moderators]))
if names:
# Converting set in list of tuples
values = [(name,) for name in names]
connection.executemany("INSERT INTO moderator (name) VALUES (?)", values)
connection.commit()
return messages["admin_add"] % ", ".join(names)
else:
return messages["admin_add_empty"]
except Exception:
return ""
else:
return messages["not_moderator"]
def admin_del(self, user, msg):
"""
Delete a moderator from list.
"""
Log.debug("admin_del command")
if is_moderator(user):
try:
names = []
result = re.search("[!~]admin del (([^,]+, ?)+)?(.*)", msg)
if result.group(1):
names = [name.strip() for name in result.group(1).split(",") if name.strip() != ""]
names.append(result.group(3))
names = list(set(names))
Log.debug(names)
connection = sqlite3.connect(config.sqlite_db)
for name in names:
connection.execute("DELETE FROM moderator WHERE name=?", (name, ))
connection.commit()
return messages["admin_del"] % ", ".join(names)
except Exception:
return ""
else:
return messages["not_moderator"]
def admin_timer(self, user):
"""
Relaunch a timer.
"""
Log.debug("admin_timer command")
if is_moderator(user):
return "REACTOR"
else:
return messages["not_moderator"]
#encoding: utf-8
"""
Fun commands.
"""
from random import choice
def fun(user, channel, message):
"""
This function is there to add some fun commands which could be not used easily.
"""
# Specific answer to Deltree, the animal's joke
if user.lower()=="deltree" and msg=="\_o< ~ Coin ~ >o_/":
animal = choice([
{
"left": """><((('>""",
"right": """<')))><""",
"sound": "blub",
},
{
"left": """=^..^=""",
"right": """=^..^=""",
"sound": "meow",
},
{
"left": """ˁ˚ᴥ˚ˀ""",
"right": """ˁ˚ᴥ˚ˀ""",
"sound": "wouf",
},
{
"left": """\_o<""",
"right": """>o_/""",
"sound": "coin",
},
{
"left": """^(*(oo)*)^""",
"right": """^(*(oo)*)^""",
"sound": "grouïk",
},
{
"left": """~~(__^·>""",
"right": """<·^__)~~""",
"sound": "yiik",
},
])
return messages["coin_deltree"] % (
animal["left"],
animal["sound"],
animal["sound"],
animal["right"],
)
return ""
#encoding: utf-8
"""
Utilities classes.
"""
import time
import config
class Log(object):
"""
Simple utility class to log easily.
"""
@classmethod
def log(cls, message):
"""
Logging message with timestamp.
"""
actual_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
with open(config.LOG_FILE, 'a') as file_handle:
try:
file_handle.write("%s: %s\n" % (actual_time, message.encode("utf-8")))
except UnicodeDecodeError:
file_handle.write("%s: %s\n" % (actual_time, message))
@classmethod
def debug(cls, message):
"""
Manage DEBUG level of logging.
"""
if config.LOG_LEVEL >= config.DEBUG:
cls.log("%s: %s" % ("DEBUG", message))
@classmethod
def warning(cls, message):
"""
Manage WARNING level of logging.
"""
if config.LOG_LEVEL >= config.WARNING:
cls.log("%s: %s" % ("WARNING", message))
@classmethod
def info(cls, message):
"""
Manage INFO level of logging.
"""
if config.LOG_LEVEL >= config.INFO:
cls.log("%s: %s" % ("INFO", message))
@classmethod
def error(cls, message):
"""
Manage ERROR level of logging.
"""
if config.LOG_LEVEL >= config.ERROR:
cls.log("%s: %s" % ("ERROR", message))
......@@ -9,7 +9,7 @@ messages = {
"""Bonjour, je suis le bot de la Quadrature du Net, vous pouvez me demander de l'aide si besoin. (wantzel help)""",
"help":
"""Mes commandes sont : ~help ~rp(cpam) ~status ~kill ~stats et ~admin.
"""Mes commandes sont : ~help ~rp(cpa) ~status ~kill ~stats et ~admin.
Pour plus d'informations, voir ici: https://wiki.laquadrature.net/Wantzel
Pour obtenir de l'aide sur une commande en particulier, il suffit de taper ~help <commande>""",
......@@ -19,7 +19,7 @@ messages = {
"help_rp":
"""Cette commande sert à ajouter un article à la Revue de Presse (https://wiki.laquadrature.net/Revue_de_presse)
L'utilisation se fait sous la forme: ~rp <url de l'article à ajouter>""",
L'utilisation se fait sous la forme: ~rp(cpa) <url de l'article à ajouter>""",
"help_status":
"""Cette commande sert à retrouver les informations concernant un article ajouté à la Revue de Presse (https://wiki.laquadrature.net/Revue_de_presse)
......
#encoding: utf-8
"""
Monitoring methods.
"""
import feedparser
import sqlite3
import time
import config
from logs import Log
from messages import messages
class Wiki():
def __init__(self, name, url):
# wiki's name
self.name = name
# base url
self.url = url
# default last_entry_published
self.last_entry_updated = time.strptime("2000-01-01", "%Y-%m-%d")
# See if there is a later last_entry_published for wiki
connection = sqlite3.connect(config.sqlite_db)
for row in connection.execute(
"SELECT last_entry_updated FROM wikis WHERE name=?",
(self.name,)
):
self.last_entry_updated = time.strptime(
row[0].encode("utf-8"),
"%Y-%m-%d %H:%M:%S %Z"
)
Log.debug("Dernière mise à jour du wiki: %s" % self.last_entry_updated)
def set_last_entry_updated(self, last_entry_updated):
self.last_entry_updated = last_entry_updated
last_entry_updated = time.strftime(
"%Y-%m-%d %H:%M:%S %Z",
self.last_entry_updated
)
connection = sqlite3.connect(config.sqlite_db)
connection.execute(
"UPDATE wikis SET last_entry_updated=? WHERE name=?",
(last_entry_updated, self.name)
)
connection.commit()
class Monitor():
def __init__(self):
# List of wikis to monitor
self.wikis = []
for wiki in config.wikis["mediawiki"]:
self.wikis.append(Wiki(wiki["name"], wiki["url"]))
def wiki_updates(self):
"""
This method loops over each wiki to monitor.
"""
messages = []
for wiki in self.wikis:
url = wiki.url + "api.php?days=1&limit=50&translations=filter&action=feedrecentchanges&feedformat=atom"
now = time.localtime()
today = time.strptime("%s-%s-%s %s" % (
now.tm_year,
now.tm_mon,
now.tm_mday,
time.tzname[0]
), "%Y-%m-%d %Z")
entries = feedparser.parse(url)['entries']
for entry in entries:
# if date of update is greater than today midnight
if today < entry.updated_parsed:
if wiki.last_entry_updated < entry.updated_parsed:
# Ecriture de la mise à jour sur le canal de travail
messages.append(messages["wiki_update"] % (
entry.author.encode("utf-8"),
entry.title.encode("utf-8"),
entry.link.encode("utf-8"),
))
# Save last_entry_published
wiki.set_last_entry_updated(entry.updated_parsed)
return messages
#encoding: utf-8
"""
Channel operator methods.
"""
import re
import config
from logs import Log
from messages import messages
class Op():
def __init__(self):
# Sequence for op_mode verification (fibonacci)
self.op_sequence = [1, 2, 3, 5, 8, 13, 21, 34, 55]
self.op_offset = 0
self.op_counter = 0
def need_op_mode(self, me, params):
"""
Send a message on channel RP_CHANNEL to beg an op mode to each actual operators.
params is an array with :
- params[0]:
- params[1]:
- params[2]: the channel
- params[3]: the list of all users on the channel
"""
Log.debug("Names : %s" % params)
message = ""
if params[2]==config.RP_CHANNEL:
ops = [user[1:] for user in params[3].split() if user[0]=="@"]
if me not in params[3]:
# Testing based on fibonacci sequence
self.op_counter += 1
Log.debug("op_counter : %s" % self.op_counter)
Log.debug("op_offset : %s" % self.op_offset)
Log.debug("op_sequence : %s" % self.op_sequence[self.op_offset])
if self.op_counter>self.op_sequence[self.op_offset]:
message = messages["please_op"] % ", ".join(ops)
# Then reset op_counter
self.op_counter = 0
# And move the sequence further in order not to spam channel
if self.op_offset<len(self.op_sequence)-1:
self.op_offset += 1
return message
def get_op_mode(self, user, flag_set):
# Cleaning user name
user = re.search("([^!]*)!", user).group(1)
if flag_set:
# reset counter and sequence
self.op_counter = 0
self.op_offset = 0
# thanks to user
return messages["oped"] % user
else:
# bad user ;o(
return messages["deoped"] % user
This diff is collapsed.
# encoding: utf-8
"""
Utility functions for Wantzel bot.
"""
import MySQLdb
import re
import sqlite3
import config
from logs import Log
def get_cursor():
"""
This function connects to a MySQL database and returns a usable cursor.
"""
connection = MySQLdb.connect(
host=config.dbserver,
user=config.dbuser,
passwd=config.dbpassword,
db=config.dbname
)
if connection:
return connection.cursor()
return None
def get_url(message):
"""
Retrieve the url in the message.
"""
# Let's get the url
result = re.search("(https?[^ ]+)", message)
if not result:
return
url = result.group(1)
# Removing anchor if needed
result = re.search("^([^#]*)", url)
if result:
url = result.group(1)
# Removing trackers
url = re.sub("[?&](utm_medium|utm_source|utm_campaign|xtor)=[^&]*", "", url)
return url
def is_moderator(name):
"""
This function verify if a user is a moderator.
"""
connection = sqlite3.connect(config.sqlite_db)
cursor = connection.cursor()
cursor.execute("SELECT count(*) FROM moderator WHERE name=?", (name, ))
if int(cursor.fetchone()[0]) == 1:
return True
return False
def tweet(message):
"""
Tweet message on specified account
"""
Log.debug("tweet method")
auth = OAuth(
config.TOKEN,
config.TOKENSEC,
config.CONSKEY,
config.CONSSEC
)
twitter = Twitter(auth=auth)
try:
Log.debug("Tweeting: %s" % message)
twitter.statuses.update(status=message)
except Exception as e:
Log.error("Erreur lors du tweet : " + str(e))
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment