summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
Diffstat (limited to 'python')
-rw-r--r--python/pgq/cascade/worker.py12
1 files changed, 8 insertions, 4 deletions
diff --git a/python/pgq/cascade/worker.py b/python/pgq/cascade/worker.py
index beb13dd5..711f3fc8 100644
--- a/python/pgq/cascade/worker.py
+++ b/python/pgq/cascade/worker.py
@@ -30,6 +30,8 @@ class WorkerState:
keep_event_ids = 0 # ok
create_tick = 0 # ok
filtered_copy = 0 # ok
+ process_global_wm = 0 # ok
+
def __init__(self, queue_name, nst):
self.node_type = nst['node_type']
self.node_name = nst['node_name']
@@ -47,6 +49,7 @@ class WorkerState:
self.process_tick_event = 1
self.keep_event_ids = 1
self.create_tick = 1
+ self.process_global_wm = 1
elif ntype == 'leaf' and not ctype:
self.process_batch = 1
self.process_events = 1
@@ -228,6 +231,7 @@ class CascadedWorker(CascadedConsumer):
raise Exception("bad event in queue: "+str(ev))
self.log.info("got cascade event: %s" % t)
+ st = self._worker_state
if t == "pgq.location-info":
node = ev.ev_data
loc = ev.ev_extra2
@@ -235,14 +239,14 @@ class CascadedWorker(CascadedConsumer):
q = "select * from pgq_node.register_location(%s, %s, %s, %s)"
dst_curs.execute(q, [self.pgq_queue_name, node, loc, dead])
elif t == "pgq.global-watermark":
- tick_id = int(ev.ev_data)
- q = "select * from pgq_node.set_global_watermark(%s, %s)"
- dst_curs.execute(q, [self.pgq_queue_name, tick_id])
+ if st.process_global_wm:
+ tick_id = int(ev.ev_data)
+ q = "select * from pgq_node.set_global_watermark(%s, %s)"
+ dst_curs.execute(q, [self.pgq_queue_name, tick_id])
elif t == "pgq.tick-id":
tick_id = int(ev.ev_data)
if ev.ev_extra1 == self.pgq_queue_name:
raise Exception('tick-id event for own queue?')
- st = self._worker_state
if st.process_tick_event:
q = "select * from pgq_node.set_partition_watermark(%s, %s, %s)"
dst_curs.execute(q, [self.pgq_queue_name, ev.ev_extra1, tick_id])