summaryrefslogtreecommitdiff
path: root/python/pgq/setconsumer.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/pgq/setconsumer.py')
-rw-r--r--python/pgq/setconsumer.py24
1 files changed, 24 insertions, 0 deletions
diff --git a/python/pgq/setconsumer.py b/python/pgq/setconsumer.py
index 4ec754e7..d17ecf5f 100644
--- a/python/pgq/setconsumer.py
+++ b/python/pgq/setconsumer.py
@@ -12,6 +12,9 @@ class SetConsumer(skytools.DBScript):
last_global_wm_publish_time = 0
main_worker = True
reg_ok = False
+ actual_dst_event_id = 0
+ batch_max_event_id = 0
+ seq_buffer = 10000
def __init__(self, service_name, args,
node_db_name = 'node_db'):
skytools.DBScript.__init__(self, service_name, args)
@@ -81,6 +84,8 @@ class SetConsumer(skytools.DBScript):
# COMBINED_BRANCH needs to sync with part sets
if dst_node.need_action('sync-part-pos'):
self.move_part_positions(dst_curs)
+ if dst_node.need_action('update-event-seq'):
+ self.update_event_seq(dst_curs)
# we are done on target
self.set_tick_complete(dst_curs, src_queue.cur_tick)
@@ -99,12 +104,31 @@ class SetConsumer(skytools.DBScript):
def process_set_batch(self, src_db, dst_db, ev_list):
dst_curs = dst_db.cursor()
+ max_id = 0
for ev in ev_list:
self.process_set_event(dst_curs, ev)
if self.dst_queue:
self.dst_queue.bulk_insert(dst_curs, ev)
+ if ev.id > max_id:
+ max_id = ev.id
+ self.batch_max_event_id = max_id
self.stat_increase('count', len(ev_list))
+ def update_event_seq(self, dst_curs):
+ qname = self.dst_queue.queue_name
+ if self.actual_dst_event_id == 0:
+ q = "select pgq.seq_getval(queue_event_seq) from pgq.queue where queue_name = %s"
+ dst_curs.execute(q, [qname])
+ self.actual_dst_event_id = dst_curs.fetchone()[0]
+ self.log.debug('got local event_id value = %d' % self.actual_dst_event_id)
+
+ if self.batch_max_event_id + self.seq_buffer >= self.actual_dst_event_id:
+ next_id = self.batch_max_event_id + 2 * self.seq_buffer
+ q = "select pgq.seq_setval(queue_event_seq, %s) from pgq.queue where queue_name = %s"
+ self.log.debug('set local event_id value = %d' % next_id)
+ dst_curs.execute(q, [next_id, qname])
+ self.actual_dst_event_id = next_id
+
def process_set_event(self, dst_curs, ev):
if ev.type == 'set-tick':
self.handle_set_tick(dst_curs, ev)