diff options
Diffstat (limited to 'python/pgq/setconsumer.py')
-rw-r--r-- | python/pgq/setconsumer.py | 24 |
1 files changed, 24 insertions, 0 deletions
diff --git a/python/pgq/setconsumer.py b/python/pgq/setconsumer.py index 4ec754e7..d17ecf5f 100644 --- a/python/pgq/setconsumer.py +++ b/python/pgq/setconsumer.py @@ -12,6 +12,9 @@ class SetConsumer(skytools.DBScript): last_global_wm_publish_time = 0 main_worker = True reg_ok = False + actual_dst_event_id = 0 + batch_max_event_id = 0 + seq_buffer = 10000 def __init__(self, service_name, args, node_db_name = 'node_db'): skytools.DBScript.__init__(self, service_name, args) @@ -81,6 +84,8 @@ class SetConsumer(skytools.DBScript): # COMBINED_BRANCH needs to sync with part sets if dst_node.need_action('sync-part-pos'): self.move_part_positions(dst_curs) + if dst_node.need_action('update-event-seq'): + self.update_event_seq(dst_curs) # we are done on target self.set_tick_complete(dst_curs, src_queue.cur_tick) @@ -99,12 +104,31 @@ class SetConsumer(skytools.DBScript): def process_set_batch(self, src_db, dst_db, ev_list): dst_curs = dst_db.cursor() + max_id = 0 for ev in ev_list: self.process_set_event(dst_curs, ev) if self.dst_queue: self.dst_queue.bulk_insert(dst_curs, ev) + if ev.id > max_id: + max_id = ev.id + self.batch_max_event_id = max_id self.stat_increase('count', len(ev_list)) + def update_event_seq(self, dst_curs): + qname = self.dst_queue.queue_name + if self.actual_dst_event_id == 0: + q = "select pgq.seq_getval(queue_event_seq) from pgq.queue where queue_name = %s" + dst_curs.execute(q, [qname]) + self.actual_dst_event_id = dst_curs.fetchone()[0] + self.log.debug('got local event_id value = %d' % self.actual_dst_event_id) + + if self.batch_max_event_id + self.seq_buffer >= self.actual_dst_event_id: + next_id = self.batch_max_event_id + 2 * self.seq_buffer + q = "select pgq.seq_setval(queue_event_seq, %s) from pgq.queue where queue_name = %s" + self.log.debug('set local event_id value = %d' % next_id) + dst_curs.execute(q, [next_id, qname]) + self.actual_dst_event_id = next_id + def process_set_event(self, dst_curs, ev): if ev.type == 'set-tick': self.handle_set_tick(dst_curs, ev) |