diff options
author | Marko Kreen | 2011-11-21 11:06:15 +0000 |
---|---|---|
committer | Marko Kreen | 2011-11-21 11:06:15 +0000 |
commit | 1ca5e2a5cde0057ed1ad58d497c4e93326b7fef8 (patch) | |
tree | 807235b3c87d7baf325a886c93a026b785e0bd37 | |
parent | eae1cff118baff8ce8890014c3eef2fff19f26a8 (diff) |
drop-node: better behaviour when node is down
-rw-r--r-- | python/pgq/cascade/admin.py | 46 |
1 files changed, 29 insertions, 17 deletions
diff --git a/python/pgq/cascade/admin.py b/python/pgq/cascade/admin.py index 144eee5f..48f4e43a 100644 --- a/python/pgq/cascade/admin.py +++ b/python/pgq/cascade/admin.py @@ -473,27 +473,39 @@ class CascadeAdmin(skytools.AdminScript): self.load_local_info() - node = self.load_node_info(node_name) + try: + node = self.load_node_info(node_name) + if node: + # see if we can safely drop + subscriber_list = self.get_node_subscriber_list(node_name) + if subscriber_list: + raise UsageError('node still has subscribers') + except skytools.DBError: + pass - # see if we can safely drop - subscriber_list = self.get_node_subscriber_list(node_name) - if subscriber_list: - raise Exception('node still has subscribers') + try: + # drop node info + db = self.get_node_database(node_name) + q = "select * from pgq_node.drop_node(%s, %s)" + self.exec_cmd(db, q, [self.queue_name, node_name]) + + # unregister node location from root node (event will be added to queue) + root_db = self.find_root_db() + q = "select * from pgq_node.unregister_location(%s, %s)" + self.exec_cmd(root_db, q, [self.queue_name, node_name]) + except skytools.DBError, d: + self.log.warning("Removal failure: %s", str(d)) + + # brute force removal + for n in self.queue_info.member_map.values(): + try: + q = "select * from pgq_node.drop_node(%s, %s)" + self.node_cmd(n.name, q, [self.queue_name, node_name]) + except skytools.DBError, d: + self.log.warning("Failed to remove from '%s': %s", n.name, str(d)) - # remove the node from all the databases - # for n in self.queue_info.member_map.values(): - # q = "select * from pgq_node.drop_node(%s, %s)" - # self.node_cmd(n.name, q, [self.queue_name, node_name]) - # drop node info - db = self.get_node_database(node_name) - q = "select * from pgq_node.drop_node(%s, %s)" - self.exec_cmd(db, q, [self.queue_name, node_name]) - # unregister node location from root node (event will be added to queue) - root_db = self.find_root_db() - q = "select * from pgq_node.unregister_location(%s, %s)" - self.exec_cmd(root_db, q, [self.queue_name, node_name]) def node_depends(self, sub_node, top_node): cur_node = sub_node |