Use advisory lock around load_message.py
authorMagnus Hagander <magnus@hagander.net>
Thu, 29 Nov 2018 16:01:22 +0000 (17:01 +0100)
committerMagnus Hagander <magnus@hagander.net>
Thu, 29 Nov 2018 16:01:22 +0000 (17:01 +0100)
Avoid loading two messages at the same time. In particular this can
cause issues if it's two copies of the same message on different lists,
which can cause a UNIQUE violation in the loader. It could also be a
problem if two messages on a new thread arrives in parallel, which could
cause two separate threads to be created.

This could be made more efficient by properly ordering the operations on
storage and using ON CONFLICT, but it's a very rare occassion and it
doesn't matter that we have to wait for a second or two for a previous
storage to complete.

loader/load_message.py

index 2c400b152553f3ab414d6086f47876954c4bb618..b4668603d520b4f05ea810b003223049f58754e7 100755 (executable)
@@ -85,9 +85,19 @@ if __name__ == "__main__":
                connstr = 'need_connstr'
 
        conn = psycopg2.connect(connstr)
+       curs = conn.cursor()
+
+       # Take an advisory lock to force serialization.
+       # We could do this "properly" by reordering operations and using ON CONFLICT,
+       # but concurrency is not that important and this is easier...
+       try:
+               curs.execute("SET statement_timeout='30s'")
+               curs.execute("SELECT pg_advisory_xact_lock(8059944559669076)")
+       except Exception, e:
+               print("Failed to wait on advisory lock: %s" % e)
+               sys.exit(1)
 
        # Get the listid we're working on
-       curs = conn.cursor()
        curs.execute("SELECT listid FROM lists WHERE listname=%(list)s", {
                        'list': opt.list
                        })