# encoding: utf-8
"""
Bot Wantzel from La Quadrature du Net.

License : AGPLv3
Doc     : https://wiki.laquadrature.net/Wantzel

TODO:
    - Add commands to manage the mediakit (moderators only)
    - Parse urls and see what can be done with them:
        - video upload
        - video tagging
        - getting a link to a video
"""
from contextlib import closing
import re
import sqlite3
import time

import feedparser
from irc import IrcClientFactory
import MySQLdb
from twisted.internet import reactor
from twitter import Twitter, OAuth

import config
from messages import messages

LOG_FILE = "wantzel.log"

# Log levels: a message is written when LOG_LEVEL >= its level.
DEBUG = 3
WARNING = 2
INFO = 1
ERROR = 0
LOG_LEVEL = DEBUG

# Query-string parameters stripped from submitted urls.
_TRACKERS = "(?:utm_medium|utm_source|utm_campaign|xtor)"
# A tracker that follows another parameter: drop it with its "&".
_TRACKER_AFTER_AMP = re.compile("&" + _TRACKERS + "=[^&]*")
# A tracker that opens the query string; group(1) tells whether other
# parameters follow it.
_TRACKER_AFTER_MARK = re.compile(r"\?" + _TRACKERS + "=[^&]*(&|$)")


class Utils(object):
    """
    Simple utility class to log easily.
    """

    @classmethod
    def log(cls, message):
        """
        Append `message` to LOG_FILE, prefixed with a local timestamp.

        The message is first written utf-8 encoded; if that fails, an error
        line is written and the raw message is tried once more.
        """
        actual_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
        with open(LOG_FILE, 'a') as file_handle:
            try:
                file_handle.write(
                    "%s: %s\n" % (actual_time, message.encode("utf-8"))
                )
            except Exception:
                file_handle.write("%s: Erreur de log\n" % actual_time)
                try:
                    file_handle.write("%s: %s\n" % (actual_time, message))
                except Exception:
                    file_handle.write(
                        "%s: Seconde erreur de log\n" % actual_time
                    )

    @classmethod
    def debug(cls, message):
        """
        Log `message` at DEBUG level.
        """
        if LOG_LEVEL >= DEBUG:
            cls.log("%s: %s" % ("DEBUG", message))

    @classmethod
    def warning(cls, message):
        """
        Log `message` at WARNING level.
        """
        if LOG_LEVEL >= WARNING:
            cls.log("%s: %s" % ("WARNING", message))

    @classmethod
    def info(cls, message):
        """
        Log `message` at INFO level.
        """
        if LOG_LEVEL >= INFO:
            cls.log("%s: %s" % ("INFO", message))

    @classmethod
    def error(cls, message):
        """
        Log `message` at ERROR level.
        """
        if LOG_LEVEL >= ERROR:
            cls.log("%s: %s" % ("ERROR", message))


def get_cursor():
    """
    Connect to the MySQL database described in `config` and return a cursor.

    Returns None when the connection object is falsy.
    """
    connection = MySQLdb.connect(
        host=config.dbserver,
        user=config.dbuser,
        passwd=config.dbpassword,
        db=config.dbname
    )
    if connection:
        # None of the callers commits, so let every statement commit itself;
        # otherwise the INSERT/UPDATE issued by rp/kill would be lost on a
        # transactional storage engine.
        connection.autocommit(True)
        # NOTE(review): the connection is never closed explicitly; callers
        # only ever see the cursor.
        return connection.cursor()
    return None


def get_url(message):
    """
    Retrieve the first url found in `message`, without anchor nor trackers.

    Returns the literal string "http" when the message contains no url;
    callers use that value as a "no url" marker.
    """
    result = re.search("(https?[^ ]+)", message)
    if not result:
        return "http"
    # Removing anchor if needed
    url = result.group(1).split("#", 1)[0]
    # Removing trackers. A tracker sitting right after "?" must leave the
    # "?" in place when other parameters follow, otherwise the url would
    # end up as "path&next=..." and no longer be valid.
    url = _TRACKER_AFTER_AMP.sub("", url)
    url = _TRACKER_AFTER_MARK.sub(
        lambda match: "?" if match.group(1) else "", url
    )
    return url


def is_moderator(name):
    """
    Tell whether `name` is listed in the sqlite `moderator` table.
    """
    with closing(sqlite3.connect(config.sqlite_db)) as connection:
        cursor = connection.execute(
            "SELECT count(*) FROM moderator WHERE name=?", (name, )
        )
        # ">0" rather than "==1": a name stored twice is still a moderator.
        return int(cursor.fetchone()[0]) > 0


def tweet(message):
    """
    Tweet `message` on the account configured in `config`.

    Failures are logged and never raised: tweeting is best-effort.
    """
    Utils.debug("tweet method")
    auth = OAuth(
        config.TOKEN,
        config.TOKENSEC,
        config.CONSKEY,
        config.CONSSEC
    )
    twitter = Twitter(auth=auth)
    try:
        Utils.debug("Tweeting: %s" % message)
        twitter.statuses.update(status=message)
    except Exception as error:
        Utils.error("Tweet failed: %r" % (error, ))


def _parse_admin_names(subcommand, msg):
    """
    Extract the comma-separated names following "~admin <subcommand> ".

    Names are stripped of surrounding whitespace; empty names and duplicates
    are dropped. Returns an empty list when no name is given.
    """
    result = re.search("~admin %s (.*)" % subcommand, msg)
    if not result:
        return []
    names = []
    for name in result.group(1).split(","):
        name = name.strip()
        if name and name not in names:
            names.append(name)
    return names


class Wantzel(object):
    """
    Wantzel bot.
    """

    def __init__(self):
        """
        Initialization of bot over IRC.

        Loads the date of the last tweeted entry from sqlite, connects to the
        IRC server and arms the periodic timer.
        """
        self.number = None
        # Pending call of self.timer, as returned by reactor.callLater
        self.timer_call = None
        # default last_entry_published
        self.last_entry_published = time.strptime("2000-01-01", "%Y-%m-%d")
        # See if there is something in the db
        with closing(sqlite3.connect(config.sqlite_db)) as connection:
            rows = connection.execute(
                "SELECT last_entry_published FROM tweets"
            )
            for row in rows:
                self.last_entry_published = time.strptime(
                    row[0].encode("utf-8"),
                    "%Y-%m-%d %H:%M:%S %Z"
                )
        Utils.debug("Dernier tweet: %s" % self.last_entry_published)
        self.irc = IrcClientFactory(config)
        self.irc.set_privmsg = self.set_privmsg
        reactor.connectTCP(config.server, config.port, self.irc)
        # Prepare timer
        self._schedule_timer()

    def _schedule_timer(self):
        """
        (Re)arm the periodic timer, replacing any call still pending.
        """
        pending = self.timer_call
        if pending is not None and pending.active():
            pending.cancel()
        self.timer_call = reactor.callLater(config.timer, self.timer)

    def timer(self):
        """
        This method launches functions regularly (see config.timer).

        An error in one of the jobs is logged; the timer is re-armed in every
        case so that a single failure does not stop the periodic work.
        """
        Utils.debug("Timer called")
        try:
            self.rp_to_twitter(
                "http://www.laquadrature.net/fr/revue-de-presse/feed"
            )
            self.rp_to_twitter(
                "http://www.laquadrature.net/en/press-review/feed"
            )
            self.count_articles()
        except Exception as error:
            Utils.error("Timer failed: %r" % (error, ))
        finally:
            # Recalling the timer
            self._schedule_timer()

    def set_privmsg(self):
        """
        This method sets the methods to call for each callback received from
        IRC.
        """
        self.irc.client.privmsg = self.on_privmsg

    def send_message(self, channel, multiline_message):
        """
        Sends a message on specified channel, cutting each line in a new
        message.
        """
        for message in multiline_message.splitlines():
            self.irc.client.msg(channel, message)

    def on_privmsg(self, user, channel, msg):
        """
        Wantzel can understand a lot of commands. Commands followed by a (*)
        are accessible only to moderators:
        - help
            Returns a message about how to use the bot.
            If a command is passed after help, the message explains how to
            use the command.
        - rp(acp) <url>
            Add an article in the database
        - stats
            Show some statistics about the RP
        - kill (*)
            Kill an article by giving it a score of -100
        - admin list (*)
            List rights in private
        - admin add (*)
            Add one or more new moderator to list
        - admin del (*)
            Delete one or more moderator from list
        - admin timer
            Relaunch a timer
        """
        # Cleaning user name: keep what stands before the first "!", or the
        # whole value when there is none.
        user = user.split("!", 1)[0]
        Utils.debug("Message received: %s %s %s" % (user, channel, msg))
        # Never answer to botself
        if user == config.nickname:
            return
        # If it's a query, bot should answer to the user as the channel
        if "#" not in channel:
            channel = user
        # Help command, specific
        if "wantzel" in msg and ("help" in msg or "aide" in msg):
            self.help(channel, msg)
        # Find known command
        command = re.search("[!~](rp[acp]*|kill|help|stats|admin)", msg)
        Utils.debug("Command: %s" % command)
        if not command:
            return
        Utils.debug("group(0): %s" % command.group(0))
        if command.group(0).startswith('!'):
            self.send_message(channel, messages["new_starter"])
        command = command.group(1)
        Utils.debug("Command: %s" % command)
        if command.startswith("rp"):
            Utils.debug("Calling self.rp")
            self.rp(command, user, channel, msg)
        elif command == "help":
            Utils.debug("Calling self.help")
            self.help(channel, msg)
        elif command == "kill":
            Utils.debug("Calling self.kill")
            self.kill(user, channel, msg)
        elif command == "stats":
            Utils.debug("Calling self.stats")
            self.stats(channel)
        elif command == "admin":
            Utils.debug("Calling self.admin")
            self.admin(user, channel, msg)

    def help(self, channel, msg):
        """
        Show global help.
        If a known command is behind the ~help command, an adequate message
        is returned.
        """
        Utils.debug("help command")
        # Searching for a command after help keyword
        command = re.search("~help (help|rp|stats|kill|admin)", msg)
        if command:
            self.send_message(channel, messages["help_" + command.group(1)])
        else:
            self.send_message(channel, messages["help"])

    def rp(self, command, user, channel, msg):
        """
        Adding the article in rp database.

        An unknown url is inserted; the letters following "rp" in `command`
        tune its `cite` and `note` values. A known url gets one more point
        unless `user` is the one who submitted it.
        """
        Utils.debug("rp command : %s" % command)
        Utils.debug("rp user    : %s" % user)
        Utils.debug("rp channel : %s" % channel)
        Utils.debug("rp msg     : %s" % msg)
        cite = 0
        note = 1
        url = get_url(msg)
        Utils.debug("url: %s" % url)
        if url == "":
            return
        elif url == "http":
            # get_url found no url in the message
            self.send_message(channel, messages["rp_http"] % user)
            return
        # Looking for such an article in database
        cursor = get_cursor()
        cursor.execute(
            "SELECT id, note, provenance FROM presse WHERE url = %s",
            (url, )
        )
        rows = cursor.fetchall()
        if not rows:
            # LQdN is quoted
            if "c" in command:
                cite += 2
            # the article speaks about LQdN
            if command.count("p") > 1:
                cite += 2
            # Archive this article
            if "a" in command:
                note = -2
            Utils.debug("Adding an article by %s: %s" % (user, url))
            cursor.execute(
                """INSERT INTO presse SET
                url=%s, provenance=%s, cite=%s, note=%s, datec=NOW(),
                title='', lang='', published=0, nid=0, screenshot=0,
                fetched=0, seemscite=0
                """,
                (url, user, cite, note)
            )
            self.send_message(channel, messages["rp_new_article"] % user)
        else:
            if rows[0][2] != user:
                Utils.debug("Adding a point by %s on %s" % (user, rows[0][0]))
                cursor.execute(
                    "UPDATE presse SET note=note+1 WHERE id=%s",
                    (rows[0][0], )
                )
            if (rows[0][1] + 1) < 3:
                self.send_message(channel, messages["rp_known_article"] % user)
            else:
                self.send_message(channel, messages["rp_taken_article"] % user)
        # Update number of articles to do
        self.count_articles()

    def kill(self, user, channel, msg):
        """
        Kill an article by setting its score to -100 (moderators only).
        """
        Utils.debug("kill command")
        if not is_moderator(user):
            self.send_message(channel, messages["not_moderator"])
            return
        url = get_url(msg)
        Utils.debug("url: %s" % url)
        if url == "":
            return
        elif url == "http":
            # get_url found no url in the message
            self.send_message(channel, messages["rp_http"] % user)
            return
        # Looking for such an article in database
        cursor = get_cursor()
        cursor.execute("SELECT id, note FROM presse WHERE url=%s", (url, ))
        rows = cursor.fetchall()
        if not rows:
            self.send_message(channel, messages["kill_none"] % url)
        else:
            cursor.execute(
                "UPDATE presse SET note=-100 WHERE id=%s", (rows[0][0], )
            )
            self.send_message(channel, messages["kill_done"] % url)

    def stats(self, channel):
        """
        Returns stats on articles in press review.

        One line is sent per minimal note having at least one article, with
        the count for each period (in days) that is not zero.
        """
        Utils.debug("stats command")
        cursor = get_cursor()
        periods = [1, 3, 7, 15]
        notes = [0, 3, 4]
        lines = []
        for note in notes:
            counts = []
            for period in periods:
                cursor.execute(
                    """SELECT COUNT(id) AS cid FROM presse
                    WHERE nid=0
                    AND datec>(NOW()-INTERVAL %s DAY)
                    AND note>=%s""",
                    (period, note)
                )
                count = cursor.fetchall()[0][0]
                if count > 0:
                    counts.append("%sj:%s" % (period, count))
            # A note level without any article produces no line at all
            if counts:
                lines.append("note>=%s: %s" % (note, ", ".join(counts)))
        if lines:
            result = "\n".join(lines)
        else:
            result = messages["stats_bravo"] % periods[-1]
        self.send_message(channel, result)

    def admin(self, user, channel, msg):
        """
        Manage moderation.
        A sub-command should be behind the ~admin command.
        """
        Utils.debug("admin command")
        # Searching for a command after admin keyword
        command = re.search("~admin (list|add|del|timer)", msg)
        if not command:
            return
        command = command.group(1)
        if command == "list":
            self.admin_list(user, channel)
        elif command == "add":
            self.admin_add(user, channel, msg)
        elif command == "del":
            self.admin_del(user, channel, msg)
        elif command == "timer":
            self.admin_timer(user, channel)

    def admin_list(self, user, channel):
        """
        List actual moderators.
        """
        Utils.debug("admin_list command")
        if not is_moderator(user):
            self.send_message(channel, messages["not_moderator"])
            return
        with closing(sqlite3.connect(config.sqlite_db)) as connection:
            names = [
                row[0].encode("utf-8")
                for row in connection.execute("SELECT name FROM moderator")
            ]
        self.send_message(channel, messages["admin_list"] % ", ".join(names))

    def admin_add(self, user, channel, msg):
        """
        Add some new moderators if not existing yet.

        Errors are logged and never raised.
        """
        Utils.debug("admin_add command")
        if not is_moderator(user):
            self.send_message(channel, messages["not_moderator"])
            return
        try:
            names = _parse_admin_names("add", msg)
            with closing(sqlite3.connect(config.sqlite_db)) as connection:
                # Do not add actual moderators
                moderators = [
                    row[0].encode("utf-8")
                    for row in connection.execute("SELECT name FROM moderator")
                ]
                names = [name for name in names if name not in moderators]
                if names:
                    connection.executemany(
                        "INSERT INTO moderator (name) VALUES (?)",
                        [(name, ) for name in names]
                    )
                    connection.commit()
            if names:
                self.send_message(
                    channel, messages["admin_add"] % ", ".join(names)
                )
            else:
                self.send_message(channel, messages["admin_add_empty"])
        except Exception as error:
            Utils.error("admin_add failed: %r" % (error, ))

    def admin_del(self, user, channel, msg):
        """
        Delete one or more moderators from list.

        Errors are logged and never raised.
        """
        Utils.debug("admin_del command")
        if not is_moderator(user):
            self.send_message(channel, messages["not_moderator"])
            return
        try:
            names = _parse_admin_names("del", msg)
            Utils.debug(names)
            with closing(sqlite3.connect(config.sqlite_db)) as connection:
                for name in names:
                    connection.execute(
                        "DELETE FROM moderator WHERE name=?", (name, )
                    )
                connection.commit()
            self.send_message(channel, messages["admin_del"] % ", ".join(names))
        except Exception as error:
            Utils.error("admin_del failed: %r" % (error, ))

    def admin_timer(self, user, channel):
        """
        Relaunch the timer (moderators only).

        A call still pending is cancelled first, so that relaunching never
        leaves several timers running side by side.
        """
        Utils.debug("admin_timer command")
        if not is_moderator(user):
            self.send_message(channel, messages["not_moderator"])
            return
        try:
            # Recalling the timer
            self._schedule_timer()
        except Exception as error:
            Utils.error("admin_timer failed: %r" % (error, ))

    def count_articles(self):
        """
        Count number of articles not done in RP and updates the topic of the
        press review channel if needed.
        """
        Utils.debug("count_articles method")
        cursor = get_cursor()
        # NOTE(review): the two comparisons of this query were missing from
        # the file (the statement was not valid SQL); they have been restored
        # from the columns and threshold used by the other queries - confirm
        # against the deployed version.
        cursor.execute(
            """SELECT COUNT(*) FROM presse
            WHERE DATE_SUB(NOW(), INTERVAL 2 MONTH)<datec
            AND note>2
            AND nid = 0"""
        )
        rows = cursor.fetchall()
        number = int(rows[0][0])
        Utils.debug("Found %s articles." % number)
        # Only touch the topic when the count changed
        if self.number != number:
            self.irc.client.topic("#lqdn-rp", messages["topic"] % number)
        self.number = number

    def rp_to_twitter(self, rss):
        """
        By parsing the RSS feed of the press-review, we know what to tweet.

        At most one entry is tweeted per call: the oldest entry of the day
        published after the last tweeted one. Its date is then saved in
        sqlite.
        """
        Utils.debug("rp_to_twitter method")
        now = time.localtime()
        # Today at midnight
        today = time.strptime("%s-%s-%s %s" % (
            now.tm_year,
            now.tm_mon,
            now.tm_mday,
            time.tzname[0]
        ), "%Y-%m-%d %Z")
        language = "en" if "/en/" in rss else "fr"
        entries = feedparser.parse(rss)['entries']
        entries.reverse()
        Utils.debug(self.last_entry_published)
        for entry in entries:
            # if date of publication is greater than today, midnight, and
            # lesser than future
            if today < entry.published_parsed < now:
                if self.last_entry_published < entry.published_parsed:
                    tweet(messages["tweet_rp_%s" % language] % (
                        entry.title.encode("utf-8"),
                        entry.link.encode("utf-8")
                    ))
                    Utils.debug(entry.published_parsed)
                    Utils.debug(entry.title)
                    # Save last_entry_published
                    self.last_entry_published = entry.published_parsed
                    last_entry_published = time.strftime(
                        "%Y-%m-%d %H:%M:%S %Z",
                        self.last_entry_published
                    )
                    with closing(sqlite3.connect(config.sqlite_db)) as connection:
                        connection.execute(
                            "UPDATE tweets SET last_entry_published=?",
                            (last_entry_published, )
                        )
                        connection.commit()
                    # Tweet only one message in order not to spam
                    return
                else:
                    Utils.debug(entry.title)
                    Utils.debug(entry.published_parsed)


if __name__ == '__main__':
    Wantzel()
    reactor.run()