Implement check functionality for reparsing messages
author Magnus Hagander <magnus@hagander.net>
Mon, 15 Feb 2016 12:27:47 +0000 (13:27 +0100)
committer Magnus Hagander <magnus@hagander.net>
Mon, 15 Feb 2016 12:27:47 +0000 (13:27 +0100)
By default, reparse_message.py will now reparse the message and print the
diff, instead of actually updating it. Give it a --update to make it
actually do the update, like before. This makes it a good tool for testing
updates to the message parsing code.

Also implement --all and --sample <n>, to have it run the diff across
all or a certain number of messages in the archives. (Be *very* careful
about running it with both --all and --update..)

loader/lib/storage.py
loader/reparse_message.py

index 82649894e11282d45792283bcfe6850927421a73..e7eeb9513b91c9e0bc78192161decd653cbaa5ad 100644 (file)
@@ -1,3 +1,5 @@
+import difflib
+
 from parser import ArchivesParser
 
 from lib.log import log, opstatus
@@ -230,3 +232,55 @@ class ArchivesParserStorage(ArchivesParser):
                                                         [{'id': id, 'priority': i, 'msgid': self.parents[i]} for i in range(0, len(self.parents))])
 
                opstatus.stored += 1
+
+       def diff(self, conn, f, fromonlyf):  # Compare the stored bodytxt for self.msgid with self.bodytxt; report differences to f / fromonlyf
+               curs = conn.cursor()
+
+               # Fetch the currently stored row so we have something to diff against
+               curs.execute("SELECT id, _from, _to, cc, subject, date, has_attachment, bodytxt FROM messages WHERE messageid=%(msgid)s", {
+                       'msgid': self.msgid,
+                       })
+               try:
+                       id, _from, _to, cc, subject, date, has_attachment, bodytxt = curs.fetchone()  # only bodytxt is used below
+               except TypeError, e:  # Python 2 except syntax; presumably fetchone() returned None (no matching row), so the unpack failed
+                       f.write("---- %s ----\n" % self.msgid)
+                       f.write("Could not re-find in archives: %s\n" % e)
+                       f.write("\n-------------------------------\n\n")
+                       return  # nothing stored to compare against
+
+
+               if bodytxt.decode('utf8') != self.bodytxt:  # stored value is decoded as UTF-8 before comparing
+                       log.status("Message %s has changes " % self.msgid)
+                       tempdiff = list(difflib.unified_diff(bodytxt.decode('utf8').splitlines(),
+                                                                                                self.bodytxt.splitlines(),
+                                                                                                fromfile='old',
+                                                                                                tofile='new',
+                                                                                                n=0,  # no context lines: each hunk is only its @@ header plus the changed lines
+                                                                                                lineterm=''))
+                       if (len(tempdiff)-2) % 3 == 0:  # skip the two ---/+++ file header rows
+                               # 3 rows to a diff, two header rows.
+                               # Then verify that each slice of 3 contains one @@ row (header), one -From and one +>From,
+                               # which indicates the only change is in the From.
+                               ok = True
+                               for a,b,c in map(None, *([iter(tempdiff[2:])] * 3)):  # Python 2 idiom: three references to one iterator yield consecutive 3-tuples
+                                       if not (a.startswith('@@ ') and b.startswith('-From ') and c.startswith('+>From ')):
+                                               ok=False
+                                               break
+                               if ok:
+                                       fromonlyf.write("%s\n" % self.msgid)  # only the msgid is recorded; no diff text is written to f
+                                       return
+
+
+                       # Build the diff that is written out. NOTE(review): same arguments as tempdiff above, so the result looks identical - confirm
+                       d = list(difflib.unified_diff(bodytxt.decode('utf8').splitlines(),
+                                                                                                  self.bodytxt.splitlines(),
+                                                                                                  fromfile='old',
+                                                                                                  tofile='new',
+                                                                                                  n=0,
+                                                                                                  lineterm=''))
+                       if len(d) > 0:  # may be empty if the texts differ only in something splitlines() drops (e.g. a trailing newline)
+                               f.write("---- %s ----\n" % self.msgid)
+                               f.write("\n".join(d))
+                               f.write("\n\n")
+               else:
+                       log.status("Message %s unchanged." % self.msgid)
index acd944cc14c4c4ea8355f1603b7e25f7bc832422..b00eea3aa1770181e253a6d1474928b9cac1eaa4 100755 (executable)
@@ -7,10 +7,12 @@
 
 import os
 import sys
+import codecs
 
 from optparse import OptionParser
 from ConfigParser import ConfigParser
 from StringIO import StringIO
+from datetime import datetime, timedelta
 
 import psycopg2
 
@@ -19,11 +21,25 @@ from lib.exception import IgnorableException
 from lib.log import log, opstatus
 from lib.varnish import VarnishPurger
 
+def ResultIter(cursor):  # Generator: yields the cursor's remaining rows one at a time
+       # Rows are pulled from the cursor in batches via fetchmany() and handed out
+       # through a generator, so only one batch is held in memory here at a time
+       while True:
+               results = cursor.fetchmany(5000)  # ask for up to 5000 rows per round trip
+               if not results:  # empty batch: no rows left
+                       break
+               for r in results:
+                       yield r
+
+
 if __name__ == "__main__":
        optparser = OptionParser()
        optparser.add_option('-m', '--msgid', dest='msgid', help='Messageid to load')
+       optparser.add_option('--all', dest='all', action='store_true', help='Load *all* messages currently in the db')
+       optparser.add_option('--sample', dest='sample', help='Load a sample of <n> messages')
        optparser.add_option('-v', '--verbose', dest='verbose', action='store_true', help='Verbose output')
        optparser.add_option('--force-date', dest='force_date', help='Override date (used for dates that can\'t be parsed)')
+       optparser.add_option('--update', dest='update', action='store_true', help='Actually update, not just diff (default is diff)')
 
        (opt, args) = optparser.parse_args()
 
@@ -32,9 +48,12 @@ if __name__ == "__main__":
                optparser.print_usage()
                sys.exit(1)
 
-       if not opt.msgid:
-               print "Messageid must be specified"
-               optparser.print_usage()
+       if sum([1 for x in [opt.all, opt.sample, opt.msgid] if x]) != 1:
+               print "Must specify exactly one of --msgid, --all and --sample"
+               sys.exit(1)
+
+       if not opt.update and os.path.exists('reparse.diffs'):
+               print "File reparse.diffs already exists. Remove or rename and try again."
                sys.exit(1)
 
        log.set(opt.verbose)
@@ -48,28 +67,51 @@ if __name__ == "__main__":
 
        conn = psycopg2.connect(connstr)
 
-       # Load our message
-       curs = conn.cursor()
-       curs.execute("SELECT id, rawtxt FROM messages WHERE messageid=%(msgid)s", {
+       # Get messages
+       curs = conn.cursor('msglist')
+       if opt.all:
+               curs.execute("SELECT id, rawtxt FROM messages ORDER BY id")
+       elif opt.sample:
+               curs.execute("SELECT id, rawtxt FROM messages ORDER BY id DESC LIMIT %(num)s", {
+                       'num': int(opt.sample),
+               })
+       else:
+               curs.execute("SELECT id, rawtxt FROM messages WHERE messageid=%(msgid)s", {
                        'msgid': opt.msgid,
-                       })
-       r = curs.fetchall()
-       if len(r) == 0:
-               log.error("Message '%s' not found" % opt.msgid)
-               conn.close()
-               sys.exit(1)
-       if len(r) != 1:
-               log.error("!= 1 row existed (can't happen?) for message '%s'" % opt.msgid)
-               conn.close()
-               sys.exit(1)
-       (id, rawtxt) = r[0]
+               })
+
+       if not opt.update:
+               f = codecs.open("reparse.diffs", "w", "utf-8")
+               fromonlyf = open("reparse.fromonly","w")
+
+       firststatus = datetime.now()
+       laststatus = datetime.now()
+       num = 0
+       for id, rawtxt in ResultIter(curs):
+               num += 1
+               ap = ArchivesParserStorage()
+               ap.parse(StringIO(rawtxt))
+               ap.analyze(date_override=opt.force_date)
+               if opt.update:
+                       ap.store(conn, listid=-9, overwrite=True)
+               else:
+                       ap.diff(conn, f, fromonlyf)
+               if datetime.now() - laststatus > timedelta(seconds=5):
+                       sys.stdout.write("%s messages parsed (%s / second)\r" % (num, num / ((datetime.now()-firststatus).seconds)))
+                       sys.stdout.flush()
+                       laststatus = datetime.now()
 
-       ap = ArchivesParserStorage()
-       ap.parse(StringIO(rawtxt))
-       ap.analyze(date_override=opt.force_date)
-       ap.store(conn, listid=-9, overwrite=True)
+       print ""
 
-       conn.commit()
+       if opt.update:
+               conn.commit()
+               VarnishPurger(cfg).purge(ap.purges)
+               opstatus.print_status()
+       else:
+               fromonlyf.close()
+               f.close()
+               if os.path.getsize('reparse.diffs') == 0:
+                       os.unlink('reparse.diffs')
+               # Just in case
+               conn.rollback()
        conn.close()
-       opstatus.print_status()
-       VarnishPurger(cfg).purge(ap.purges)