diff options
Diffstat (limited to 'python/pgq')
| -rw-r--r-- | python/pgq/cascade/consumer.py | 7 | ||||
| -rw-r--r-- | python/pgq/cascade/worker.py | 4 |
2 files changed, 4 insertions, 7 deletions
diff --git a/python/pgq/cascade/consumer.py b/python/pgq/cascade/consumer.py index 10866620..533f8655 100644 --- a/python/pgq/cascade/consumer.py +++ b/python/pgq/cascade/consumer.py @@ -18,7 +18,6 @@ class CascadedConsumer(Consumer): Loads provider from target node, accepts pause/resume commands. """ - _batch_info = None _consumer_state = None def __init__(self, service_name, db_name, args): @@ -156,15 +155,13 @@ class CascadedConsumer(Consumer): dst_db.commit() def process_batch(self, src_db, batch_id, event_list): - self._batch_info = self.get_batch_info(batch_id) - state = self._consumer_state - if self.is_batch_done(state, self._batch_info): + if self.is_batch_done(state, self.batch_info): return dst_db = self.get_database(self.target_db) - tick_id = self._batch_info['tick_id'] + tick_id = self.batch_info['tick_id'] self.process_remote_batch(src_db, tick_id, event_list, dst_db) # this also commits diff --git a/python/pgq/cascade/worker.py b/python/pgq/cascade/worker.py index b21c8c25..af4f3e21 100644 --- a/python/pgq/cascade/worker.py +++ b/python/pgq/cascade/worker.py @@ -239,8 +239,8 @@ class CascadedWorker(CascadedConsumer): dst_curs.execute(q, [st.target_queue, str(tick_id), self.pgq_queue_name]) if st.create_tick: # create actual tick - tick_id = self._batch_info['tick_id'] - tick_time = self._batch_info['batch_end'] + tick_id = self.batch_info['tick_id'] + tick_time = self.batch_info['batch_end'] q = "select pgq.ticker(%s, %s, %s, %s)" dst_curs.execute(q, [self.pgq_queue_name, tick_id, tick_time, self.cur_max_id]) |
