summaryrefslogtreecommitdiff
path: root/python/pgq/setinfo.py
diff options
context:
space:
mode:
authorMarko Kreen2008-04-25 14:48:32 +0000
committerMarko Kreen2008-04-25 14:48:32 +0000
commit7ad93d5ca136a10c970a38837641b5789f3f8033 (patch)
tree93c4be1e4089e99c7cb735b40918f6c369b04aa3 /python/pgq/setinfo.py
parenta6bc0ffa2c37b9e43a5bca7b4003313ce1875ad3 (diff)
pgq_set: pause/resume/change-provider/rename-node work now
Diffstat (limited to 'python/pgq/setinfo.py')
-rw-r--r--python/pgq/setinfo.py63
1 files changed, 40 insertions, 23 deletions
diff --git a/python/pgq/setinfo.py b/python/pgq/setinfo.py
index da2e1ba9..17ff781c 100644
--- a/python/pgq/setinfo.py
+++ b/python/pgq/setinfo.py
@@ -1,11 +1,10 @@
#! /usr/bin/env python
-import skytools
-
__all__ = ['MemberInfo', 'NodeInfo', 'SetInfo',
'ROOT', 'BRANCH', 'LEAF', 'COMBINED_ROOT',
'COMBINED_BRANCH', 'MERGE_LEAF']
+# node types
ROOT = 'root'
BRANCH = 'branch'
LEAF = 'leaf'
@@ -13,6 +12,7 @@ COMBINED_ROOT = 'combined-root'
COMBINED_BRANCH = 'combined-branch'
MERGE_LEAF = 'merge-leaf'
+# which nodes need to do what actions
action_map = {
'process-batch': {'root':0, 'branch':1, 'leaf':1, 'combined-root':0, 'combined-branch':1, 'merge-leaf-to-root':1, 'merge-leaf-to-branch':1},
'process-events': {'root':0, 'branch':1, 'leaf':1, 'combined-root':0, 'combined-branch':1, 'merge-leaf-to-root':1, 'merge-leaf-to-branch':0},
@@ -46,7 +46,7 @@ class NodeInfo:
self.provider_location = row['provider_location']
self.paused = row['paused']
self.resync = row['resync']
- self.up_to_date = row['up_to_date']
+ self.uptodate = row['uptodate']
self.combined_set = row['combined_set']
self.combined_type = row['combined_type']
self.combined_queue = row['combined_queue']
@@ -56,15 +56,18 @@ class NodeInfo:
self._info_lines = []
def need_action(self, action_name):
+ """Returns True if worker for this node needs
+ to do specified action.
+ """
if not self.main_worker:
return action_name in ('process-batch', 'process-events')
typ = self.type
- if type == 'merge-leaf':
- if self.target_type == 'combined-branch':
- typ += "merge-leaf-to-branch"
- elif self.target_type == 'combined-root':
- typ += "merge-leaf-to-root"
+ if type == MERGE_LEAF:
+ if self.target_type == COMBINED_BRANCH:
+ typ = "merge-leaf-to-branch"
+ elif self.target_type == COMBINED_ROOT:
+ typ = "merge-leaf-to-root"
else:
raise Exception('bad target type')
@@ -94,7 +97,12 @@ class NodeInfo:
lag = root_time - tick_time
else:
lag = self.queue_info['ticker_lag']
- lst.append("lag: %s" % lag)
+ txt = "lag: %s" % lag
+ if self.paused:
+ txt += ", PAUSED"
+ if not self.uptodate:
+ txt += ", NOT UPTODATE"
+ lst.append(txt)
return lst
def add_info_line(self, ln):
@@ -119,13 +127,11 @@ class NodeInfo:
class SetInfo:
def __init__(self, set_name, info_row, member_rows):
- self.root_info = info_row
+ self.local_node = NodeInfo(set_name, info_row)
self.set_name = set_name
self.member_map = {}
self.node_map = {}
- self.root_name = info_row['node_name']
- self.root_type = info_row['node_type']
- self.global_watermark = info_row['global_watermark']
+ self.add_node(self.local_node)
for r in member_rows:
n = MemberInfo(r)
@@ -137,22 +143,23 @@ class SetInfo:
def get_node(self, name):
return self.node_map.get(name)
- def get_root_node(self):
- return self.get_node(self.root_name)
-
def add_node(self, node):
self.node_map[node.name] = node
_DATAFMT = "%-30s%s"
def print_tree(self):
- self._prepare_tree()
- root = self.get_root_node()
+ """Print ascii-tree for set.
+ Expects that data for all nodes is filled in."""
+
+ root = self._prepare_tree()
self._tree_calc(root)
datalines = self._print_node(root, '', [])
for ln in datalines:
print self._DATAFMT % (' ', ln)
def _print_node(self, node, pfx, datalines):
+ # print a tree fragment for node and info
+ # returns list of unprinted data rows
for ln in datalines:
print self._DATAFMT % (_setpfx(pfx, '|'), ln)
datalines = node.get_infolines()
@@ -160,16 +167,20 @@ class SetInfo:
for i, n in enumerate(node.child_list):
sfx = ((i < len(node.child_list) - 1) and ' |' or ' ')
- tmppfx = pfx + sfx
- datalines = self._print_node(n, tmppfx, datalines)
+ datalines = self._print_node(n, pfx + sfx, datalines)
return datalines
def _prepare_tree(self):
+ # reset vars, fill parent and child_list for each node
+ # returns root
+ root = None
for node in self.node_map.itervalues():
node.total_childs = 0
node.levels = 0
node.child_list = []
+ if node.type in (ROOT, COMBINED_ROOT):
+ root = node
for node in self.node_map.itervalues():
if node.provider_node:
p = self.node_map[node.provider_node]
@@ -178,7 +189,13 @@ class SetInfo:
else:
node.parent = None
+ if root is None:
+ raise Exception("root nod enot found")
+ return root
+
def _tree_calc(self, node):
+ # calculate levels and count total childs
+ # sort the tree based on them
total = len(node.child_list)
levels = 1
for subnode in node.child_list:
@@ -199,7 +216,7 @@ def _setpfx(pfx, sfx):
def _cmp_node(n1, n2):
# returns neg if n1 smaller
cmp = n1.levels - n2.levels
- if cmp:
- return cmp
- return n1.total_childs - n2.total_childs
+ if cmp == 0:
+ cmp = n1.total_childs - n2.total_childs
+ return cmp