summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
Diffstat (limited to 'python')
-rw-r--r--python/londiste/syncer.py95
1 files changed, 57 insertions, 38 deletions
diff --git a/python/londiste/syncer.py b/python/londiste/syncer.py
index 0c9c2321..71a05fde 100644
--- a/python/londiste/syncer.py
+++ b/python/londiste/syncer.py
@@ -22,18 +22,15 @@ class Syncer(skytools.DBScript):
p.add_option("--force", action="store_true", help="ignore lag")
return p
- def check_consumer(self, src_db):
- src_curs = src_db.cursor()
-
+ def check_consumer(self, setup_curs):
# before locking anything check if consumer is working ok
q = "select extract(epoch from ticker_lag) from pgq.get_queue_info(%s)"
- src_curs.execute(q, [self.pgq_queue_name])
- ticker_lag = src_curs.fetchone()[0]
+ setup_curs.execute(q, [self.pgq_queue_name])
+ ticker_lag = setup_curs.fetchone()[0]
q = "select extract(epoch from lag)"\
" from pgq.get_consumer_info(%s, %s)"
- src_curs.execute(q, [self.pgq_queue_name, self.pgq_consumer_id])
- res = src_curs.fetchall()
- src_db.commit()
+ setup_curs.execute(q, [self.pgq_queue_name, self.pgq_consumer_id])
+ res = setup_curs.fetchall()
if len(res) == 0:
self.log.error('No such consumer')
@@ -44,8 +41,7 @@ class Syncer(skytools.DBScript):
self.log.error('Consumer lagging too much, cannot proceed')
sys.exit(1)
- def get_subscriber_table_state(self):
- dst_db = self.get_database('subscriber_db')
+ def get_subscriber_table_state(self, dst_db):
dst_curs = dst_db.cursor()
q = "select * from londiste.subscriber_get_table_list(%s)"
dst_curs.execute(q, [self.pgq_queue_name])
@@ -56,12 +52,18 @@ class Syncer(skytools.DBScript):
def work(self):
src_loc = self.cf.get('provider_db')
lock_db = self.get_database('provider_db', cache='lock_db')
- src_db = self.get_database('provider_db')
- dst_db = self.get_database('subscriber_db')
+ setup_db = self.get_database('provider_db', cache='setup_db', autocommit = 1)
+
+ src_db = self.get_database('provider_db',
+ isolation_level = skytools.I_SERIALIZABLE)
+ dst_db = self.get_database('subscriber_db',
+ isolation_level = skytools.I_SERIALIZABLE)
- self.check_consumer(src_db)
+ setup_curs = setup_db.cursor()
- state_list = self.get_subscriber_table_state()
+ self.check_consumer(setup_curs)
+
+ state_list = self.get_subscriber_table_state(dst_db)
state_map = {}
full_list = []
for ts in state_list:
@@ -82,12 +84,42 @@ class Syncer(skytools.DBScript):
if st['merge_state'] != 'ok':
self.log.info('Table %s not synced yet, no point' % tbl)
continue
- self.check_table(tbl, lock_db, src_db, dst_db)
+ self.check_table(tbl, lock_db, src_db, dst_db, setup_curs)
lock_db.commit()
src_db.commit()
dst_db.commit()
- def check_table(self, tbl, lock_db, src_db, dst_db):
+ def force_tick(self, setup_curs):
+ q = "select nextval(queue_tick_seq) as tick_pos,"\
+ " setval(queue_event_seq, nextval(queue_event_seq) + 10000)"\
+ " from pgq.queue where queue_name = %s and not queue_external_ticker"
+ setup_curs.execute(q, [self.pgq_queue_name])
+ res = setup_curs.fetchall()
+ if not res:
+ raise Exception("Queue has external ticker, cannot proceed")
+ next_pos = res[0][0]
+
+ start = time.time()
+ while 1:
+ time.sleep(0.5)
+ q = "select tick_id,"\
+ " setval(queue_event_seq,"\
+ " nextval(queue_event_seq) + 10000)"\
+ " from pgq.tick, pgq.queue"\
+ " where tick_queue = queue_id"\
+ " and queue_name = %s"\
+ " and tick_id > %s"\
+ " order by tick_id asc limit 1"
+ setup_curs.execute(q, [self.pgq_queue_name, next_pos])
+ res = setup_curs.fetchall()
+ if res:
+ return res[0][0]
+
+ dur = time.time() - start
+ if dur > 10 and not self.options.force:
+ raise Exception("Ticker seems dead")
+
+ def check_table(self, tbl, lock_db, src_db, dst_db, setup_curs):
"""Get transaction to same state, then process."""
@@ -112,33 +144,21 @@ class Syncer(skytools.DBScript):
self.log.info('Syncing %s' % tbl)
# consumer must get futher than this tick
- # also, bump event_id_seq to force ticker to tick
- q = "select nextval(queue_tick_seq) as tick_pos,"\
- " setval(queue_event_seq, nextval(queue_event_seq) + 2000)"\
- " from pgq.queue where queue_name = %s"
- src_curs.execute(q, [self.pgq_queue_name])
- tick_id = src_curs.fetchone()[0]
- src_db.commit()
+ tick_id = self.force_tick(setup_curs)
# try to force second tick also
- time.sleep(0.5)
- q = "select setval(queue_event_seq, nextval(queue_event_seq) + 2000)"\
- " from pgq.queue where queue_name = %s"
- src_curs.execute(q, [self.pgq_queue_name])
- src_db.commit()
+ self.force_tick(setup_curs)
# take server time
- src_curs.execute("select to_char(now(), 'YYYY-MM-DD HH24:MI:SS.MS')")
- tpos = src_curs.fetchone()[0]
- src_db.commit()
+ setup_curs.execute("select to_char(now(), 'YYYY-MM-DD HH24:MI:SS.MS')")
+ tpos = setup_curs.fetchone()[0]
# now wait
while 1:
- time.sleep(0.2)
+ time.sleep(0.5)
- q = "select now() - lag > %s, now(), lag"\
+ q = "select now() - lag > timestamp %s, now(), lag"\
" from pgq.get_consumer_info(%s, %s)"
- src_curs.execute(q, [tpos, self.pgq_queue_name, self.pgq_consumer_id])
- res = src_curs.fetchall()
- src_db.commit()
+ setup_curs.execute(q, [tpos, self.pgq_queue_name, self.pgq_consumer_id])
+ res = setup_curs.fetchall()
if len(res) == 0:
raise Exception('No such consumer')
@@ -155,12 +175,11 @@ class Syncer(skytools.DBScript):
sys.exit(1)
# take snapshot on provider side
- src_curs.execute("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE")
+ src_db.commit()
src_curs.execute("SELECT 1")
# take snapshot on subscriber side
dst_db.commit()
- dst_curs.execute("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE")
dst_curs.execute("SELECT 1")
# release lock