diff options
author | Marko Kreen | 2008-04-15 13:02:47 +0000 |
---|---|---|
committer | Marko Kreen | 2008-04-15 13:02:47 +0000 |
commit | 329cae98ec32cd9369cab54a3b597d50fd280562 (patch) | |
tree | 123d6cb60abbc445926956e77d2dc0318483e36e /python/pgq/setconsumer.py | |
parent | 3bb252361d1595e2b882c3fae9602d866a8c553b (diff) |
more setconsumer/londiste work.
simple init/event processing/copy seems to work.
Diffstat (limited to 'python/pgq/setconsumer.py')
-rw-r--r-- | python/pgq/setconsumer.py | 160 |
1 files changed, 107 insertions, 53 deletions
diff --git a/python/pgq/setconsumer.py b/python/pgq/setconsumer.py index 6e01ccf6..3b87cbfd 100644 --- a/python/pgq/setconsumer.py +++ b/python/pgq/setconsumer.py @@ -20,8 +20,9 @@ class MemberInfo: self.dead = row['dead'] class NodeInfo: - def __init__(self, row, member_list): + def __init__(self, row, member_list, main_worker = True): self.member_map = {} + self.main_worker = main_worker for r in member_list: m = MemberInfo(r) self.member_map[m.name] = m @@ -43,6 +44,9 @@ class NodeInfo: self.worker_name = row['worker_name'] def need_action(self, action_name): + if not self.main_worker: + return action_name in ('process-batch', 'process-events') + typ = self.type if type == 'merge-leaf': if self.target_type == 'combined-branch': @@ -69,12 +73,13 @@ class NodeInfo: action_map = { 'process-batch': {'root':0, 'branch':1, 'leaf':1, 'combined-root':0, 'combined-branch':1, 'merge-leaf-to-root':1, 'merge-leaf-to-branch':1}, -'process-events': {'root':0, 'branch':1, 'leaf':0, 'combined-root':0, 'combined-branch':1, 'merge-leaf-to-root':1, 'merge-leaf-to-branch':0}, -'copy-events': {'root':0, 'branch':1, 'leaf':1, 'combined-root':0, 'combined-branch':1, 'merge-leaf-to-root':0, 'merge-leaf-to-branch':0}, +'process-events': {'root':0, 'branch':1, 'leaf':1, 'combined-root':0, 'combined-branch':1, 'merge-leaf-to-root':1, 'merge-leaf-to-branch':0}, +'copy-events': {'root':0, 'branch':1, 'leaf':0, 'combined-root':0, 'combined-branch':1, 'merge-leaf-to-root':0, 'merge-leaf-to-branch':0}, 'tick-event': {'root':0, 'branch':0, 'leaf':0, 'combined-root':0, 'combined-branch':0, 'merge-leaf-to-root':1, 'merge-leaf-to-branch':0}, 'global-wm-event': {'root':1, 'branch':0, 'leaf':0, 'combined-root':1, 'combined-branch':0, 'merge-leaf-to-root':0, 'merge-leaf-to-branch':0}, 'wait-behind': {'root':0, 'branch':0, 'leaf':0, 'combined-root':0, 'combined-branch':0, 'merge-leaf-to-root':0, 'merge-leaf-to-branch':1}, 'sync-part-pos': {'root':0, 'branch':0, 'leaf':0, 'combined-root':0, 'combined-branch':1, 'merge-leaf-to-root':0, 'merge-leaf-to-branch':0}, +'local-wm-publish':{'root':0, 'branch':1, 'leaf':1, 'combined-root':0, 'combined-branch':1, 'merge-leaf-to-root':1, 'merge-leaf-to-branch':1}, } node_properties = { @@ -83,84 +88,101 @@ node_properties = { } class SetConsumer(skytools.DBScript): - last_global_wm_event = 0 + last_local_wm_publish_time = 0 + last_global_wm_publish_time = 0 + main_worker = True + reg_ok = False + def __init__(self, service_name, args, + node_db_name = 'node_db'): + skytools.DBScript.__init__(self, service_name, args) + self.node_db_name = node_db_name + self.consumer_name = self.cf.get('consumer_name', self.job_name) + def work(self): self.tick_id_cache = {} self.set_name = self.cf.get('set_name') - target_db = self.get_database('subscriber_db') - - node = self.load_node_info(target_db) - self.consumer_name = node.worker_name + dst_db = self.get_database(self.node_db_name) + dst_curs = dst_db.cursor() - if not node.up_to_date: - self.tag_node_uptodate(target_db) + dst_node = self.load_node_info(dst_db) + if self.main_worker: + self.consumer_name = dst_node.worker_name + if not dst_node.up_to_date: + self.tag_node_uptodate(dst_db) - if node.paused: + if dst_node.paused: return 0 - if node.need_action('global-wm-event'): - curs = target_db.cursor() - self.set_global_watermark(curs, node.local_watermark) - target_db.commit() + if dst_node.need_action('global-wm-event'): + self.publish_global_watermark(dst_db, dst_node.local_watermark) - if not node.need_action('process-batch'): + if not dst_node.need_action('process-batch'): return 0 # # batch processing follows # - source_db = self.get_database('source_db', connstr = node.provider_location) - srcnode = self.load_node_info(source_db) + src_db = self.get_database('src_db', connstr = dst_node.provider_location) + src_curs = src_db.cursor() + src_node = self.load_node_info(src_db) # get batch - srcqueue = RawQueue(srcnode.queue_name, self.consumer_name) + src_queue = RawQueue(src_node.queue_name, self.consumer_name) + self.src_queue = src_queue + self.dst_queue = None + + if not self.main_worker and not self.reg_ok: + self.register_consumer(src_curs) - batch_id = srcqueue.next_batch(source_db.cursor()) - source_db.commit() + batch_id = src_queue.next_batch(src_curs) + src_db.commit() if batch_id is None: return 0 - if node.need_action('wait-behind'): - if node.should_wait(queue.cur_tick): + self.log.debug("New batch: tick_id=%d / batch_id=%d" % (src_queue.cur_tick, batch_id)) + + if dst_node.need_action('wait-behind'): + if dst_node.should_wait(src_queue.cur_tick): return 0 - if node.need_action('process-event'): + if dst_node.need_action('process-events'): # load and process batch data - ev_list = self.get_batch_events(source_db, batch_id) + ev_list = src_queue.get_batch_events(src_curs) - copy_queue = None - if node.need_action('copy-events'): - copy_queue = node.get_target_queue() - self.process_set_batch(target_db, ev_list, copy_queue) - if copy_queue: - copy_queue.finish_bulk_insert(curs) - self.copy_tick(target_curs, srcqueue, copy_queue) + if dst_node.need_action('copy-events'): + self.dst_queue = RawQueue(dst_node.get_target_queue(), self.consumer_name) + self.process_set_batch(src_db, dst_db, ev_list) + if self.dst_queue: + self.dst_queue.finish_bulk_insert(dst_curs) + self.copy_tick(dst_curs, src_queue, self.dst_queue) # COMBINED_BRANCH needs to sync with part sets - if node.need_action('sync-part-pos'): - self.move_part_positions(target_curs) + if dst_node.need_action('sync-part-pos'): + self.move_part_positions(dst_curs) # we are done on target - self.set_tick_complete(target_curs) - target_db.commit() + self.set_tick_complete(dst_curs, src_queue.cur_tick) + dst_db.commit() # done on source - self.finish_batch(source_db, batch_id) + src_queue.finish_batch(src_curs) + src_db.commit() # occasinally send watermark upwards - self.send_local_watermark_upwards(target_db, source_db) + if dst_node.need_action('local-wm-publish'): + self.send_local_watermark_upwards(src_db, dst_node) # got a batch so there can be more return 1 - def process_set_batch(self, src_db, dst_db, ev_list, copy_queue = None): + def process_set_batch(self, src_db, dst_db, ev_list): dst_curs = dst_db.cursor() for ev in ev_list: self.process_set_event(dst_curs, ev) - if copy_queue: - copy_queue.bulk_insert(dst_curs, ev) + if self.dst_queue: + self.dst_queue.bulk_insert(dst_curs, ev) self.stat_add('count', len(ev_list)) def process_set_event(self, dst_curs, ev): @@ -203,21 +225,35 @@ class SetConsumer(skytools.DBScript): q = "select * from pgq_set.add_member(%s, %s, %s, %s)" dst_curs.execute(q, [set_name, node_name, node_location, dead]) - def send_local_watermark_upwards(self, target_db, source_db): - target_curs = target_db.cursor() - source_curs = source_db.cursor() - q = "select pgq_ext.get_local_watermark(%s)" - target_curs.execute(q, [self.set_name]) - wm = target_curs.fetchone()[0] - target_db.commit() - - q = "select pgq_ext.set_subscriber_watermark(%s, %s, %s)" - source_curs.execute(q, [self.set_name]) + def send_local_watermark_upwards(self, src_db, node): + # fixme - delay + now = time.time() + delay = now - self.last_local_wm_publish_time + if delay < 1*60: + return + self.last_local_wm_publish_time = now + + self.log.debug("send_local_watermark_upwards") + src_curs = src_db.cursor() + q = "select pgq_set.set_subscriber_watermark(%s, %s, %s)" + src_curs.execute(q, [self.set_name, node.name, node.local_watermark]) + src_db.commit() def set_global_watermark(self, dst_curs, tick_id): + self.log.debug("set_global_watermark: %s" % tick_id) q = "select pgq_set.set_global_watermark(%s, %s)" dst_curs.execute(q, [self.set_name, tick_id]) + def publish_global_watermark(self, dst_db, watermark): + now = time.time() + delay = now - self.last_global_wm_publish_time + if delay < 1*60: + return + self.last_global_wm_publish_time = now + + self.set_global_watermark(dst_db.cursor(), watermark) + dst_db.commit() + def load_node_info(self, db): curs = db.cursor() @@ -232,10 +268,10 @@ class SetConsumer(skytools.DBScript): mbr_list = curs.dictfetchall() db.commit() - return NodeInfo(node_row, mbr_list) + return NodeInfo(node_row, mbr_list, self.main_worker) def tag_node_uptodate(self, dst_db): - dst_curs = db.cursor() + dst_curs = dst_db.cursor() q = "select * from pgq_set.set_node_uptodate(%s, true)" dst_curs.execute(q, [self.set_name]) dst_db.commit() @@ -244,6 +280,24 @@ class SetConsumer(skytools.DBScript): q = "select * from pgq.ticker(%s, %s)" dst_curs.execute(q, [dst_queue.queue_name, src_queue.cur_tick]) + def set_tick_complete(self, dst_curs, tick_id): + q = "select * from pgq_set.set_completed_tick(%s, %s, %s)" + dst_curs.execute(q, [self.set_name, self.consumer_name, tick_id]) + + def register_consumer(self, src_curs): + if self.main_worker: + raise Exception('main set worker should not play with registrations') + + q = "select * from pgq.register_consumer(%s, %s)" + src_curs.execute(q, [self.src_queue.queue_name, self.consumer_name]) + + def unregister_consumer(self, src_curs): + if self.main_worker: + raise Exception('main set worker should not play with registrations') + + q = "select * from pgq.unregister_consumer(%s, %s)" + src_curs.execute(q, [self.src_queue.queue_name, self.consumer_name]) + if __name__ == '__main__': script = SetConsumer('setconsumer', sys.argv[1:]) script.start() |