summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
authorMarko Kreen2011-02-01 12:55:29 +0000
committerMarko Kreen2011-02-01 12:55:29 +0000
commit48da596f01c299916f28a9743fbdff56ce3e1702 (patch)
tree598bf368a0bb5211d95cbf0639ae6479818d8cd6 /python
parent4accf58ed4936b54cd9a8908952858e2159befcb (diff)
cascade tag-dead: spread info over all nodes
Diffstat (limited to 'python')
-rw-r--r--python/pgq/cascade/admin.py39
1 files changed, 29 insertions, 10 deletions
diff --git a/python/pgq/cascade/admin.py b/python/pgq/cascade/admin.py
index 998ea3ab..e3fb2096 100644
--- a/python/pgq/cascade/admin.py
+++ b/python/pgq/cascade/admin.py
@@ -11,9 +11,9 @@ setadm.py INI pause NODE [CONS]
"""
-import sys, time, optparse, skytools, psycopg2
+import sys, time, optparse, skytools
-from skytools import UsageError
+from skytools import UsageError, DBError
from pgq.cascade.nodeinfo import *
__all__ = ['CascadeAdmin']
@@ -340,9 +340,9 @@ class CascadeAdmin(skytools.AdminScript):
node.load_status(curs)
self.load_extra_status(curs, node)
self.queue_info.add_node(node)
- except psycopg2.Error, d:
+ except DBError, d:
msg = str(d).strip().split('\n', 1)[0]
- print('Node mname failure: %s' % msg)
+ print('Node %s failure: %s' % (mname, msg))
node = NodeInfo(self.queue_name, None, node_name = mname)
self.queue_info.add_node(node)
self.close_database('look_db')
@@ -645,14 +645,33 @@ class CascadeAdmin(skytools.AdminScript):
res[n.name] = 1
return res.keys()
- def cmd_tag_dead(self, node_name):
- # todo: write to db
- self.log.info("Tagging node '%s' as dead" % node_name)
+ def cmd_tag_dead(self, dead_node_name):
self.load_local_info()
- self.queue_info.tag_dead(node_name)
+
+ # tag node dead in memory
+ self.log.info("Tagging node '%s' as dead" % dead_node_name)
+ self.queue_info.tag_dead(dead_node_name)
+
+ # tag node dead in local node
q = "select * from pgq_node.register_location(%s, %s, null, true)"
- self.node_cmd(self.local_node, q, [self.queue_name, node_name])
- # fixme: root?
+ self.node_cmd(self.local_node, q, [self.queue_name, dead_node_name])
+
+ # tag node dead in other nodes
+ nodelist = self.queue_info.member_map.keys()
+ for node_name in nodelist:
+ if not self.node_alive(node_name):
+ continue
+ if node_name == dead_node_name:
+ continue
+ if node_name == self.local_node:
+ continue
+ try:
+ q = "select * from pgq_node.register_location(%s, %s, null, true)"
+ self.node_cmd(node_name, q, [self.queue_name, dead_node_name])
+ except DBError, d:
+ msg = str(d).strip().split('\n', 1)[0]
+ print('Node %s failure: %s' % (node_name, msg))
+ self.close_node_database(node_name)
def cmd_pause(self):
"""Pause a node"""