#encoding: utf-8
"""
Bot Wantzel from La Quadrature du Net.
License : AGPLv3
Doc : https://wiki.laquadrature.net/Wantzel
TODO:
- Ajouter des commandes permettant de gérer le mediakit (moderators only)
- Parser les urls et voir ce qu'on peut faire:
- upload de vidéo
- tag de vidéo
- obtenir un lien vers une vidéo
"""
import feedparser
import HTMLParser
import importlib
from irc import IrcClientFactory
import MySQLdb
import re
import sqlite3
import time
from twisted.internet import reactor
from twitter import Twitter, OAuth
import urllib
import config
from messages import messages
LOG_FILE = "wantzel.log"
DEBUG = 3
WARNING = 2
INFO = 1
ERROR = 0
LOG_LEVEL = DEBUG
class Utils(object):
@classmethod
def log(cls, message):
with open(LOG_FILE, 'a') as f:
f.write("%s: %s\n" % (time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), message))
@classmethod
def debug(cls, message):
if LOG_LEVEL>=DEBUG:
cls.log("%s: %s" % ("DEBUG", message))
@classmethod
def warning(cls, message):
if LOG_LEVEL>=WARNING:
cls.log("%s: %s" % ("WARNING", message))
@classmethod
def info(cls, message):
if LOG_LEVEL>=INFO:
cls.log("%s: %s" % ("INFO", message))
@classmethod
def error(cls, message):
if LOG_LEVEL>=ERROR:
cls.log("%s: %s" % ("ERROR", message))
def get_cursor():
"""
This function connects to a MySQL database and returns a usable cursor.
"""
connection = MySQLdb.connect(
host=config.dbserver,
user=config.dbuser,
passwd=config.dbpassword,
db=config.dbname
)
if connection:
return connection.cursor()
return None
def get_url(message):
"""
Retrieve the url in the message.
"""
# Let's get the url
result = re.search("(https?[^ ]+)", message)
if not result:
return "http"
url = result.group(1)
# Removing anchor if needed
result = re.search("^([^#]*)", url)
if result:
url = result.group(1)
# Removing trackers
url = re.sub("[?&](utm_medium|utm_source|utm_campaign|xtor)=[^&]*", "", url)
return url
def get_title(message):
title = ""
website = ""
exceptions = [
"twitter.com",
"github.com",
]
try:
url = get_url(message)
for exception in exceptions:
if exception in url:
return ("", "")
website = re.search("//([^/]*)", url).group(1)
f = urllib.URLopener().open(url)
content = f.read()
title = re.search("
([^<]+)", content).group(1)
except:
pass
# Unescaping HTML entities and removing multiple lines
title = HTMLParser.HTMLParser().unescape(re.sub("\n|\r", "", title))
return (title, website)
def is_moderator(name):
"""
This function verify if a user is a moderator.
"""
connection = sqlite3.connect(config.sqlite_db)
cursor = connection.cursor()
cursor.execute("SELECT count(*) FROM moderator WHERE name=?", (name, ))
if int(cursor.fetchone()[0])==1:
return True
return False
class Wantzel(object):
"""
Wantzel bot.
"""
def __init__(self):
"""
Initialization of bot over IRC.
"""
self.number = 0
# default last_entry_published
self.last_entry_published = time.strptime("2000-01-01", "%Y-%m-%d")
# See if there is something in the db
connection = sqlite3.connect(config.sqlite_db)
for row in connection.execute("SELECT last_entry_published FROM tweets"):
self.last_entry_published = time.strptime(
row[0].encode("utf-8"),
"%Y-%m-%d %H:%M:%S %Z"
)
self.irc = IrcClientFactory(config)
self.irc.set_privmsg = self.set_privmsg
reactor.connectTCP(config.server, config.port, self.irc)
# Prepare timer
reactor.callLater(config.timer, self.timer)
def timer(self):
"""
This method launches function regularly (see config.timer).
"""
Utils.debug("Timer called")
self.rp_to_twitter("http://www.laquadrature.net/fr/revue-de-presse/feed")
self.rp_to_twitter("http://www.laquadrature.net/en/press-review/feed")
self.count_articles()
# Recalling the timer
reactor.callLater(config.timer, self.timer)
def set_privmsg(self):
"""
This method set the methods to call for each callback received from IRC.
"""
self.irc.client.privmsg = self.on_privmsg
def send_message(self, channel, messages):
"""
Sends a message on specified channel, cutting each line in a new message
"""
for message in messages.splitlines():
self.irc.client.msg(channel, message)
def on_privmsg(self, user, channel, msg):
"""
Wantzel can understand a lot of commands. Commands followed by a (*)
are accessible only to moderators:
- help
Returns a message about how to use the bot.
If a command is passed after help, the message explains how to use
the command.
- rp(acp)
Add an article in the database
- stats
Show some statistics about the RP
- kill (*)
Kill an article by giving it a score of -100
- moderate list (*)
List rights in private
- moderate add (*)
Add a new moderator to list
- moderate remove (*)
Remove a moderator from list
"""
# Cleaning user name
user = re.search("([^!]*)!", user).group(1)
Utils.debug("Message received: %s %s %s" % (user, channel, msg))
# Whatever is done, get the title of an existing url in a message
#title = ""
#if "http" in msg:
# title, website = get_title(msg)
# Never answer to botself
if user!=config.nickname:
# If it's a query, bot should answer to the user as the channel
if "#" not in channel:
channel = user
# Help command, specific
if "wantzel" in msg and ("help" in msg or "aide" in msg):
self.help(user, channel, msg)
# Find known command
command = re.search("!(rp[acp]*|kill|help|stats|admin)", msg)
if command:
command = command.group(1)
Utils.debug("Command: %s" % command)
if command.startswith("rp"):
Utils.debug("Calling self.rp")
self.rp(command, user, channel, msg, title)
elif command=="help":
Utils.debug("Calling self.help")
self.help(user, channel, msg)
elif command=="kill":
Utils.debug("Calling self.kill")
self.kill(user, channel, msg)
elif command=="stats":
Utils.debug("Calling self.stats")
self.stats(user, channel, msg)
elif command=="admin":
Utils.debug("Calling self.admin")
self.admin(user, channel, msg)
# No more giving the title of an url
#if title and website:
# self.send_message(channel, messages["title"] % (title, website))
def help(self, user, channel, msg):
"""
Show global help.
If a known command is behind the !help command, an adequate message is
returned.
"""
Utils.debug("help command")
# Searching for a command after help keyword
command = re.search("!help (stats|rp|help|kill|admin)", msg)
if command:
command = command.group(1)
self.send_message(channel, messages["help_"+command])
else:
self.send_message(channel, messages["help"])
def rp(self, command, user, channel, msg, title=""):
"""
Adding the article in rp database.
"""
Utils.debug("rp command : %s" % command)
Utils.debug("rp user : %s" % user)
Utils.debug("rp channel : %s" % channel)
Utils.debug("rp msg : %s" % msg)
Utils.debug("rp title : %s" % title)
cite = 0
note = 0
url = get_url(msg)
Utils.debug("url: %s" % url)
if url=="":
return
elif url=="http":
self.send_message(channel, messages["rp_http"] % user)
return
# Looking for such an article in database
cursor = get_cursor()
cursor.execute("SELECT id, note, provenance FROM presse WHERE url = %s", (url, ))
rows = cursor.fetchall()
if not rows:
# LQdN is quoted
if "c" in command:
cite += 2
# the article speak about LQdN
if command.count("p")>1:
cite += 2
# Archive this article
if "a" in command:
note -= 2
Utils.debug("Adding an article by %s: %s" % (user, url))
result = cursor.execute(
"""INSERT INTO presse SET
url=%s, provenance=%s, cite=%s, note=%s, datec=NOW(), title=%s,
lang='', published=0, nid=0, screenshot=0, fetched=0, seemscite=0
""",
(url, user, cite, note, title)
)
self.send_message(channel, messages["rp_new_article"] % user)
else:
if rows[0][2]!=user:
Utils.debug("Adding a point by %s on %s" % (user, rows[0][0]))
result = cursor.execute(
"UPDATE presse SET note=note+1 WHERE id=%s",
(rows[0][0], )
)
if (rows[0][1]+1)<3:
self.send_message(channel, messages["rp_known_article"] % user)
else:
self.send_message(channel, messages["rp_taken_article"] % user)
# Update number of articles to do
self.count_articles()
def kill(self, user, channel, msg):
"""
Kill an article by setting its score to -100.
"""
Utils.debug("kill command")
if is_moderator(user):
url = get_url(msg)
Utils.debug("url: %s" % url)
if url=="":
return
elif url=="http":
self.send_message(channel, messages["rp_http"] % user)
return
# Looking for such an article in database
cursor = get_cursor()
cursor.execute("SELECT id, note FROM presse WHERE url=%s", (url, ))
rows = cursor.fetchall()
if not rows:
self.send_message(channel, messages["kill_none"] % url)
else:
cursor.execute("UPDATE presse SET note=-100 WHERE id=%s", (rows[0][0], ))
self.send_message(channel, messages["kill_done"] % url)
else:
self.send_message(channel, messages["not_moderator"])
def stats(self, user, channel, msg):
"""
Returns stats on articles in press review.
"""
Utils.debug("stats command")
cursor = get_cursor()
periods = [1, 3, 7, 15]
notes = [0, 3 ,4]
notnull = 0
somethingatall = 0
result = ""
for note in notes:
notnull = 0
result = result + "note>=%s: " % note
for period in periods:
cursor.execute(
"""SELECT COUNT(id) AS cid FROM presse
WHERE nid=0
AND datec>(NOW()-INTERVAL %s DAY)
AND note>=%s""",
(period, note)
)
rows = cursor.fetchall()
if rows[0][0]>0:
result = result + "%sj:%s, " % (period, rows[0][0])
notnull = 1
somethingatall = 1
if notnull:
result = result[:-2] + "\n"
if somethingatall==0:
result = messages["stats_bravo"] % periods[-1]
self.send_message(channel, result)
def admin(self, user, channel, msg):
"""
Manage moderation.
A sub-command should be behind the !admin command.
"""
Utils.debug("admin command")
# Searching for a command after admin keyword
command = re.search("!admin (list|add|del)", msg)
if command:
command = command.group(1)
if command=="list":
self.admin_list(user, channel, msg)
elif command=="add":
self.admin_add(user, channel, msg)
elif command=="del":
self.admin_del(user, channel, msg)
def admin_list(self, user, channel, msg):
"""
List actual moderators.
"""
Utils.debug("admin_list command")
if is_moderator(user):
connection = sqlite3.connect(config.sqlite_db)
names = []
for row in connection.execute("SELECT name FROM moderator"):
names.append(row[0].encode("utf-8"))
self.send_message(channel, messages["admin_list"] % ", ".join(names))
else:
self.send_message(channel, messages["not_moderator"])
def admin_add(self, user, channel, msg):
"""
Add some new moderators if not existing yet.
"""
Utils.debug("admin_add command")
if is_moderator(user):
try:
names = []
connection = sqlite3.connect(config.sqlite_db)
result = re.search("!admin add (([^,]+, ?)+)?(.*)", msg)
if result.group(1):
names = [name for name in result.group(1).split(", ") if name!=""]
names.append(result.group(3))
# Do not add actual moderators
moderators = []
for row in connection.execute("SELECT name FROM moderator"):
moderators.append(row[0].encode("utf-8"))
names = set([name for name in names if name not in moderators])
if names:
# Converting set in list of tuples
values = [(name,) for name in names]
connection.executemany("INSERT INTO moderator (name) VALUES (?)", values)
connection.commit()
self.send_message(channel, messages["admin_add"] % ", ".join(names))
else:
self.send_message(channel, messages["admin_add_empty"])
except:
pass
else:
self.send_message(channel, messages["not_moderator"])
def admin_del(self, user, channel, msg):
"""
Delete a moderator from list.
"""
Utils.debug("admin_del command")
if is_moderator(user):
try:
names = []
result = re.search("!admin del (([^,]+, ?)+)?(.*)", msg)
if result.group(1):
names = [name for name in result.group(1).split(", ") if name!=""]
names.append(result.group(3))
names = set(names)
Utils.debug(names)
connection = sqlite3.connect(config.sqlite_db)
for name in names:
connection.execute("DELETE FROM moderator WHERE name=?", (name, ))
connection.commit()
self.send_message(channel, messages["admin_del"] % ", ".join(names))
except:
pass
else:
self.send_message(channel, messages["not_moderator"])
def count_articles(self):
"""
Count number of articles not done in RP and updates the topic of the
press review channel if needed.
"""
Utils.debug("count_articles method")
cursor = get_cursor()
cursor.execute("""SELECT COUNT(*) FROM presse
WHERE DATE_SUB(NOW(), INTERVAL 2 MONTH) 2
AND nid = 0""")
rows = cursor.fetchall()
number = int(rows[0][0])
if self.number!=number:
self.irc.client.topic("#lqdn-rp", messages["topic"] % number)
pass
self.number = number
def rp_to_twitter(self, rss):
"""
By parsing the RSS feed of the press-review, we know what to tweet.
"""
Utils.debug("rp_to_twitter method")
now = time.localtime()
today = time.strptime("%s-%s-%s %s" % (
now.tm_year,
now.tm_mon,
now.tm_mday,
time.tzname[0]
),
"%Y-%m-%d %Z"
)
language = "fr"
if "/en/" in rss:
language = "en"
entries = feedparser.parse(rss)['entries']
entries.reverse()
for entry in entries:
# if date of publication is greater than today, midnight, and
# lesser than future
if today < entry.published_parsed < now:
if self.last_entry_published < entry.published_parsed:
self.tweet(messages["tweet_rp_%s" % language] % (
entry.title.encode("utf-8"),
entry.link.encode("utf-8")
))
Utils.debug(entry.published_parsed)
Utils.debug(entry.title)
# Save last_entry_published
self.last_entry_published = entry.published_parsed
last_entry_published = time.strftime(
"%Y-%m-%d %H:%M:%S %Z",
self.last_entry_published
)
connection = sqlite3.connect(config.sqlite_db)
connection.execute(
"UPDATE tweets SET last_entry_published=?",
(last_entry_published,)
)
connection.commit()
# Tweet only one message in order not to spam
return
def tweet(self, message):
"""
Tweet message on specified account
"""
Utils.debug("tweet method")
auth = OAuth(
config.TOKEN,
config.TOKENSEC,
config.CONSKEY,
config.CONSSEC
)
twitter = Twitter(auth=auth)
try:
Utils.debug("Tweeting: %s" % message)
twitter.statuses.update(status=message)
except:
pass
if __name__ == '__main__':
wantzel = Wantzel()
reactor.run()