diff options
| author | Marko Kreen | 2012-07-05 11:11:29 +0000 |
|---|---|---|
| committer | Marko Kreen | 2012-07-05 11:18:10 +0000 |
| commit | 551a0d0f2adb892a0fb9c4fca5257cbc534ec558 (patch) | |
| tree | c305ca3e7d13cad32f6716fa68abd29d1ed4de4d /python/pgq/cascade | |
| parent | dd68687e4d92944ae45e8dbba0995d5e18aeaa9b (diff) | |
londiste: Wait commands for londiste
add-table --wait-sync
Don't return until all pending tables are synced
wait-sync
Wait until all pending tables are synced.
wait-root
Wait until root's next tick has appeared locally.
wait-provider
Wait until provider's next tick has appeared locally.
Diffstat (limited to 'python/pgq/cascade')
| -rw-r--r-- | python/pgq/cascade/admin.py | 57 |
1 files changed, 57 insertions, 0 deletions
diff --git a/python/pgq/cascade/admin.py b/python/pgq/cascade/admin.py index e695ae6c..f8f294c8 100644 --- a/python/pgq/cascade/admin.py +++ b/python/pgq/cascade/admin.py @@ -766,6 +766,63 @@ class CascadeAdmin(skytools.AdminScript): if n.combined_queue: print('Combined Queue: %s (node type: %s)' % (n.combined_queue, n.combined_type)) + def cmd_wait_root(self): + """Wait for next tick from root.""" + + self.load_local_info() + + if self.queue_info.local_node.type == 'root': + self.log.info("Current node is root, no need to wait") + return + + self.log.info("Finding root node") + root_node = self.find_root_node() + self.log.info("Root is %s", root_node) + + dst_db = self.get_database('db') + self.wait_for_node(dst_db, root_node) + + def cmd_wait_provider(self): + """Wait for next tick from provider.""" + + self.load_local_info() + + if self.queue_info.local_node.type == 'root': + self.log.info("Current node is root, no need to wait") + return + + dst_db = self.get_database('db') + node = self.queue_info.local_node.provider_node + self.log.info("Provider is %s", node) + self.wait_for_node(dst_db, node) + + def wait_for_node(self, dst_db, node_name): + """Core logic for waiting.""" + + self.log.info("Fetching last tick for %s", node_name) + node_info = self.load_node_info(node_name) + tick_id = node_info.last_tick + + self.log.info("Waiting for tick > %d", tick_id) + + q = "select * from pgq_node.get_node_info(%s)" + dst_curs = dst_db.cursor() + + while 1: + dst_curs.execute(q, [self.queue_name]) + row = dst_curs.fetchone() + dst_db.commit() + + if row['ret_code'] >= 300: + self.log.warning("Problem: %s", row['ret_code'], row['ret_note']) + return + + if row['worker_last_tick'] > tick_id: + self.log.info("Got tick %d, exiting", row['worker_last_tick']) + break + + self.sleep(2) + # # Shortcuts for operating on nodes. # |
