summaryrefslogtreecommitdiff
path: root/python/pgq/cascade
diff options
context:
space:
mode:
authorMarko Kreen2012-07-05 11:11:29 +0000
committerMarko Kreen2012-07-05 11:18:10 +0000
commit551a0d0f2adb892a0fb9c4fca5257cbc534ec558 (patch)
treec305ca3e7d13cad32f6716fa68abd29d1ed4de4d /python/pgq/cascade
parentdd68687e4d92944ae45e8dbba0995d5e18aeaa9b (diff)
londiste: Wait commands for londiste
add-table --wait-sync Don't return until all pending tables are synced wait-sync Wait until all pending tables are synced. wait-root Wait until root's next tick has appeared locally. wait-provider Wait until provider's next tick has appeared locally.
Diffstat (limited to 'python/pgq/cascade')
-rw-r--r--python/pgq/cascade/admin.py57
1 files changed, 57 insertions, 0 deletions
diff --git a/python/pgq/cascade/admin.py b/python/pgq/cascade/admin.py
index e695ae6c..f8f294c8 100644
--- a/python/pgq/cascade/admin.py
+++ b/python/pgq/cascade/admin.py
@@ -766,6 +766,63 @@ class CascadeAdmin(skytools.AdminScript):
if n.combined_queue:
print('Combined Queue: %s (node type: %s)' % (n.combined_queue, n.combined_type))
+ def cmd_wait_root(self):
+ """Wait for next tick from root."""
+
+ self.load_local_info()
+
+ if self.queue_info.local_node.type == 'root':
+ self.log.info("Current node is root, no need to wait")
+ return
+
+ self.log.info("Finding root node")
+ root_node = self.find_root_node()
+ self.log.info("Root is %s", root_node)
+
+ dst_db = self.get_database('db')
+ self.wait_for_node(dst_db, root_node)
+
+ def cmd_wait_provider(self):
+ """Wait for next tick from provider."""
+
+ self.load_local_info()
+
+ if self.queue_info.local_node.type == 'root':
+ self.log.info("Current node is root, no need to wait")
+ return
+
+ dst_db = self.get_database('db')
+ node = self.queue_info.local_node.provider_node
+ self.log.info("Provider is %s", node)
+ self.wait_for_node(dst_db, node)
+
+ def wait_for_node(self, dst_db, node_name):
+ """Core logic for waiting."""
+
+ self.log.info("Fetching last tick for %s", node_name)
+ node_info = self.load_node_info(node_name)
+ tick_id = node_info.last_tick
+
+ self.log.info("Waiting for tick > %d", tick_id)
+
+ q = "select * from pgq_node.get_node_info(%s)"
+ dst_curs = dst_db.cursor()
+
+ while 1:
+ dst_curs.execute(q, [self.queue_name])
+ row = dst_curs.fetchone()
+ dst_db.commit()
+
+ if row['ret_code'] >= 300:
+ self.log.warning("Problem: %s", row['ret_code'], row['ret_note'])
+ return
+
+ if row['worker_last_tick'] > tick_id:
+ self.log.info("Got tick %d, exiting", row['worker_last_tick'])
+ break
+
+ self.sleep(2)
+
#
# Shortcuts for operating on nodes.
#