diff options
Diffstat (limited to 'python')
-rw-r--r-- | python/pgq/cascade/worker.py | 12 |
1 files changed, 8 insertions, 4 deletions
diff --git a/python/pgq/cascade/worker.py b/python/pgq/cascade/worker.py index beb13dd5..711f3fc8 100644 --- a/python/pgq/cascade/worker.py +++ b/python/pgq/cascade/worker.py @@ -30,6 +30,8 @@ class WorkerState: keep_event_ids = 0 # ok create_tick = 0 # ok filtered_copy = 0 # ok + process_global_wm = 0 # ok + def __init__(self, queue_name, nst): self.node_type = nst['node_type'] self.node_name = nst['node_name'] @@ -47,6 +49,7 @@ class WorkerState: self.process_tick_event = 1 self.keep_event_ids = 1 self.create_tick = 1 + self.process_global_wm = 1 elif ntype == 'leaf' and not ctype: self.process_batch = 1 self.process_events = 1 @@ -228,6 +231,7 @@ class CascadedWorker(CascadedConsumer): raise Exception("bad event in queue: "+str(ev)) self.log.info("got cascade event: %s" % t) + st = self._worker_state if t == "pgq.location-info": node = ev.ev_data loc = ev.ev_extra2 @@ -235,14 +239,14 @@ class CascadedWorker(CascadedConsumer): q = "select * from pgq_node.register_location(%s, %s, %s, %s)" dst_curs.execute(q, [self.pgq_queue_name, node, loc, dead]) elif t == "pgq.global-watermark": - tick_id = int(ev.ev_data) - q = "select * from pgq_node.set_global_watermark(%s, %s)" - dst_curs.execute(q, [self.pgq_queue_name, tick_id]) + if st.process_global_wm: + tick_id = int(ev.ev_data) + q = "select * from pgq_node.set_global_watermark(%s, %s)" + dst_curs.execute(q, [self.pgq_queue_name, tick_id]) elif t == "pgq.tick-id": tick_id = int(ev.ev_data) if ev.ev_extra1 == self.pgq_queue_name: raise Exception('tick-id event for own queue?') - st = self._worker_state if st.process_tick_event: q = "select * from pgq_node.set_partition_watermark(%s, %s, %s)" dst_curs.execute(q, [self.pgq_queue_name, ev.ev_extra1, tick_id]) |