summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMarko Kreen2011-11-21 11:06:15 +0000
committerMarko Kreen2011-11-21 11:06:15 +0000
commit1ca5e2a5cde0057ed1ad58d497c4e93326b7fef8 (patch)
tree807235b3c87d7baf325a886c93a026b785e0bd37
parenteae1cff118baff8ce8890014c3eef2fff19f26a8 (diff)
drop-node: better behaviour when node is down
-rw-r--r--python/pgq/cascade/admin.py46
1 files changed, 29 insertions, 17 deletions
diff --git a/python/pgq/cascade/admin.py b/python/pgq/cascade/admin.py
index 144eee5f..48f4e43a 100644
--- a/python/pgq/cascade/admin.py
+++ b/python/pgq/cascade/admin.py
@@ -473,27 +473,39 @@ class CascadeAdmin(skytools.AdminScript):
self.load_local_info()
- node = self.load_node_info(node_name)
+ try:
+ node = self.load_node_info(node_name)
+ if node:
+ # see if we can safely drop
+ subscriber_list = self.get_node_subscriber_list(node_name)
+ if subscriber_list:
+ raise UsageError('node still has subscribers')
+ except skytools.DBError:
+ pass
- # see if we can safely drop
- subscriber_list = self.get_node_subscriber_list(node_name)
- if subscriber_list:
- raise Exception('node still has subscribers')
+ try:
+ # drop node info
+ db = self.get_node_database(node_name)
+ q = "select * from pgq_node.drop_node(%s, %s)"
+ self.exec_cmd(db, q, [self.queue_name, node_name])
+
+ # unregister node location from root node (event will be added to queue)
+ root_db = self.find_root_db()
+ q = "select * from pgq_node.unregister_location(%s, %s)"
+ self.exec_cmd(root_db, q, [self.queue_name, node_name])
+ except skytools.DBError, d:
+ self.log.warning("Removal failure: %s", str(d))
+
+ # brute force removal
+ for n in self.queue_info.member_map.values():
+ try:
+ q = "select * from pgq_node.drop_node(%s, %s)"
+ self.node_cmd(n.name, q, [self.queue_name, node_name])
+ except skytools.DBError, d:
+ self.log.warning("Failed to remove from '%s': %s", n.name, str(d))
- # remove the node from all the databases
- # for n in self.queue_info.member_map.values():
- # q = "select * from pgq_node.drop_node(%s, %s)"
- # self.node_cmd(n.name, q, [self.queue_name, node_name])
- # drop node info
- db = self.get_node_database(node_name)
- q = "select * from pgq_node.drop_node(%s, %s)"
- self.exec_cmd(db, q, [self.queue_name, node_name])
- # unregister node location from root node (event will be added to queue)
- root_db = self.find_root_db()
- q = "select * from pgq_node.unregister_location(%s, %s)"
- self.exec_cmd(root_db, q, [self.queue_name, node_name])
def node_depends(self, sub_node, top_node):
cur_node = sub_node