summaryrefslogtreecommitdiff
path: root/python/pgq/setconsumer.py
diff options
context:
space:
mode:
authorMarko Kreen2008-04-15 13:02:47 +0000
committerMarko Kreen2008-04-15 13:02:47 +0000
commit329cae98ec32cd9369cab54a3b597d50fd280562 (patch)
tree123d6cb60abbc445926956e77d2dc0318483e36e /python/pgq/setconsumer.py
parent3bb252361d1595e2b882c3fae9602d866a8c553b (diff)
more setconsumer/londiste work.
simple init/event processing/copy seems to work.
Diffstat (limited to 'python/pgq/setconsumer.py')
-rw-r--r--python/pgq/setconsumer.py160
1 files changed, 107 insertions, 53 deletions
diff --git a/python/pgq/setconsumer.py b/python/pgq/setconsumer.py
index 6e01ccf6..3b87cbfd 100644
--- a/python/pgq/setconsumer.py
+++ b/python/pgq/setconsumer.py
@@ -20,8 +20,9 @@ class MemberInfo:
self.dead = row['dead']
class NodeInfo:
- def __init__(self, row, member_list):
+ def __init__(self, row, member_list, main_worker = True):
self.member_map = {}
+ self.main_worker = main_worker
for r in member_list:
m = MemberInfo(r)
self.member_map[m.name] = m
@@ -43,6 +44,9 @@ class NodeInfo:
self.worker_name = row['worker_name']
def need_action(self, action_name):
+ if not self.main_worker:
+ return action_name in ('process-batch', 'process-events')
+
typ = self.type
if type == 'merge-leaf':
if self.target_type == 'combined-branch':
@@ -69,12 +73,13 @@ class NodeInfo:
action_map = {
'process-batch': {'root':0, 'branch':1, 'leaf':1, 'combined-root':0, 'combined-branch':1, 'merge-leaf-to-root':1, 'merge-leaf-to-branch':1},
-'process-events': {'root':0, 'branch':1, 'leaf':0, 'combined-root':0, 'combined-branch':1, 'merge-leaf-to-root':1, 'merge-leaf-to-branch':0},
-'copy-events': {'root':0, 'branch':1, 'leaf':1, 'combined-root':0, 'combined-branch':1, 'merge-leaf-to-root':0, 'merge-leaf-to-branch':0},
+'process-events': {'root':0, 'branch':1, 'leaf':1, 'combined-root':0, 'combined-branch':1, 'merge-leaf-to-root':1, 'merge-leaf-to-branch':0},
+'copy-events': {'root':0, 'branch':1, 'leaf':0, 'combined-root':0, 'combined-branch':1, 'merge-leaf-to-root':0, 'merge-leaf-to-branch':0},
'tick-event': {'root':0, 'branch':0, 'leaf':0, 'combined-root':0, 'combined-branch':0, 'merge-leaf-to-root':1, 'merge-leaf-to-branch':0},
'global-wm-event': {'root':1, 'branch':0, 'leaf':0, 'combined-root':1, 'combined-branch':0, 'merge-leaf-to-root':0, 'merge-leaf-to-branch':0},
'wait-behind': {'root':0, 'branch':0, 'leaf':0, 'combined-root':0, 'combined-branch':0, 'merge-leaf-to-root':0, 'merge-leaf-to-branch':1},
'sync-part-pos': {'root':0, 'branch':0, 'leaf':0, 'combined-root':0, 'combined-branch':1, 'merge-leaf-to-root':0, 'merge-leaf-to-branch':0},
+'local-wm-publish':{'root':0, 'branch':1, 'leaf':1, 'combined-root':0, 'combined-branch':1, 'merge-leaf-to-root':1, 'merge-leaf-to-branch':1},
}
node_properties = {
@@ -83,84 +88,101 @@ node_properties = {
}
class SetConsumer(skytools.DBScript):
- last_global_wm_event = 0
+ last_local_wm_publish_time = 0
+ last_global_wm_publish_time = 0
+ main_worker = True
+ reg_ok = False
+ def __init__(self, service_name, args,
+ node_db_name = 'node_db'):
+ skytools.DBScript.__init__(self, service_name, args)
+ self.node_db_name = node_db_name
+ self.consumer_name = self.cf.get('consumer_name', self.job_name)
+
def work(self):
self.tick_id_cache = {}
self.set_name = self.cf.get('set_name')
- target_db = self.get_database('subscriber_db')
-
- node = self.load_node_info(target_db)
- self.consumer_name = node.worker_name
+ dst_db = self.get_database(self.node_db_name)
+ dst_curs = dst_db.cursor()
- if not node.up_to_date:
- self.tag_node_uptodate(target_db)
+ dst_node = self.load_node_info(dst_db)
+ if self.main_worker:
+ self.consumer_name = dst_node.worker_name
+ if not dst_node.up_to_date:
+ self.tag_node_uptodate(dst_db)
- if node.paused:
+ if dst_node.paused:
return 0
- if node.need_action('global-wm-event'):
- curs = target_db.cursor()
- self.set_global_watermark(curs, node.local_watermark)
- target_db.commit()
+ if dst_node.need_action('global-wm-event'):
+ self.publish_global_watermark(dst_db, dst_node.local_watermark)
- if not node.need_action('process-batch'):
+ if not dst_node.need_action('process-batch'):
return 0
#
# batch processing follows
#
- source_db = self.get_database('source_db', connstr = node.provider_location)
- srcnode = self.load_node_info(source_db)
+ src_db = self.get_database('src_db', connstr = dst_node.provider_location)
+ src_curs = src_db.cursor()
+ src_node = self.load_node_info(src_db)
# get batch
- srcqueue = RawQueue(srcnode.queue_name, self.consumer_name)
+ src_queue = RawQueue(src_node.queue_name, self.consumer_name)
+ self.src_queue = src_queue
+ self.dst_queue = None
+
+ if not self.main_worker and not self.reg_ok:
+ self.register_consumer(src_curs)
- batch_id = srcqueue.next_batch(source_db.cursor())
- source_db.commit()
+ batch_id = src_queue.next_batch(src_curs)
+ src_db.commit()
if batch_id is None:
return 0
- if node.need_action('wait-behind'):
- if node.should_wait(queue.cur_tick):
+ self.log.debug("New batch: tick_id=%d / batch_id=%d" % (src_queue.cur_tick, batch_id))
+
+ if dst_node.need_action('wait-behind'):
+ if dst_node.should_wait(src_queue.cur_tick):
return 0
- if node.need_action('process-event'):
+ if dst_node.need_action('process-events'):
# load and process batch data
- ev_list = self.get_batch_events(source_db, batch_id)
+ ev_list = src_queue.get_batch_events(src_curs)
- copy_queue = None
- if node.need_action('copy-events'):
- copy_queue = node.get_target_queue()
- self.process_set_batch(target_db, ev_list, copy_queue)
- if copy_queue:
- copy_queue.finish_bulk_insert(curs)
- self.copy_tick(target_curs, srcqueue, copy_queue)
+ if dst_node.need_action('copy-events'):
+ self.dst_queue = RawQueue(dst_node.get_target_queue(), self.consumer_name)
+ self.process_set_batch(src_db, dst_db, ev_list)
+ if self.dst_queue:
+ self.dst_queue.finish_bulk_insert(dst_curs)
+ self.copy_tick(dst_curs, src_queue, self.dst_queue)
# COMBINED_BRANCH needs to sync with part sets
- if node.need_action('sync-part-pos'):
- self.move_part_positions(target_curs)
+ if dst_node.need_action('sync-part-pos'):
+ self.move_part_positions(dst_curs)
# we are done on target
- self.set_tick_complete(target_curs)
- target_db.commit()
+ self.set_tick_complete(dst_curs, src_queue.cur_tick)
+ dst_db.commit()
# done on source
- self.finish_batch(source_db, batch_id)
+ src_queue.finish_batch(src_curs)
+ src_db.commit()
# occasinally send watermark upwards
- self.send_local_watermark_upwards(target_db, source_db)
+ if dst_node.need_action('local-wm-publish'):
+ self.send_local_watermark_upwards(src_db, dst_node)
# got a batch so there can be more
return 1
- def process_set_batch(self, src_db, dst_db, ev_list, copy_queue = None):
+ def process_set_batch(self, src_db, dst_db, ev_list):
dst_curs = dst_db.cursor()
for ev in ev_list:
self.process_set_event(dst_curs, ev)
- if copy_queue:
- copy_queue.bulk_insert(dst_curs, ev)
+ if self.dst_queue:
+ self.dst_queue.bulk_insert(dst_curs, ev)
self.stat_add('count', len(ev_list))
def process_set_event(self, dst_curs, ev):
@@ -203,21 +225,35 @@ class SetConsumer(skytools.DBScript):
q = "select * from pgq_set.add_member(%s, %s, %s, %s)"
dst_curs.execute(q, [set_name, node_name, node_location, dead])
- def send_local_watermark_upwards(self, target_db, source_db):
- target_curs = target_db.cursor()
- source_curs = source_db.cursor()
- q = "select pgq_ext.get_local_watermark(%s)"
- target_curs.execute(q, [self.set_name])
- wm = target_curs.fetchone()[0]
- target_db.commit()
-
- q = "select pgq_ext.set_subscriber_watermark(%s, %s, %s)"
- source_curs.execute(q, [self.set_name])
+ def send_local_watermark_upwards(self, src_db, node):
+ # fixme - delay
+ now = time.time()
+ delay = now - self.last_local_wm_publish_time
+ if delay < 1*60:
+ return
+ self.last_local_wm_publish_time = now
+
+ self.log.debug("send_local_watermark_upwards")
+ src_curs = src_db.cursor()
+ q = "select pgq_set.set_subscriber_watermark(%s, %s, %s)"
+ src_curs.execute(q, [self.set_name, node.name, node.local_watermark])
+ src_db.commit()
def set_global_watermark(self, dst_curs, tick_id):
+ self.log.debug("set_global_watermark: %s" % tick_id)
q = "select pgq_set.set_global_watermark(%s, %s)"
dst_curs.execute(q, [self.set_name, tick_id])
+ def publish_global_watermark(self, dst_db, watermark):
+ now = time.time()
+ delay = now - self.last_global_wm_publish_time
+ if delay < 1*60:
+ return
+ self.last_global_wm_publish_time = now
+
+ self.set_global_watermark(dst_db.cursor(), watermark)
+ dst_db.commit()
+
def load_node_info(self, db):
curs = db.cursor()
@@ -232,10 +268,10 @@ class SetConsumer(skytools.DBScript):
mbr_list = curs.dictfetchall()
db.commit()
- return NodeInfo(node_row, mbr_list)
+ return NodeInfo(node_row, mbr_list, self.main_worker)
def tag_node_uptodate(self, dst_db):
- dst_curs = db.cursor()
+ dst_curs = dst_db.cursor()
q = "select * from pgq_set.set_node_uptodate(%s, true)"
dst_curs.execute(q, [self.set_name])
dst_db.commit()
@@ -244,6 +280,24 @@ class SetConsumer(skytools.DBScript):
q = "select * from pgq.ticker(%s, %s)"
dst_curs.execute(q, [dst_queue.queue_name, src_queue.cur_tick])
+ def set_tick_complete(self, dst_curs, tick_id):
+ q = "select * from pgq_set.set_completed_tick(%s, %s, %s)"
+ dst_curs.execute(q, [self.set_name, self.consumer_name, tick_id])
+
+ def register_consumer(self, src_curs):
+ if self.main_worker:
+ raise Exception('main set worker should not play with registrations')
+
+ q = "select * from pgq.register_consumer(%s, %s)"
+ src_curs.execute(q, [self.src_queue.queue_name, self.consumer_name])
+
+ def unregister_consumer(self, src_curs):
+ if self.main_worker:
+ raise Exception('main set worker should not play with registrations')
+
+ q = "select * from pgq.unregister_consumer(%s, %s)"
+ src_curs.execute(q, [self.src_queue.queue_name, self.consumer_name])
+
if __name__ == '__main__':
script = SetConsumer('setconsumer', sys.argv[1:])
script.start()