diff options
author | Marko Kreen | 2007-04-06 08:57:37 +0000 |
---|---|---|
committer | Marko Kreen | 2007-04-06 08:57:37 +0000 |
commit | 2c901a4b34b4fd7fc591e9fef9ce5820370b026d (patch) | |
tree | e1fe62c734730a77859f65a16ccdb0f0c7d89b55 /python | |
parent | 3dcd44fb908b51d2925e7a5cbc16b35e000b5b6d (diff) |
bit more complete take on non-ticking sync
Diffstat (limited to 'python')
-rw-r--r-- | python/londiste/syncer.py | 95 |
1 files changed, 57 insertions, 38 deletions
diff --git a/python/londiste/syncer.py b/python/londiste/syncer.py index 0c9c2321..71a05fde 100644 --- a/python/londiste/syncer.py +++ b/python/londiste/syncer.py @@ -22,18 +22,15 @@ class Syncer(skytools.DBScript): p.add_option("--force", action="store_true", help="ignore lag") return p - def check_consumer(self, src_db): - src_curs = src_db.cursor() - + def check_consumer(self, setup_curs): # before locking anything check if consumer is working ok q = "select extract(epoch from ticker_lag) from pgq.get_queue_info(%s)" - src_curs.execute(q, [self.pgq_queue_name]) - ticker_lag = src_curs.fetchone()[0] + setup_curs.execute(q, [self.pgq_queue_name]) + ticker_lag = setup_curs.fetchone()[0] q = "select extract(epoch from lag)"\ " from pgq.get_consumer_info(%s, %s)" - src_curs.execute(q, [self.pgq_queue_name, self.pgq_consumer_id]) - res = src_curs.fetchall() - src_db.commit() + setup_curs.execute(q, [self.pgq_queue_name, self.pgq_consumer_id]) + res = setup_curs.fetchall() if len(res) == 0: self.log.error('No such consumer') @@ -44,8 +41,7 @@ class Syncer(skytools.DBScript): self.log.error('Consumer lagging too much, cannot proceed') sys.exit(1) - def get_subscriber_table_state(self): - dst_db = self.get_database('subscriber_db') + def get_subscriber_table_state(self, dst_db): dst_curs = dst_db.cursor() q = "select * from londiste.subscriber_get_table_list(%s)" dst_curs.execute(q, [self.pgq_queue_name]) @@ -56,12 +52,18 @@ class Syncer(skytools.DBScript): def work(self): src_loc = self.cf.get('provider_db') lock_db = self.get_database('provider_db', cache='lock_db') - src_db = self.get_database('provider_db') - dst_db = self.get_database('subscriber_db') + setup_db = self.get_database('provider_db', cache='setup_db', autocommit = 1) + + src_db = self.get_database('provider_db', + isolation_level = skytools.I_SERIALIZABLE) + dst_db = self.get_database('subscriber_db', + isolation_level = skytools.I_SERIALIZABLE) - self.check_consumer(src_db) + setup_curs = setup_db.cursor() - state_list = self.get_subscriber_table_state() + self.check_consumer(setup_curs) + + state_list = self.get_subscriber_table_state(dst_db) state_map = {} full_list = [] for ts in state_list: @@ -82,12 +84,42 @@ class Syncer(skytools.DBScript): if st['merge_state'] != 'ok': self.log.info('Table %s not synced yet, no point' % tbl) continue - self.check_table(tbl, lock_db, src_db, dst_db) + self.check_table(tbl, lock_db, src_db, dst_db, setup_curs) lock_db.commit() src_db.commit() dst_db.commit() - def check_table(self, tbl, lock_db, src_db, dst_db): + def force_tick(self, setup_curs): + q = "select nextval(queue_tick_seq) as tick_pos,"\ + " setval(queue_event_seq, nextval(queue_event_seq) + 10000)"\ + " from pgq.queue where queue_name = %s and not queue_external_ticker" + setup_curs.execute(q, [self.pgq_queue_name]) + res = setup_curs.fetchall() + if not res: + raise Exception("Queue has external ticker, cannot proceed") + next_pos = res[0][0] + + start = time.time() + while 1: + time.sleep(0.5) + q = "select tick_id,"\ + " setval(queue_event_seq,"\ + " nextval(queue_event_seq) + 10000)"\ + " from pgq.tick, pgq.queue"\ + " where tick_queue = queue_id"\ + " and queue_name = %s"\ + " and tick_id > %s"\ + " order by tick_id asc limit 1" + setup_curs.execute(q, [self.pgq_queue_name, next_pos]) + res = setup_curs.fetchall() + if res: + return res[0][0] + + dur = time.time() - start + if dur > 10 and not self.options.force: + raise Exception("Ticker seems dead") + + def check_table(self, tbl, lock_db, src_db, dst_db, setup_curs): """Get transaction to same state, then process.""" @@ -112,33 +144,21 @@ class Syncer(skytools.DBScript): self.log.info('Syncing %s' % tbl) # consumer must get futher than this tick - # also, bump event_id_seq to force ticker to tick - q = "select nextval(queue_tick_seq) as tick_pos,"\ - " setval(queue_event_seq, nextval(queue_event_seq) + 2000)"\ - " from pgq.queue where queue_name = %s" - src_curs.execute(q, [self.pgq_queue_name]) - tick_id = src_curs.fetchone()[0] - src_db.commit() + tick_id = self.force_tick(setup_curs) # try to force second tick also - time.sleep(0.5) - q = "select setval(queue_event_seq, nextval(queue_event_seq) + 2000)"\ - " from pgq.queue where queue_name = %s" - src_curs.execute(q, [self.pgq_queue_name]) - src_db.commit() + self.force_tick(setup_curs) # take server time - src_curs.execute("select to_char(now(), 'YYYY-MM-DD HH24:MI:SS.MS')") - tpos = src_curs.fetchone()[0] - src_db.commit() + setup_curs.execute("select to_char(now(), 'YYYY-MM-DD HH24:MI:SS.MS')") + tpos = setup_curs.fetchone()[0] # now wait while 1: - time.sleep(0.2) + time.sleep(0.5) - q = "select now() - lag > %s, now(), lag"\ + q = "select now() - lag > timestamp %s, now(), lag"\ " from pgq.get_consumer_info(%s, %s)" - src_curs.execute(q, [tpos, self.pgq_queue_name, self.pgq_consumer_id]) - res = src_curs.fetchall() - src_db.commit() + setup_curs.execute(q, [tpos, self.pgq_queue_name, self.pgq_consumer_id]) + res = setup_curs.fetchall() if len(res) == 0: raise Exception('No such consumer') @@ -155,12 +175,11 @@ class Syncer(skytools.DBScript): sys.exit(1) # take snapshot on provider side - src_curs.execute("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE") + src_db.commit() src_curs.execute("SELECT 1") # take snapshot on subscriber side dst_db.commit() - dst_curs.execute("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE") dst_curs.execute("SELECT 1") # release lock |