diff options
Diffstat (limited to 'python')
| -rw-r--r-- | python/pgq/cascade/worker.py | 10 |
1 files changed, 9 insertions, 1 deletions
diff --git a/python/pgq/cascade/worker.py b/python/pgq/cascade/worker.py index af4f3e21..ccfbb7aa 100644 --- a/python/pgq/cascade/worker.py +++ b/python/pgq/cascade/worker.py @@ -149,8 +149,10 @@ class CascadedWorker(CascadedConsumer): cst = self._consumer_state if cst['completed_tick'] >= tick_id: return - time.sleep(10 * self.loop_delay) + self.sleep(10 * self.loop_delay) self._consumer_state = self.refresh_state(dst_db) + if not self.looping: + sys.exit(0) def is_batch_done(self, state, batch_info): wst = self._worker_state @@ -227,6 +229,12 @@ class CascadedWorker(CascadedConsumer): """Worker-specific cleanup on target node. """ + # merge-leaf on branch should not update tick pos + wst = self._worker_state + if wst.wait_behind: + dst_db.commit() + return + if self.main_worker: st = self._worker_state dst_curs = dst_db.cursor() |
