diff options
Diffstat (limited to 'python')
-rw-r--r-- | python/londiste/setup.py | 1 | ||||
-rw-r--r-- | python/pgq/cascade/admin.py | 86 | ||||
-rw-r--r-- | python/pgq/cascade/nodeinfo.py | 13 |
3 files changed, 71 insertions, 29 deletions
diff --git a/python/londiste/setup.py b/python/londiste/setup.py index d7463de7..8a64cc8d 100644 --- a/python/londiste/setup.py +++ b/python/londiste/setup.py @@ -619,6 +619,7 @@ class LondisteSetup(CascadeAdmin): def load_extra_status(self, curs, node): """Fetch extra info.""" + # must be thread-safe (!) CascadeAdmin.load_extra_status(self, curs, node) curs.execute("select * from londiste.get_table_list(%s)", [self.queue_name]) n_ok = n_half = n_ign = 0 diff --git a/python/pgq/cascade/admin.py b/python/pgq/cascade/admin.py index 58fa16e3..ed44dad7 100644 --- a/python/pgq/cascade/admin.py +++ b/python/pgq/cascade/admin.py @@ -10,8 +10,14 @@ setadm.py INI pause NODE [CONS] """ -import sys, time, optparse, skytools, os.path - +import optparse +import os.path +import Queue +import sys +import threading +import time + +import skytools from skytools import UsageError, DBError from pgq.cascade.nodeinfo import * @@ -350,32 +356,63 @@ class CascadeAdmin(skytools.AdminScript): """Show set status.""" self.load_local_info() - for mname, minf in self.queue_info.member_map.iteritems(): - #inf = self.get_node_info(mname) - #self.queue_info.add_node(inf) - #continue - - if not self.node_alive(mname): - node = NodeInfo(self.queue_name, None, node_name = mname) - self.queue_info.add_node(node) - continue + # prepare structs for workers + members = Queue.Queue() + for m in self.queue_info.member_map.itervalues(): + members.put(m) + nodes = Queue.Queue() + + # launch workers and wait + n = max (min (members.qsize() >> 2, 100), 1) + for i in range(n): + t = threading.Thread (target = self._cmd_status_worker, args = (members, nodes)) + t.daemon = True + t.start() + members.join() + + while True: try: - db = self.get_database('look_db', connstr = minf.location, autocommit = 1) - curs = db.cursor() - curs.execute("select * from pgq_node.get_node_info(%s)", [self.queue_name]) - node = NodeInfo(self.queue_name, curs.fetchone()) - node.load_status(curs) - self.load_extra_status(curs, node) - self.queue_info.add_node(node) - except DBError, d: - msg = str(d).strip().split('\n', 1)[0] - print('Node %s failure: %s' % (mname, msg)) - node = NodeInfo(self.queue_name, None, node_name = mname) - self.queue_info.add_node(node) - self.close_database('look_db') + node = nodes.get_nowait() + except Queue.Empty: + break + self.queue_info.add_node(node) self.queue_info.print_tree() + def _cmd_status_worker (self, members, nodes): + # members in, nodes out, both thread-safe + while True: + try: + m = members.get_nowait() + except Queue.Empty: + break + node = self.load_node_status (m.name, m.location) + nodes.put(node) + members.task_done() + + def load_node_status (self, name, location): + """ Load node info & status """ + # must be thread-safe (!) + if not self.node_alive(name): + node = NodeInfo(self.queue_name, None, node_name = name) + return node + try: + db = None + db = skytools.connect_database (location) + db.set_isolation_level (skytools.I_AUTOCOMMIT) + curs = db.cursor() + curs.execute("select * from pgq_node.get_node_info(%s)", [self.queue_name]) + node = NodeInfo(self.queue_name, curs.fetchone()) + node.load_status(curs) + self.load_extra_status(curs, node) + except DBError, d: + msg = str(d).strip().split('\n', 1)[0].strip() + print('Node %r failure: %s' % (name, msg)) + node = NodeInfo(self.queue_name, None, node_name = name) + finally: + if db: db.close() + return node + def cmd_node_status(self): """ Show status of a local node. @@ -399,6 +436,7 @@ class CascadeAdmin(skytools.AdminScript): def load_extra_status(self, curs, node): """Fetch extra info.""" + # must be thread-safe (!) pass # diff --git a/python/pgq/cascade/nodeinfo.py b/python/pgq/cascade/nodeinfo.py index 726b311e..60f07583 100644 --- a/python/pgq/cascade/nodeinfo.py +++ b/python/pgq/cascade/nodeinfo.py @@ -150,7 +150,7 @@ class NodeInfo: err = err[:pos] lst.append("ERR: %s: %s" % (cname, err)) return lst - + def add_info_line(self, ln): self._info_lines.append(ln) @@ -166,6 +166,7 @@ class NodeInfo: for row in curs.fetchall(): cname = row['consumer_name'] self.consumer_map[cname] = row + q = "select current_timestamp - ticker_lag as tick_time,"\ " ticker_lag, current_timestamp as now "\ "from pgq.get_queue_info(%s)" @@ -180,7 +181,7 @@ class NodeInfo: class QueueInfo: """Info about cascaded queue. - + Slightly broken, as all info is per-node. """ @@ -192,8 +193,11 @@ class QueueInfo: self.add_node(self.local_node) for r in member_rows: - n = MemberInfo(r) - self.member_map[n.name] = n + m = MemberInfo(r) + self._add_member(m) + + def _add_member(self, member): + self.member_map[member.name] = member def get_member(self, name): return self.member_map.get(name) @@ -288,4 +292,3 @@ def _setpfx(pfx, sfx): def _node_key(n): return (n.levels, n.total_childs, n.name) - |