diff options
| author | Egon Valdmees | 2011-07-29 10:08:20 +0000 |
|---|---|---|
| committer | Egon Valdmees | 2011-08-07 18:56:05 +0000 |
| commit | 84ac395220d3dadd7c192d00353a218078ce9d59 (patch) | |
| tree | b8c48106d70fe223c99986af48cff3bc4ea381bd /python | |
| parent | d78be5cd973bf9a6ede4c8ace3deeee8df72145d (diff) | |
cascaded unregister-location
Instead of running drop_node function on every node in cascade,
unregister-location event added to queue by master
Diffstat (limited to 'python')
| -rw-r--r-- | python/pgq/cascade/admin.py | 22 | ||||
| -rw-r--r-- | python/pgq/cascade/worker.py | 4 |
2 files changed, 20 insertions, 6 deletions
diff --git a/python/pgq/cascade/admin.py b/python/pgq/cascade/admin.py index 75fea6d6..f5693bc7 100644 --- a/python/pgq/cascade/admin.py +++ b/python/pgq/cascade/admin.py @@ -145,7 +145,7 @@ class CascadeAdmin(skytools.AdminScript): if info['node_type'] is not None: self.log.info("Node is already initialized as %s" % info['node_type']) return - + self.log.info("Initializing node") worker_name = self.options.worker @@ -286,7 +286,7 @@ class CascadeAdmin(skytools.AdminScript): state = self.get_node_info(node) consumer = state.worker_name return (node, consumer) - + # global consumer search if self.find_consumer_check(self.local_node, consumer): return (self.local_node, consumer) @@ -298,7 +298,7 @@ class CascadeAdmin(skytools.AdminScript): continue if self.find_consumer_check(node, consumer): return (node, consumer) - + raise Exception('Consumer not found') def install_code(self, db): @@ -478,9 +478,19 @@ class CascadeAdmin(skytools.AdminScript): raise Exception('node still has subscribers') # remove the node from all the databases - for n in self.queue_info.member_map.values(): - q = "select * from pgq_node.drop_node(%s, %s)" - self.node_cmd(n.name, q, [self.queue_name, node_name]) + # for n in self.queue_info.member_map.values(): + # q = "select * from pgq_node.drop_node(%s, %s)" + # self.node_cmd(n.name, q, [self.queue_name, node_name]) + + # drop node info + db = self.get_node_database(node_name) + q = "select * from pgq_node.drop_node(%s, %s)" + self.exec_cmd(db, q, [self.queue_name, node_name]) + + # unregister node location from root node (event will be added to queue) + root_db = self.find_root_db() + q = "select * from pgq_node.unregister_location(%s, %s)" + self.exec_cmd(root_db, q, [self.queue_name, node_name]) def node_depends(self, sub_node, top_node): cur_node = sub_node diff --git a/python/pgq/cascade/worker.py b/python/pgq/cascade/worker.py index 68f9e388..1d3f8325 100644 --- a/python/pgq/cascade/worker.py +++ b/python/pgq/cascade/worker.py @@ -240,6 +240,10 @@ class CascadedWorker(CascadedConsumer): dead = ev.ev_extra3 q = "select * from pgq_node.register_location(%s, %s, %s, %s)" dst_curs.execute(q, [self.pgq_queue_name, node, loc, dead]) + elif t == "pgq.unregister-location": + node = ev.ev_data + q = "select * from pgq_node.unregister_location(%s, %s)" + dst_curs.execute(q, [self.pgq_queue_name, node]) elif t == "pgq.global-watermark": if st.process_global_wm: tick_id = int(ev.ev_data) |
