Avoid loading two messages at the same time. In particular this can
cause issues if it's two copies of the same message on different lists,
which can cause a UNIQUE violation in the loader. It could also be a
problem if two messages on a new thread arrives in parallel, which could
cause two separate threads to be created.
This could be made more efficient by properly ordering the operations on
storage and using ON CONFLICT, but it's a very rare occassion and it
doesn't matter that we have to wait for a second or two for a previous
storage to complete.
connstr = 'need_connstr'
conn = psycopg2.connect(connstr)
+ curs = conn.cursor()
+
+ # Take an advisory lock to force serialization.
+ # We could do this "properly" by reordering operations and using ON CONFLICT,
+ # but concurrency is not that important and this is easier...
+ try:
+ curs.execute("SET statement_timeout='30s'")
+ curs.execute("SELECT pg_advisory_xact_lock(8059944559669076)")
+ except Exception, e:
+ print("Failed to wait on advisory lock: %s" % e)
+ sys.exit(1)
# Get the listid we're working on
- curs = conn.cursor()
curs.execute("SELECT listid FROM lists WHERE listname=%(list)s", {
'list': opt.list
})