diff options
author | Marko Kreen | 2011-02-01 12:55:29 +0000 |
---|---|---|
committer | Marko Kreen | 2011-02-01 12:55:29 +0000 |
commit | 48da596f01c299916f28a9743fbdff56ce3e1702 (patch) | |
tree | 598bf368a0bb5211d95cbf0639ae6479818d8cd6 /python | |
parent | 4accf58ed4936b54cd9a8908952858e2159befcb (diff) |
cascade tag-dead: spread info over all nodes
Diffstat (limited to 'python')
-rw-r--r-- | python/pgq/cascade/admin.py | 39 |
1 files changed, 29 insertions, 10 deletions
```diff
diff --git a/python/pgq/cascade/admin.py b/python/pgq/cascade/admin.py
index 998ea3ab..e3fb2096 100644
--- a/python/pgq/cascade/admin.py
+++ b/python/pgq/cascade/admin.py
@@ -11,9 +11,9 @@ setadm.py INI pause NODE [CONS]
 
 """
 
-import sys, time, optparse, skytools, psycopg2
+import sys, time, optparse, skytools
 
-from skytools import UsageError
+from skytools import UsageError, DBError
 from pgq.cascade.nodeinfo import *
 
 __all__ = ['CascadeAdmin']
@@ -340,9 +340,9 @@ class CascadeAdmin(skytools.AdminScript):
                 node.load_status(curs)
                 self.load_extra_status(curs, node)
                 self.queue_info.add_node(node)
-            except psycopg2.Error, d:
+            except DBError, d:
                 msg = str(d).strip().split('\n', 1)[0]
-                print('Node mname failure: %s' % msg)
+                print('Node %s failure: %s' % (mname, msg))
                 node = NodeInfo(self.queue_name, None, node_name = mname)
                 self.queue_info.add_node(node)
             self.close_database('look_db')
@@ -645,14 +645,33 @@ class CascadeAdmin(skytools.AdminScript):
             res[n.name] = 1
         return res.keys()
 
-    def cmd_tag_dead(self, node_name):
-        # todo: write to db
-        self.log.info("Tagging node '%s' as dead" % node_name)
+    def cmd_tag_dead(self, dead_node_name):
         self.load_local_info()
-        self.queue_info.tag_dead(node_name)
+
+        # tag node dead in memory
+        self.log.info("Tagging node '%s' as dead" % dead_node_name)
+        self.queue_info.tag_dead(dead_node_name)
+
+        # tag node dead in local node
         q = "select * from pgq_node.register_location(%s, %s, null, true)"
-        self.node_cmd(self.local_node, q, [self.queue_name, node_name])
-        # fixme: root?
+        self.node_cmd(self.local_node, q, [self.queue_name, dead_node_name])
+
+        # tag node dead in other nodes
+        nodelist = self.queue_info.member_map.keys()
+        for node_name in nodelist:
+            if not self.node_alive(node_name):
+                continue
+            if node_name == dead_node_name:
+                continue
+            if node_name == self.local_node:
+                continue
+            try:
+                q = "select * from pgq_node.register_location(%s, %s, null, true)"
+                self.node_cmd(node_name, q, [self.queue_name, dead_node_name])
+            except DBError, d:
+                msg = str(d).strip().split('\n', 1)[0]
+                print('Node %s failure: %s' % (node_name, msg))
+            self.close_node_database(node_name)
 
     def cmd_pause(self):
         """Pause a node"""
```