diff options
author | Marko Kreen | 2007-04-09 14:46:21 +0000 |
---|---|---|
committer | Marko Kreen | 2007-04-09 14:46:21 +0000 |
commit | 83f58098d577a8abc16067db10b862cc87fc63c7 (patch) | |
tree | 1b286014aaeb4a775910e218288e680c47d7fafa /python | |
parent | c86ea29bb880dc6e40a96b49ba913e12588eacde (diff) |
move tick forcing to db
Diffstat (limited to 'python')
-rw-r--r-- | python/londiste/syncer.py | 28 |
1 files changed, 9 insertions, 19 deletions
diff --git a/python/londiste/syncer.py b/python/londiste/syncer.py index 9e8e553e..4d089a2f 100644 --- a/python/londiste/syncer.py +++ b/python/londiste/syncer.py @@ -91,31 +91,21 @@ class Syncer(skytools.DBScript): dst_db.commit() def force_tick(self, setup_curs): - q = "select nextval(queue_tick_seq) as tick_pos,"\ - " setval(queue_event_seq, nextval(queue_event_seq) + 10000)"\ - " from pgq.queue where queue_name = %s and not queue_external_ticker" + q = "select pgq.force_tick(%s)" setup_curs.execute(q, [self.pgq_queue_name]) - res = setup_curs.fetchall() - if not res: - raise Exception("Queue has external ticker, cannot proceed") - next_pos = res[0][0] + res = setup_curs.fetchone() + cur_pos = res[0] start = time.time() while 1: time.sleep(0.5) - q = "select tick_id,"\ - " setval(queue_event_seq,"\ - " nextval(queue_event_seq) + 10000)"\ - " from pgq.tick, pgq.queue"\ - " where tick_queue = queue_id"\ - " and queue_name = %s"\ - " and tick_id > %s"\ - " order by tick_id asc limit 1" - setup_curs.execute(q, [self.pgq_queue_name, next_pos]) - res = setup_curs.fetchall() - if res: - return res[0][0] + setup_curs.execute(q, [self.pgq_queue_name]) + res = setup_curs.fetchone() + if res[0] != cur_pos: + # new pos + return res[0] + # dont loop more than 10 secs dur = time.time() - start if dur > 10 and not self.options.force: raise Exception("Ticker seems dead") |