summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
authorMarko Kreen2007-04-09 14:46:21 +0000
committerMarko Kreen2007-04-09 14:46:21 +0000
commit83f58098d577a8abc16067db10b862cc87fc63c7 (patch)
tree1b286014aaeb4a775910e218288e680c47d7fafa /python
parentc86ea29bb880dc6e40a96b49ba913e12588eacde (diff)
move tick forcing to db
Diffstat (limited to 'python')
-rw-r--r--python/londiste/syncer.py28
1 files changed, 9 insertions, 19 deletions
diff --git a/python/londiste/syncer.py b/python/londiste/syncer.py
index 9e8e553e..4d089a2f 100644
--- a/python/londiste/syncer.py
+++ b/python/londiste/syncer.py
@@ -91,31 +91,21 @@ class Syncer(skytools.DBScript):
dst_db.commit()
def force_tick(self, setup_curs):
- q = "select nextval(queue_tick_seq) as tick_pos,"\
- " setval(queue_event_seq, nextval(queue_event_seq) + 10000)"\
- " from pgq.queue where queue_name = %s and not queue_external_ticker"
+ q = "select pgq.force_tick(%s)"
setup_curs.execute(q, [self.pgq_queue_name])
- res = setup_curs.fetchall()
- if not res:
- raise Exception("Queue has external ticker, cannot proceed")
- next_pos = res[0][0]
+ res = setup_curs.fetchone()
+ cur_pos = res[0]
start = time.time()
while 1:
time.sleep(0.5)
- q = "select tick_id,"\
- " setval(queue_event_seq,"\
- " nextval(queue_event_seq) + 10000)"\
- " from pgq.tick, pgq.queue"\
- " where tick_queue = queue_id"\
- " and queue_name = %s"\
- " and tick_id > %s"\
- " order by tick_id asc limit 1"
- setup_curs.execute(q, [self.pgq_queue_name, next_pos])
- res = setup_curs.fetchall()
- if res:
- return res[0][0]
+ setup_curs.execute(q, [self.pgq_queue_name])
+ res = setup_curs.fetchone()
+ if res[0] != cur_pos:
+ # new pos
+ return res[0]
+ # dont loop more than 10 secs
dur = time.time() - start
if dur > 10 and not self.options.force:
raise Exception("Ticker seems dead")