summaryrefslogtreecommitdiff
path: root/python/pgq
diff options
context:
space:
mode:
Diffstat (limited to 'python/pgq')
-rw-r--r--python/pgq/cascade/consumer.py7
-rw-r--r--python/pgq/cascade/worker.py4
2 files changed, 4 insertions, 7 deletions
diff --git a/python/pgq/cascade/consumer.py b/python/pgq/cascade/consumer.py
index 10866620..533f8655 100644
--- a/python/pgq/cascade/consumer.py
+++ b/python/pgq/cascade/consumer.py
@@ -18,7 +18,6 @@ class CascadedConsumer(Consumer):
Loads provider from target node, accepts pause/resume commands.
"""
- _batch_info = None
_consumer_state = None
def __init__(self, service_name, db_name, args):
@@ -156,15 +155,13 @@ class CascadedConsumer(Consumer):
dst_db.commit()
def process_batch(self, src_db, batch_id, event_list):
- self._batch_info = self.get_batch_info(batch_id)
-
state = self._consumer_state
- if self.is_batch_done(state, self._batch_info):
+ if self.is_batch_done(state, self.batch_info):
return
dst_db = self.get_database(self.target_db)
- tick_id = self._batch_info['tick_id']
+ tick_id = self.batch_info['tick_id']
self.process_remote_batch(src_db, tick_id, event_list, dst_db)
# this also commits
diff --git a/python/pgq/cascade/worker.py b/python/pgq/cascade/worker.py
index b21c8c25..af4f3e21 100644
--- a/python/pgq/cascade/worker.py
+++ b/python/pgq/cascade/worker.py
@@ -239,8 +239,8 @@ class CascadedWorker(CascadedConsumer):
dst_curs.execute(q, [st.target_queue, str(tick_id), self.pgq_queue_name])
if st.create_tick:
# create actual tick
- tick_id = self._batch_info['tick_id']
- tick_time = self._batch_info['batch_end']
+ tick_id = self.batch_info['tick_id']
+ tick_time = self.batch_info['batch_end']
q = "select pgq.ticker(%s, %s, %s, %s)"
dst_curs.execute(q, [self.pgq_queue_name, tick_id, tick_time, self.cur_max_id])