summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
Diffstat (limited to 'python')
-rw-r--r--python/pgq/cascade/worker.py10
1 files changed, 9 insertions, 1 deletions
diff --git a/python/pgq/cascade/worker.py b/python/pgq/cascade/worker.py
index af4f3e21..ccfbb7aa 100644
--- a/python/pgq/cascade/worker.py
+++ b/python/pgq/cascade/worker.py
@@ -149,8 +149,10 @@ class CascadedWorker(CascadedConsumer):
cst = self._consumer_state
if cst['completed_tick'] >= tick_id:
return
- time.sleep(10 * self.loop_delay)
+ self.sleep(10 * self.loop_delay)
self._consumer_state = self.refresh_state(dst_db)
+ if not self.looping:
+ sys.exit(0)
def is_batch_done(self, state, batch_info):
wst = self._worker_state
@@ -227,6 +229,12 @@ class CascadedWorker(CascadedConsumer):
"""Worker-specific cleanup on target node.
"""
+ # merge-leaf on branch should not update tick pos
+ wst = self._worker_state
+ if wst.wait_behind:
+ dst_db.commit()
+ return
+
if self.main_worker:
st = self._worker_state
dst_curs = dst_db.cursor()