diff options
author | Marko Kreen | 2008-04-22 12:46:13 +0000 |
---|---|---|
committer | Marko Kreen | 2008-04-22 12:46:13 +0000 |
commit | b39310d82f8203ce02280043744a500ca68b6e7a (patch) | |
tree | 8d85313017dc8f7d762e1b6dd62da4b5c9357b61 /python/pgq/setconsumer.py | |
parent | 0f571a13fed0fe8adffba9ee545b9add46075f34 (diff) |
more pgq_set/londiste cleanup
Diffstat (limited to 'python/pgq/setconsumer.py')
-rw-r--r-- | python/pgq/setconsumer.py | 84 |
1 files changed, 2 insertions, 82 deletions
diff --git a/python/pgq/setconsumer.py b/python/pgq/setconsumer.py index 76b8dbe6..268cd7a8 100644 --- a/python/pgq/setconsumer.py +++ b/python/pgq/setconsumer.py @@ -3,90 +3,10 @@ import sys, time, skytools from pgq.rawconsumer import RawQueue +from pgq.setinfo import * __all__ = ['SetConsumer'] -ROOT = 'root' -BRANCH = 'branch' -LEAF = 'leaf' -COMBINED_ROOT = 'combined-root' -COMBINED_BRANCH = 'combined-branch' -MERGE_LEAF = 'merge-leaf' - -class MemberInfo: - def __init__(self, row): - self.name = row['node_name'] - self.location = row['node_location'] - self.dead = row['dead'] - -class NodeInfo: - def __init__(self, row, member_list, main_worker = True): - self.member_map = {} - self.main_worker = main_worker - for r in member_list: - m = MemberInfo(r) - self.member_map[m.name] = m - - self.name = row['node_name'] - self.type = row['node_type'] - self.queue_name = row['queue_name'] - self.global_watermark = row['global_watermark'] - self.local_watermark = row['local_watermark'] - self.completed_tick = row['completed_tick'] - self.provider_node = row['provider_node'] - self.provider_location = row['provider_location'] - self.paused = row['paused'] - self.resync = row['resync'] - self.up_to_date = row['up_to_date'] - self.combined_set = row['combined_set'] - self.combined_type = row['combined_type'] - self.combined_queue = row['combined_queue'] - self.worker_name = row['worker_name'] - - def need_action(self, action_name): - if not self.main_worker: - return action_name in ('process-batch', 'process-events') - - typ = self.type - if type == 'merge-leaf': - if self.target_type == 'combined-branch': - typ += "merge-leaf-to-branch" - elif self.target_type == 'combined-root': - typ += "merge-leaf-to-root" - else: - raise Exception('bad target type') - - try: - return action_map[action_name][typ] - except KeyError, d: - raise Exception('need_action(name=%s, type=%s) unknown' % (action_name, typ)) - - def get_target_queue(self): - qname = None - if self.type == 'merge-leaf': - qname = self.combined_queue - else: - qname = self.queue_name - if qname is None: - raise Exception("no target queue") - return qname - -action_map = { -'process-batch': {'root':0, 'branch':1, 'leaf':1, 'combined-root':0, 'combined-branch':1, 'merge-leaf-to-root':1, 'merge-leaf-to-branch':1}, -'process-events': {'root':0, 'branch':1, 'leaf':1, 'combined-root':0, 'combined-branch':1, 'merge-leaf-to-root':1, 'merge-leaf-to-branch':0}, -'copy-events': {'root':0, 'branch':1, 'leaf':0, 'combined-root':0, 'combined-branch':1, 'merge-leaf-to-root':0, 'merge-leaf-to-branch':0}, -'tick-event': {'root':0, 'branch':0, 'leaf':0, 'combined-root':0, 'combined-branch':0, 'merge-leaf-to-root':1, 'merge-leaf-to-branch':0}, -'global-wm-event': {'root':1, 'branch':0, 'leaf':0, 'combined-root':1, 'combined-branch':0, 'merge-leaf-to-root':0, 'merge-leaf-to-branch':0}, -'wait-behind': {'root':0, 'branch':0, 'leaf':0, 'combined-root':0, 'combined-branch':0, 'merge-leaf-to-root':0, 'merge-leaf-to-branch':1}, -'sync-part-pos': {'root':0, 'branch':0, 'leaf':0, 'combined-root':0, 'combined-branch':1, 'merge-leaf-to-root':0, 'merge-leaf-to-branch':0}, -'local-wm-publish':{'root':0, 'branch':1, 'leaf':1, 'combined-root':0, 'combined-branch':1, 'merge-leaf-to-root':1, 'merge-leaf-to-branch':1}, -} - -node_properties = { -'pgq': {'root':1, 'branch':1, 'leaf':0, 'combined-root':1, 'combined-branch':1, 'merge-leaf':1}, -'queue': {'root':1, 'branch':1, 'leaf':0, 'combined-root':1, 'combined-branch':1, 'merge-leaf':0}, -} - class SetConsumer(skytools.DBScript): last_local_wm_publish_time = 0 last_global_wm_publish_time = 0 @@ -268,7 +188,7 @@ class SetConsumer(skytools.DBScript): mbr_list = curs.dictfetchall() db.commit() - return NodeInfo(node_row, mbr_list, self.main_worker) + return NodeInfo(node_row, self.main_worker) def tag_node_uptodate(self, dst_db): dst_curs = dst_db.cursor() |