#encoding: utf-8 """ Bot Wantzel from La Quadrature du Net. License : AGPLv3 Doc : https://wiki.laquadrature.net/Wantzel TODO: - Ajouter des commandes permettant de gérer le mediakit (moderators only) - Parser les urls et voir ce qu'on peut faire: - upload de vidéo - tag de vidéo - obtenir un lien vers une vidéo """ import feedparser import HTMLParser import importlib from irc import IrcClientFactory import MySQLdb import re import sqlite3 import time from twisted.internet import reactor from twitter import Twitter, OAuth import urllib import config from messages import messages LOG_FILE = "wantzel.log" DEBUG = 3 WARNING = 2 INFO = 1 ERROR = 0 LOG_LEVEL = DEBUG class Utils(object): @classmethod def log(cls, message): with open(LOG_FILE, 'a') as f: f.write("%s: %s\n" % (time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), message)) @classmethod def debug(cls, message): if LOG_LEVEL>=DEBUG: cls.log("%s: %s" % ("DEBUG", message)) @classmethod def warning(cls, message): if LOG_LEVEL>=WARNING: cls.log("%s: %s" % ("WARNING", message)) @classmethod def info(cls, message): if LOG_LEVEL>=INFO: cls.log("%s: %s" % ("INFO", message)) @classmethod def error(cls, message): if LOG_LEVEL>=ERROR: cls.log("%s: %s" % ("ERROR", message)) def get_cursor(): """ This function connects to a MySQL database and returns a usable cursor. """ connection = MySQLdb.connect( host=config.dbserver, user=config.dbuser, passwd=config.dbpassword, db=config.dbname ) if connection: return connection.cursor() return None def get_url(message): """ Retrieve the url in the message. """ # Let's get the url result = re.search("(https?[^ ]+)", message) if not result: return "http" url = result.group(1) # Removing anchor if needed result = re.search("^([^#]*)", url) if result: url = result.group(1) # Removing trackers url = re.sub("[?&](utm_medium|utm_source|utm_campaign|xtor)=[^&]*", "", url) return url def get_title(message): title = "" website = "" exceptions = [ "twitter.com", "github.com", ] try: url = get_url(message) for exception in exceptions: if exception in url: return ("", "") website = re.search("//([^/]*)", url).group(1) f = urllib.URLopener().open(url) content = f.read() title = re.search("([^<]+)", content).group(1) except: return ("", "") # Unescaping HTML entities and removing multiple lines title = HTMLParser.HTMLParser().unescape(re.sub("\n|\r", "", title)) return (title, website) def is_moderator(name): """ This function verify if a user is a moderator. """ connection = sqlite3.connect(config.sqlite_db) cursor = connection.cursor() cursor.execute("SELECT count(*) FROM moderator WHERE name=?", (name, )) if int(cursor.fetchone()[0])==1: return True return False class Wantzel(object): """ Wantzel bot. """ def __init__(self): """ Initialization of bot over IRC. """ self.number = 0 # default last_entry_published self.last_entry_published = time.strptime("2000-01-01", "%Y-%m-%d") # See if there is something in the db connection = sqlite3.connect(config.sqlite_db) for row in connection.execute("SELECT last_entry_published FROM tweets"): self.last_entry_published = time.strptime( row[0].encode("utf-8"), "%Y-%m-%d %H:%M:%S %Z" ) self.irc = IrcClientFactory(config) self.irc.set_privmsg = self.set_privmsg reactor.connectTCP(config.server, config.port, self.irc) # Prepare timer reactor.callLater(config.timer, self.timer) def timer(self): """ This method launches function regularly (see config.timer). """ Utils.debug("Timer called") self.rp_to_twitter("http://www.laquadrature.net/fr/revue-de-presse/feed") self.rp_to_twitter("http://www.laquadrature.net/en/press-review/feed") self.count_articles() # Recalling the timer reactor.callLater(config.timer, self.timer) def set_privmsg(self): """ This method set the methods to call for each callback received from IRC. """ self.irc.client.privmsg = self.on_privmsg def send_message(self, channel, messages): """ Sends a message on specified channel, cutting each line in a new message """ for message in messages.splitlines(): self.irc.client.msg(channel, message) def on_privmsg(self, user, channel, msg): """ Wantzel can understand a lot of commands. Commands followed by a (*) are accessible only to moderators: - help Returns a message about how to use the bot. If a command is passed after help, the message explains how to use the command. - rp(acp) Add an article in the database - stats Show some statistics about the RP - kill (*) Kill an article by giving it a score of -100 - moderate list (*) List rights in private - moderate add (*) Add a new moderator to list - moderate remove (*) Remove a moderator from list """ # Cleaning user name user = re.search("([^!]*)!", user).group(1) Utils.debug("Message received: %s %s %s" % (user, channel, msg)) # Whatever is done, get the title of an existing url in a message title = "" if "http" in msg: title, website = get_title(msg) # Never answer to botself if user!=config.nickname: # If it's a query, bot should answer to the user as the channel if "#" not in channel: channel = user # Help command, specific if "wantzel" in msg and ("help" in msg or "aide" in msg): self.help(user, channel, msg) # Find known command command = re.search("!(rp[acp]*|kill|help|stats|admin)", msg) Utils.debug("Command: %s" % command) if command: Utils.debug("group(0): %s" % command.group(0)) command = command.group(1) Utils.debug("Command: %s" % command) if command.startswith("rp"): Utils.debug("Calling self.rp") self.rp(command, user, channel, msg, title) elif command=="help": Utils.debug("Calling self.help") self.help(user, channel, msg) elif command=="kill": Utils.debug("Calling self.kill") self.kill(user, channel, msg) elif command=="stats": Utils.debug("Calling self.stats") self.stats(user, channel, msg) elif command=="admin": Utils.debug("Calling self.admin") self.admin(user, channel, msg) # No more giving the title of an url #if title and website: # self.send_message(channel, messages["title"] % (title, website)) def help(self, user, channel, msg): """ Show global help. If a known command is behind the !help command, an adequate message is returned. """ Utils.debug("help command") # Searching for a command after help keyword command = re.search("!help (stats|rp|help|kill|admin)", msg) if command: command = command.group(1) self.send_message(channel, messages["help_"+command]) else: self.send_message(channel, messages["help"]) def rp(self, command, user, channel, msg, title=""): """ Adding the article in rp database. """ Utils.debug("rp command : %s" % command) Utils.debug("rp user : %s" % user) Utils.debug("rp channel : %s" % channel) Utils.debug("rp msg : %s" % msg) Utils.debug("rp title : %s" % title) cite = 0 note = 0 url = get_url(msg) Utils.debug("url: %s" % url) if url=="": return elif url=="http": self.send_message(channel, messages["rp_http"] % user) return # Looking for such an article in database cursor = get_cursor() cursor.execute("SELECT id, note, provenance FROM presse WHERE url = %s", (url, )) rows = cursor.fetchall() if not rows: # LQdN is quoted if "c" in command: cite += 2 # the article speak about LQdN if command.count("p")>1: cite += 2 # Archive this article if "a" in command: note -= 2 Utils.debug("Adding an article by %s: %s" % (user, url)) result = cursor.execute( """INSERT INTO presse SET url=%s, provenance=%s, cite=%s, note=%s, datec=NOW(), title=%s, lang='', published=0, nid=0, screenshot=0, fetched=0, seemscite=0 """, (url, user, cite, note, title) ) self.send_message(channel, messages["rp_new_article"] % user) else: if rows[0][2]!=user: Utils.debug("Adding a point by %s on %s" % (user, rows[0][0])) result = cursor.execute( "UPDATE presse SET note=note+1 WHERE id=%s", (rows[0][0], ) ) if (rows[0][1]+1)<3: self.send_message(channel, messages["rp_known_article"] % user) else: self.send_message(channel, messages["rp_taken_article"] % user) # Update number of articles to do self.count_articles() def kill(self, user, channel, msg): """ Kill an article by setting its score to -100. """ Utils.debug("kill command") if is_moderator(user): url = get_url(msg) Utils.debug("url: %s" % url) if url=="": return elif url=="http": self.send_message(channel, messages["rp_http"] % user) return # Looking for such an article in database cursor = get_cursor() cursor.execute("SELECT id, note FROM presse WHERE url=%s", (url, )) rows = cursor.fetchall() if not rows: self.send_message(channel, messages["kill_none"] % url) else: cursor.execute("UPDATE presse SET note=-100 WHERE id=%s", (rows[0][0], )) self.send_message(channel, messages["kill_done"] % url) else: self.send_message(channel, messages["not_moderator"]) def stats(self, user, channel, msg): """ Returns stats on articles in press review. """ Utils.debug("stats command") cursor = get_cursor() periods = [1, 3, 7, 15] notes = [0, 3 ,4] notnull = 0 somethingatall = 0 result = "" for note in notes: notnull = 0 result = result + "note>=%s: " % note for period in periods: cursor.execute( """SELECT COUNT(id) AS cid FROM presse WHERE nid=0 AND datec>(NOW()-INTERVAL %s DAY) AND note>=%s""", (period, note) ) rows = cursor.fetchall() if rows[0][0]>0: result = result + "%sj:%s, " % (period, rows[0][0]) notnull = 1 somethingatall = 1 if notnull: result = result[:-2] + "\n" if somethingatall==0: result = messages["stats_bravo"] % periods[-1] self.send_message(channel, result) def admin(self, user, channel, msg): """ Manage moderation. A sub-command should be behind the !admin command. """ Utils.debug("admin command") # Searching for a command after admin keyword command = re.search("!admin (list|add|del)", msg) if command: command = command.group(1) if command=="list": self.admin_list(user, channel, msg) elif command=="add": self.admin_add(user, channel, msg) elif command=="del": self.admin_del(user, channel, msg) def admin_list(self, user, channel, msg): """ List actual moderators. """ Utils.debug("admin_list command") if is_moderator(user): connection = sqlite3.connect(config.sqlite_db) names = [] for row in connection.execute("SELECT name FROM moderator"): names.append(row[0].encode("utf-8")) self.send_message(channel, messages["admin_list"] % ", ".join(names)) else: self.send_message(channel, messages["not_moderator"]) def admin_add(self, user, channel, msg): """ Add some new moderators if not existing yet. """ Utils.debug("admin_add command") if is_moderator(user): try: names = [] connection = sqlite3.connect(config.sqlite_db) result = re.search("!admin add (([^,]+, ?)+)?(.*)", msg) if result.group(1): names = [name for name in result.group(1).split(", ") if name!=""] names.append(result.group(3)) # Do not add actual moderators moderators = [] for row in connection.execute("SELECT name FROM moderator"): moderators.append(row[0].encode("utf-8")) names = set([name for name in names if name not in moderators]) if names: # Converting set in list of tuples values = [(name,) for name in names] connection.executemany("INSERT INTO moderator (name) VALUES (?)", values) connection.commit() self.send_message(channel, messages["admin_add"] % ", ".join(names)) else: self.send_message(channel, messages["admin_add_empty"]) except: pass else: self.send_message(channel, messages["not_moderator"]) def admin_del(self, user, channel, msg): """ Delete a moderator from list. """ Utils.debug("admin_del command") if is_moderator(user): try: names = [] result = re.search("!admin del (([^,]+, ?)+)?(.*)", msg) if result.group(1): names = [name for name in result.group(1).split(", ") if name!=""] names.append(result.group(3)) names = set(names) Utils.debug(names) connection = sqlite3.connect(config.sqlite_db) for name in names: connection.execute("DELETE FROM moderator WHERE name=?", (name, )) connection.commit() self.send_message(channel, messages["admin_del"] % ", ".join(names)) except: pass else: self.send_message(channel, messages["not_moderator"]) def count_articles(self): """ Count number of articles not done in RP and updates the topic of the press review channel if needed. """ Utils.debug("count_articles method") cursor = get_cursor() cursor.execute("""SELECT COUNT(*) FROM presse WHERE DATE_SUB(NOW(), INTERVAL 2 MONTH) 2 AND nid = 0""") rows = cursor.fetchall() number = int(rows[0][0]) if self.number!=number: self.irc.client.topic("#lqdn-rp", messages["topic"] % number) pass self.number = number def rp_to_twitter(self, rss): """ By parsing the RSS feed of the press-review, we know what to tweet. """ Utils.debug("rp_to_twitter method") now = time.localtime() today = time.strptime("%s-%s-%s %s" % ( now.tm_year, now.tm_mon, now.tm_mday, time.tzname[0] ), "%Y-%m-%d %Z" ) language = "fr" if "/en/" in rss: language = "en" entries = feedparser.parse(rss)['entries'] entries.reverse() for entry in entries: # if date of publication is greater than today, midnight, and # lesser than future if today < entry.published_parsed < now: if self.last_entry_published < entry.published_parsed: self.tweet(messages["tweet_rp_%s" % language] % ( entry.title.encode("utf-8"), entry.link.encode("utf-8") )) Utils.debug(entry.published_parsed) Utils.debug(entry.title) # Save last_entry_published self.last_entry_published = entry.published_parsed last_entry_published = time.strftime( "%Y-%m-%d %H:%M:%S %Z", self.last_entry_published ) connection = sqlite3.connect(config.sqlite_db) connection.execute( "UPDATE tweets SET last_entry_published=?", (last_entry_published,) ) connection.commit() # Tweet only one message in order not to spam return def tweet(self, message): """ Tweet message on specified account """ Utils.debug("tweet method") auth = OAuth( config.TOKEN, config.TOKENSEC, config.CONSKEY, config.CONSSEC ) twitter = Twitter(auth=auth) try: Utils.debug("Tweeting: %s" % message) twitter.statuses.update(status=message) except: pass if __name__ == '__main__': wantzel = Wantzel() reactor.run()