summaryrefslogtreecommitdiff
path: root/python/pgq/setconsumer.py
diff options
context:
space:
mode:
authorMarko Kreen2008-04-22 12:46:13 +0000
committerMarko Kreen2008-04-22 12:46:13 +0000
commitb39310d82f8203ce02280043744a500ca68b6e7a (patch)
tree8d85313017dc8f7d762e1b6dd62da4b5c9357b61 /python/pgq/setconsumer.py
parent0f571a13fed0fe8adffba9ee545b9add46075f34 (diff)
more pgq_set/londiste cleanup
Diffstat (limited to 'python/pgq/setconsumer.py')
-rw-r--r--python/pgq/setconsumer.py84
1 files changed, 2 insertions, 82 deletions
diff --git a/python/pgq/setconsumer.py b/python/pgq/setconsumer.py
index 76b8dbe6..268cd7a8 100644
--- a/python/pgq/setconsumer.py
+++ b/python/pgq/setconsumer.py
@@ -3,90 +3,10 @@
import sys, time, skytools
from pgq.rawconsumer import RawQueue
+from pgq.setinfo import *
__all__ = ['SetConsumer']
-ROOT = 'root'
-BRANCH = 'branch'
-LEAF = 'leaf'
-COMBINED_ROOT = 'combined-root'
-COMBINED_BRANCH = 'combined-branch'
-MERGE_LEAF = 'merge-leaf'
-
-class MemberInfo:
- def __init__(self, row):
- self.name = row['node_name']
- self.location = row['node_location']
- self.dead = row['dead']
-
-class NodeInfo:
- def __init__(self, row, member_list, main_worker = True):
- self.member_map = {}
- self.main_worker = main_worker
- for r in member_list:
- m = MemberInfo(r)
- self.member_map[m.name] = m
-
- self.name = row['node_name']
- self.type = row['node_type']
- self.queue_name = row['queue_name']
- self.global_watermark = row['global_watermark']
- self.local_watermark = row['local_watermark']
- self.completed_tick = row['completed_tick']
- self.provider_node = row['provider_node']
- self.provider_location = row['provider_location']
- self.paused = row['paused']
- self.resync = row['resync']
- self.up_to_date = row['up_to_date']
- self.combined_set = row['combined_set']
- self.combined_type = row['combined_type']
- self.combined_queue = row['combined_queue']
- self.worker_name = row['worker_name']
-
- def need_action(self, action_name):
- if not self.main_worker:
- return action_name in ('process-batch', 'process-events')
-
- typ = self.type
- if type == 'merge-leaf':
- if self.target_type == 'combined-branch':
- typ += "merge-leaf-to-branch"
- elif self.target_type == 'combined-root':
- typ += "merge-leaf-to-root"
- else:
- raise Exception('bad target type')
-
- try:
- return action_map[action_name][typ]
- except KeyError, d:
- raise Exception('need_action(name=%s, type=%s) unknown' % (action_name, typ))
-
- def get_target_queue(self):
- qname = None
- if self.type == 'merge-leaf':
- qname = self.combined_queue
- else:
- qname = self.queue_name
- if qname is None:
- raise Exception("no target queue")
- return qname
-
-action_map = {
-'process-batch': {'root':0, 'branch':1, 'leaf':1, 'combined-root':0, 'combined-branch':1, 'merge-leaf-to-root':1, 'merge-leaf-to-branch':1},
-'process-events': {'root':0, 'branch':1, 'leaf':1, 'combined-root':0, 'combined-branch':1, 'merge-leaf-to-root':1, 'merge-leaf-to-branch':0},
-'copy-events': {'root':0, 'branch':1, 'leaf':0, 'combined-root':0, 'combined-branch':1, 'merge-leaf-to-root':0, 'merge-leaf-to-branch':0},
-'tick-event': {'root':0, 'branch':0, 'leaf':0, 'combined-root':0, 'combined-branch':0, 'merge-leaf-to-root':1, 'merge-leaf-to-branch':0},
-'global-wm-event': {'root':1, 'branch':0, 'leaf':0, 'combined-root':1, 'combined-branch':0, 'merge-leaf-to-root':0, 'merge-leaf-to-branch':0},
-'wait-behind': {'root':0, 'branch':0, 'leaf':0, 'combined-root':0, 'combined-branch':0, 'merge-leaf-to-root':0, 'merge-leaf-to-branch':1},
-'sync-part-pos': {'root':0, 'branch':0, 'leaf':0, 'combined-root':0, 'combined-branch':1, 'merge-leaf-to-root':0, 'merge-leaf-to-branch':0},
-'local-wm-publish':{'root':0, 'branch':1, 'leaf':1, 'combined-root':0, 'combined-branch':1, 'merge-leaf-to-root':1, 'merge-leaf-to-branch':1},
-}
-
-node_properties = {
-'pgq': {'root':1, 'branch':1, 'leaf':0, 'combined-root':1, 'combined-branch':1, 'merge-leaf':1},
-'queue': {'root':1, 'branch':1, 'leaf':0, 'combined-root':1, 'combined-branch':1, 'merge-leaf':0},
-}
-
class SetConsumer(skytools.DBScript):
last_local_wm_publish_time = 0
last_global_wm_publish_time = 0
@@ -268,7 +188,7 @@ class SetConsumer(skytools.DBScript):
mbr_list = curs.dictfetchall()
db.commit()
- return NodeInfo(node_row, mbr_list, self.main_worker)
+ return NodeInfo(node_row, self.main_worker)
def tag_node_uptodate(self, dst_db):
dst_curs = dst_db.cursor()