diff options
Diffstat (limited to 'python')
-rw-r--r-- | python/pgq/cascade/consumer.py | 1 | ||||
-rw-r--r-- | python/pgq/cascade/worker.py | 3 |
2 files changed, 3 insertions, 1 deletions
diff --git a/python/pgq/cascade/consumer.py b/python/pgq/cascade/consumer.py index 97b90c24..a3b0fc7c 100644 --- a/python/pgq/cascade/consumer.py +++ b/python/pgq/cascade/consumer.py @@ -126,7 +126,6 @@ class CascadedConsumer(Consumer): self._consumer_state = self.refresh_state(dst_db) if self._consumer_state['node_type'] == 'root': - self.log.info("target is root") self.process_root_node(dst_db) return diff --git a/python/pgq/cascade/worker.py b/python/pgq/cascade/worker.py index 79ba5479..5e17e358 100644 --- a/python/pgq/cascade/worker.py +++ b/python/pgq/cascade/worker.py @@ -261,6 +261,9 @@ class CascadedWorker(CascadedConsumer): def process_root_node(self, dst_db): """On root node send global watermark downstream. """ + + CascadedConsumer.process_root_node(self, dst_db) + t = time.time() if t - self.global_wm_publish_time < self.global_wm_publish_period: return |