diff options
Diffstat (limited to 'python')
-rw-r--r-- | python/pgq/cascade/consumer.py | 76 |
1 files changed, 67 insertions, 9 deletions
diff --git a/python/pgq/cascade/consumer.py b/python/pgq/cascade/consumer.py index df9f577e..c4315ef3 100644 --- a/python/pgq/cascade/consumer.py +++ b/python/pgq/cascade/consumer.py @@ -36,6 +36,24 @@ class CascadedConsumer(Consumer): self.target_db = db_name self.provider_connstr = None + def init_optparse(self, parser = None): + p = Consumer.init_optparse(self, parser) + p.add_option("--provider", help = "provider location for --register") + p.add_option("--rewind", action = "store_true", + help = "change queue position according to destination") + p.add_option("--reset", action = "store_true", + help = "reset queue pos on destination side") + return p + + def startup(self): + if self.options.rewind: + self.rewind() + sys.exit(0) + if self.options.reset: + self.dst_reset() + sys.exit(0) + return Consumer.startup(self) + def register_consumer(self, provider_loc = None): """Register consumer on source node first, then target node.""" @@ -72,29 +90,69 @@ class CascadedConsumer(Consumer): q = "select * from pgq_node.register_consumer(%s, %s, %s, %s)" self.exec_cmd(dst_db, q, [self.queue_name, self.consumer_name, pnode, last_tick]) - def unregister_consumer(self): + def get_consumer_state(self): dst_db = self.get_database(self.target_db) dst_curs = dst_db.cursor() - - # fetch provider loc q = "select * from pgq_node.get_consumer_state(%s, %s)" rows = self.exec_cmd(dst_db, q, [ self.queue_name, self.consumer_name ]) state = rows[0] + + def get_provider_db(self, state): provider_loc = state['provider_location'] + return self.get_database(PDB, connstr = provider_loc) + + def unregister_consumer(self): + dst_db = self.get_database(self.target_db) + state = self.get_consumer_state() + src_db = self.get_provider_db(state) # unregister on provider - src_db = self.get_database(PDB, connstr = provider_loc) - src_curs = src_db.cursor() Consumer.unregister_consumer(self) # unregister on subscriber q = "select * from pgq_node.unregister_consumer(%s, %s)" self.exec_cmd(dst_db, q, [ self.queue_name, self.consumer_name ]) - def init_optparse(self, parser = None): - p = Consumer.init_optparse(self, parser) - p.add_option("--provider", help = "provider location for --register") - return p + def rewind(self): + self.log.info("Rewinding queue") + dst_db = self.get_database(self.target_db) + dst_curs = dst_db.cursor() + + state = self.get_consumer_state() + src_db = self.get_provider_db(state) + src_curs = src_db.cursor() + + dst_tick = state['completed_tick'] + if dst_tick: + q = "select pgq.register_consumer_at(%s, %s, %s)" + src_curs.execute(q, [self.queue_name, self.consumer_name, dst_tick]) + else: + self.log.warning('No tick found on dst side') + + dst_db.commit() + src_db.commit() + + def dst_reset(self): + self.log.info("Resetting queue tracking on dst side") + + dst_db = self.get_database(self.target_db) + dst_curs = dst_db.cursor() + + state = self.get_consumer_state() + + src_db = self.get_provider_db(state) + src_curs = src_db.cursor() + + # fetch last tick from source + q = "select pgq.get_consumer_info(%s, %s)" + src_curs.execute(q, [self.queue_name, self.consumer_name]) + last_tick = src_curs.fetchone()['last_tick'] + + # set on destination + self.finish_remote_batch(src_db, dst_db, last_tick) + + src_db.commit() + dst_db.commit() def process_batch(self, src_db, batch_id, event_list): self._batch_info = self.get_batch_info(batch_id) |