diff options
Diffstat (limited to 'python/pgq/consumer.py')
| -rw-r--r-- | python/pgq/consumer.py | 40 |
1 files changed, 8 insertions, 32 deletions
diff --git a/python/pgq/consumer.py b/python/pgq/consumer.py index 84375f4c..777d1f8b 100644 --- a/python/pgq/consumer.py +++ b/python/pgq/consumer.py @@ -298,17 +298,16 @@ class Consumer(skytools.DBScript): def _load_next_batch(self, curs): """Allocate next batch. (internal)""" - q = """select batch_id, - prev_tick_id, - cur_tick_id as tick_id, - cur_tick_time as batch_end, - prev_tick_time as batch_start, - prev_tick_event_seq as seq_start, - cur_tick_event_seq as seq_end - from pgq.next_batch_custom(%s, %s, %s, %s, %s)""" + q = """select * from pgq.next_batch_custom(%s, %s, %s, %s, %s)""" curs.execute(q, [self.queue_name, self.consumer_name, self.pgq_min_lag, self.pgq_min_count, self.pgq_min_interval]) - self.batch_info = curs.fetchone() + inf = curs.fetchone().copy() + inf['tick_id'] = inf['cur_tick_id'] + inf['batch_end'] = inf['cur_tick_time'] + inf['batch_start'] = inf['prev_tick_time'] + inf['seq_start'] = inf['prev_tick_event_seq'] + inf['seq_end'] = inf['cur_tick_event_seq'] + self.batch_info = inf return self.batch_info['batch_id'] def _flush_retry(self, curs, batch_id, list): @@ -347,29 +346,6 @@ class Consumer(skytools.DBScript): cx.execute("select pgq.event_retry(%s, %s, %s)", [batch_id, ev_id, retry_time]) - def get_batch_info(self, batch_id): - """Get info about batch. - - @return: Return value is a dict of: - - - queue_name: queue name - - consumer_name: consumers name - - batch_start: batch start time - - batch_end: batch end time - - tick_id: end tick id - - prev_tick_id: start tick id - - lag: how far is batch_end from current moment. - """ - db = self.get_database(self.db_name) - cx = db.cursor() - q = "select queue_name, consumer_name, batch_start, batch_end,"\ - " prev_tick_id, tick_id, lag"\ - " from pgq.get_batch_info(%s)" - cx.execute(q, [batch_id]) - row = cx.dictfetchone() - db.commit() - return row - def stat_start(self): self.stat_batch_start = time.time() |
