summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
Diffstat (limited to 'python')
-rw-r--r--python/pgq/cascade/consumer.py1
-rw-r--r--python/pgq/cascade/worker.py3
2 files changed, 3 insertions, 1 deletions
diff --git a/python/pgq/cascade/consumer.py b/python/pgq/cascade/consumer.py
index 97b90c24..a3b0fc7c 100644
--- a/python/pgq/cascade/consumer.py
+++ b/python/pgq/cascade/consumer.py
@@ -126,7 +126,6 @@ class CascadedConsumer(Consumer):
self._consumer_state = self.refresh_state(dst_db)
if self._consumer_state['node_type'] == 'root':
- self.log.info("target is root")
self.process_root_node(dst_db)
return
diff --git a/python/pgq/cascade/worker.py b/python/pgq/cascade/worker.py
index 79ba5479..5e17e358 100644
--- a/python/pgq/cascade/worker.py
+++ b/python/pgq/cascade/worker.py
@@ -261,6 +261,9 @@ class CascadedWorker(CascadedConsumer):
def process_root_node(self, dst_db):
"""On root node send global watermark downstream.
"""
+
+ CascadedConsumer.process_root_node(self, dst_db)
+
t = time.time()
if t - self.global_wm_publish_time < self.global_wm_publish_period:
return