summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMarko Kreen2007-07-20 16:39:53 +0000
committerMarko Kreen2007-07-20 16:39:53 +0000
commitd6cb4b1d7ce9f419fb10247099c41d1d254764e0 (patch)
tree15786c5173c0d027977a6034c5d8faf6e37aba26
parent7d06bf4ae9bbdbd3a8f18af07fb6214949807f95 (diff)
pgq.SerialConsumer: move tick queries to separate functions to allow overriding them if needed
-rw-r--r--python/pgq/consumer.py43
1 files changed, 18 insertions, 25 deletions
diff --git a/python/pgq/consumer.py b/python/pgq/consumer.py
index 4b9c221b..0cd55d1d 100644
--- a/python/pgq/consumer.py
+++ b/python/pgq/consumer.py
@@ -319,14 +319,10 @@ class SerialConsumer(Consumer):
prev_tick = self.cur_batch_info['prev_tick_id']
- q = "select %s.get_last_tick(%%s)" % self.dst_schema
- dst_curs.execute(q, [self.consumer_id])
- res = dst_curs.fetchone()
-
- if not res or not res[0]:
+ dst_tick = self.get_last_tick(dst_curs)
+ if not dst_tick:
# seems this consumer has not run yet against dst_db
return False
- dst_tick = res[0]
if prev_tick == dst_tick:
# on track
@@ -346,27 +342,18 @@ class SerialConsumer(Consumer):
in external database.
"""
tick_id = self.cur_batch_info['tick_id']
- q = "select %s.set_last_tick(%%s, %%s)" % self.dst_schema
- dst_curs.execute(q, [ self.consumer_id, tick_id ])
+ self.set_last_tick(dst_curs, tick_id)
def attach(self):
new = Consumer.attach(self)
if new:
- self.clean_completed_tick()
+ self.dst_reset()
def detach(self):
"""If detaching, also clean completed tick table on dest."""
Consumer.detach(self)
- self.clean_completed_tick()
-
- def clean_completed_tick(self):
- self.log.info("removing completed tick from dst")
- dst_db = self.get_database(self.remote_db)
- dst_curs = dst_db.cursor()
- q = "select %s.set_last_tick(%%s, NULL)" % self.dst_schema
- dst_curs.execute(q, [self.consumer_id])
- dst_db.commit()
+ self.dst_reset()
def process_remote_batch(self, db, batch_id, event_list, dst_db):
raise Exception('process_remote_batch not implemented')
@@ -378,11 +365,8 @@ class SerialConsumer(Consumer):
src_curs = src_db.cursor()
dst_curs = dst_db.cursor()
- q = "select %s.get_last_tick(%%s)" % self.dst_schema
- dst_curs.execute(q, [self.consumer_id])
- row = dst_curs.fetchone()
- if row:
- dst_tick = row[0]
+ dst_tick = self.get_last_tick(dst_curs)
+ if dst_tick:
q = "select pgq.register_consumer_at(%s, %s, %s)"
src_curs.execute(q, [self.pgq_queue_name, self.consumer_id, dst_tick])
else:
@@ -395,8 +379,17 @@ class SerialConsumer(Consumer):
self.log.info("Resetting queue tracking on dst side")
dst_db = self.get_database(self.remote_db)
dst_curs = dst_db.cursor()
- q = "select %s.set_last_tick(%%s, NULL)" % self.dst_schema
- dst_curs.execute(q, [self.consumer_id])
+ self.set_last_tick(dst_curs, None)
dst_db.commit()
+ def get_last_tick(self, dst_curs):
+ q = "select %s.get_last_tick(%%s)" % self.dst_schema
+ dst_curs.execute(q, [self.consumer_id])
+ res = dst_curs.fetchone()
+ return res[0]
+
+ def set_last_tick(self, dst_curs, tick_id):
+ q = "select %s.set_last_tick(%%s, %%s)" % self.dst_schema
+ dst_curs.execute(q, [ self.consumer_id, tick_id ])
+