summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
Diffstat (limited to 'python')
-rw-r--r--python/pgq/cascade/consumer.py76
1 files changed, 67 insertions, 9 deletions
diff --git a/python/pgq/cascade/consumer.py b/python/pgq/cascade/consumer.py
index df9f577e..c4315ef3 100644
--- a/python/pgq/cascade/consumer.py
+++ b/python/pgq/cascade/consumer.py
@@ -36,6 +36,24 @@ class CascadedConsumer(Consumer):
self.target_db = db_name
self.provider_connstr = None
+ def init_optparse(self, parser = None):
+ p = Consumer.init_optparse(self, parser)
+ p.add_option("--provider", help = "provider location for --register")
+ p.add_option("--rewind", action = "store_true",
+ help = "change queue position according to destination")
+ p.add_option("--reset", action = "store_true",
+ help = "reset queue pos on destination side")
+ return p
+
+ def startup(self):
+ if self.options.rewind:
+ self.rewind()
+ sys.exit(0)
+ if self.options.reset:
+ self.dst_reset()
+ sys.exit(0)
+ return Consumer.startup(self)
+
def register_consumer(self, provider_loc = None):
"""Register consumer on source node first, then target node."""
@@ -72,29 +90,69 @@ class CascadedConsumer(Consumer):
q = "select * from pgq_node.register_consumer(%s, %s, %s, %s)"
self.exec_cmd(dst_db, q, [self.queue_name, self.consumer_name, pnode, last_tick])
- def unregister_consumer(self):
+ def get_consumer_state(self):
dst_db = self.get_database(self.target_db)
dst_curs = dst_db.cursor()
-
- # fetch provider loc
q = "select * from pgq_node.get_consumer_state(%s, %s)"
rows = self.exec_cmd(dst_db, q, [ self.queue_name, self.consumer_name ])
state = rows[0]
+
+ def get_provider_db(self, state):
provider_loc = state['provider_location']
+ return self.get_database(PDB, connstr = provider_loc)
+
+ def unregister_consumer(self):
+ dst_db = self.get_database(self.target_db)
+ state = self.get_consumer_state()
+ src_db = self.get_provider_db(state)
# unregister on provider
- src_db = self.get_database(PDB, connstr = provider_loc)
- src_curs = src_db.cursor()
Consumer.unregister_consumer(self)
# unregister on subscriber
q = "select * from pgq_node.unregister_consumer(%s, %s)"
self.exec_cmd(dst_db, q, [ self.queue_name, self.consumer_name ])
- def init_optparse(self, parser = None):
- p = Consumer.init_optparse(self, parser)
- p.add_option("--provider", help = "provider location for --register")
- return p
+ def rewind(self):
+ self.log.info("Rewinding queue")
+ dst_db = self.get_database(self.target_db)
+ dst_curs = dst_db.cursor()
+
+ state = self.get_consumer_state()
+ src_db = self.get_provider_db(state)
+ src_curs = src_db.cursor()
+
+ dst_tick = state['completed_tick']
+ if dst_tick:
+ q = "select pgq.register_consumer_at(%s, %s, %s)"
+ src_curs.execute(q, [self.queue_name, self.consumer_name, dst_tick])
+ else:
+ self.log.warning('No tick found on dst side')
+
+ dst_db.commit()
+ src_db.commit()
+
+ def dst_reset(self):
+ self.log.info("Resetting queue tracking on dst side")
+
+ dst_db = self.get_database(self.target_db)
+ dst_curs = dst_db.cursor()
+
+ state = self.get_consumer_state()
+
+ src_db = self.get_provider_db(state)
+ src_curs = src_db.cursor()
+
+ # fetch last tick from source
+ q = "select pgq.get_consumer_info(%s, %s)"
+ src_curs.execute(q, [self.queue_name, self.consumer_name])
+ last_tick = src_curs.fetchone()['last_tick']
+
+ # set on destination
+ self.finish_remote_batch(src_db, dst_db, last_tick)
+
+ src_db.commit()
+ dst_db.commit()
def process_batch(self, src_db, batch_id, event_list):
self._batch_info = self.get_batch_info(batch_id)