diff options
author | Marko Kreen | 2009-03-11 13:53:05 +0000 |
---|---|---|
committer | Marko Kreen | 2009-03-12 11:54:23 +0000 |
commit | dd21f3c488319a25361efe570e9a72ef206d1f3c (patch) | |
tree | 7fd66d034fcfebcee1303f8100dff27bd3930657 /python/newadm.py | |
parent | 76669ea5e38a633a6632c2f7c2c2cc51eb127361 (diff) |
newadm: more commands work now
- register
- unregister
- create queue
- drop queue
- show batch
- show queue
Diffstat (limited to 'python/newadm.py')
-rwxr-xr-x | python/newadm.py | 329 |
1 files changed, 238 insertions, 91 deletions
diff --git a/python/newadm.py b/python/newadm.py index 00af5bf6..54fa1fef 100755 --- a/python/newadm.py +++ b/python/newadm.py @@ -9,32 +9,29 @@ connect queue=.. node=..; install pgq; install londiste; ---------------------- -show all queues | consumers; - -show queue ..; -show consumer ..; -show batch events <>; -show batch info <>; +create queue <qname>; +drop queue <qname>; -show_queue_info <q>; -show_queue_stats <q>; -show_consumer_batch <cons>; -show_batch_info <bid>; -show_batch_events <bid> [ev_id]; +show queue *; ---------------------- -alter queue <qname> set param = , ...; +// following cmds expect default queue -create queue <qname>; // db register consumer foo; -unregister consumer foo -drop queue <q>; +unregister consumer foo; + +show queue <qname>; +show batch <batch_id>; +show batch <consumer>; + +// only syntax + +alter queue <qname> set param = , ...; ------------- +--------------------- +show consumers; +show_queue_stats <q>; create node <foo>; // create node <qname>.<foo>; // - add location <node> <loc>; // db, queue ---------------- @@ -82,8 +79,39 @@ def unquote_any(self, s): elif c == '"': s = skytools.unquote_ident(c) # extquote? + else: + s = s.lower() return s +def display_result(curs, desc, fields = []): + """Display multirow query as a table.""" + + rows = curs.fetchall() + + if not fields: + fields = [f[0] for f in curs.description] + + widths = [15] * len(fields) + for i, f in enumerate(fields): + rlen = len(f) + widths[i] = widths[i] > rlen and widths[i] or rlen + for row in rows: + for i, k in enumerate(fields): + rlen = row[k] and len(row) or 0 + widths[i] = widths[i] > rlen and widths[i] or rlen + widths = [w + 2 for w in widths] + + fmt = '%%-%ds' * (len(widths) - 1) + '%%s' + fmt = fmt % tuple(widths[:-1]) + if desc: + print(desc) + print(fmt % tuple(fields)) + print(fmt % tuple(['-'*15] * len(fields))) + + for row in rows: + print(fmt % tuple([row[k] for k in fields])) + print('\n') + class Node: def __init__(self, **kwargs): self.name = kwargs.get('name') @@ -92,6 +120,10 @@ class Node: def get_completions(self, params): return [] +## +## Token classes +## + class Proxy(Node): def set_real(self, node): self.get_next = node.get_next @@ -101,49 +133,50 @@ class WList(Node): c_append = ' ' def __init__(self, *args, **kwargs): Node.__init__(self, **kwargs) - self.wlist = args - - def get_wlist(self): - return self.wlist + self.tok_list = args def get_next(self, typ, word, params): cw = word.lower() - for w in self.get_wlist(): - if w.word == cw: - if self.name: - params[self.name] = cw - return w.next + for w in self.tok_list: + n = w.get_next(typ, word, params) + if n: + if self.name: # and not self.name in params: + params[self.name] = word + return n return None def get_completions(self, params): - wlist = self.get_wlist() comp_list = [] - for w in wlist: + for w in self.tok_list: comp_list += w.get_completions(params) return comp_list -class DynList(Node): - tk_type = ('ident',) +class Word(Node): + tk_type = ("ident",) c_append = ' ' - def __init__(self, next, **kwargs): + def __init__(self, word, next, **kwargs): Node.__init__(self, **kwargs) + self.word = word self.next = next - def get_wlist(self): - return [] - + return [self.word] def get_next(self, typ, word, params): if typ not in self.tk_type: return None - if self.name: + if self.word and word != self.word: + return None + if self.name: # and not self.name in params: params[self.name] = word return self.next - def get_completions(self, params): wlist = self.get_wlist() comp_list = [w + self.c_append for w in wlist] return comp_list +class DynList(Word): + def __init__(self, next, **kwargs): + Word.__init__(self, None, next, **kwargs) + class Queue(DynList): def get_wlist(self): return script.get_queue_list() @@ -164,25 +197,20 @@ class User(DynList): def get_wlist(self): return script.get_user_list() +class Consumer(DynList): + def get_wlist(self): + return script.get_consumer_list() + +class BatchId(DynList): + tk_type = ("num",) + def get_wlist(self): + return script.get_batch_list() + class Port(DynList): tk_type = ("num",) def get_wlist(self): return ['5432', '6432'] -class Word(Node): - tk_type = ("ident",) - c_append = ' ' - def __init__(self, word, next, **kwargs): - Node.__init__(self, **kwargs) - self.word = word - self.next = next - def get_next(self, typ, word, params): - if typ in self.tk_type and word == self.word: - return self.next - return None - def get_completions(self, params): - return [self.word + self.c_append] - class SWord(Word): c_append = '=' @@ -194,18 +222,9 @@ class EQ(Symbol): def __init__(self, next): Symbol.__init__(self, '=', next) -class Value(Node): +class Value(DynList): tk_type = ("str", "num", "ident") - def __init__(self, next, **kwargs): - Node.__init__(self, **kwargs) - self.next = next - def get_next(self, typ, word, params): - if typ not in self.tk_type: - return None - if self.name: - params[self.name] = word - return self.next - def get_completions(self, params): + def get_wlist(self): return [] ## @@ -230,30 +249,58 @@ w_connect.set_real( SWord('node', EQ(DBNode(w_connect, name = 'node'))), w_done)) -w_set = WList( - SWord('queue', EQ(Queue(w_done, name = 'value'))), - SWord('consumer', EQ(Value(w_done, name = 'value'))), - name = "param") +w_show_batch = WList( + BatchId(w_done, name = 'batch_id'), + Consumer(w_done, name = 'consumer')) + +w_show_queue = WList( + Symbol('*', w_done, name = 'queue'), + Queue(w_done, name = 'queue'), + w_done) w_show = WList( - Word('queues', w_done), - Word('databases', w_done), - Word('consumers', w_done), - Word('stats', w_done), - name = "show") + Word('batch', w_show_batch), + Word('queue', w_show_queue), + name = "cmd2") w_install = WList( Word('pgq', w_done), Word('londiste', w_done), name = 'module') -w_create = Word('queue', Value(w_done, name = 'queue')) +w_qargs2 = Proxy() + +w_qargs = WList( + SWord('idle_period', EQ(Value(w_qargs2, name = 'idle_period'))), + SWord('max_count', EQ(Value(w_qargs2, name = 'max_count'))), + SWord('max_lag', EQ(Value(w_qargs2, name = 'max_lag')))) + +w_qargs2.set_real(WList( + w_done, + Symbol(',', w_qargs))) + +w_alter_q = Queue(Word('set', w_qargs), name = 'queue') + +w_alter = Word('queue', w_alter_q, name = 'cmd2') + +w_create = Word('queue', Queue(w_done, name = 'queue'), + name = 'cmd2') + +w_drop = Word('queue', Queue(w_done, name = 'queue'), + name = 'cmd2') + +w_cons_name = Word('consumer', + Consumer(w_done, name = 'consumer'), + name = 'cmd2') w_top = WList( + Word('alter', w_alter), Word('connect', w_connect), Word('create', w_create), + Word('drop', w_drop), Word('install', w_install), - Word('set', w_set), + Word('register', w_cons_name), + Word('unregister', w_cons_name), Word('show', w_show), name = "cmd") @@ -302,6 +349,14 @@ class AdminConsole: q = "select distinct node_name from pgq_node.node_location order by 1" return self._ccache('node_list', q, 'pgq_node') + def get_batch_list(self): + if not self.cur_queue: + return [] + qname = skytools.quote_literal(self.cur_queue) + q = "select current_batch::text from pgq.get_consumer_info(%s)"\ + " where current_batch is not null order by 1" % qname + return self._ccache('batch_list', q, 'pgq') + def _ccache(self, cname, q, req_schema = None): if not self.db: return [] @@ -350,7 +405,7 @@ class AdminConsole: return clist def parse_cmdline(self, argv): - switches = "c:h:p:d:U:f:" + switches = "c:h:p:d:U:f:Q:" lswitches = ['help', 'version'] try: opts, args = getopt.getopt(argv, switches, lswitches) @@ -382,6 +437,8 @@ class AdminConsole: cstr_map['dbname'] = a elif o == "-U": cstr_map['user'] = a + elif o == "-Q": + self.cur_queue = a elif o == "-c": self.cmd_str = a elif o == "-f": @@ -494,21 +551,6 @@ class AdminConsole: def reset_comp_cache(self): self.comp_cache = {} - def find_suggestions_real(self, pfx, params): - # find level - node = top_level - for typ, w in self.sql_words(pfx): - w = w.lower() - node = node.get_next(typ, w, params) - if not node: - break - - # find possible matches - if node: - return node.get_completions(params) - else: - return [] - def find_suggestions(self, pfx, curword, params = {}): c_pfx = self.comp_cache.get('comp_pfx') c_list = self.comp_cache.get('comp_list', []) @@ -524,6 +566,21 @@ class AdminConsole: res.append(cword) return res + def find_suggestions_real(self, pfx, params): + # find level + node = top_level + for typ, w in self.sql_words(pfx): + w = w.lower() + node = node.get_next(typ, w, params) + if not node: + break + + # find possible matches + if node: + return node.get_completions(params) + else: + return [] + def exec_string(self, ln, eof = False): node = top_level params = {} @@ -547,13 +604,21 @@ class AdminConsole: print "multi-line commands not supported:", repr(ln) def exec_params(self, params): + print 'RUN', params cmd = params.get('cmd') + cmd2 = params.get('cmd2') if not cmd: print 'parse error: no command found' return + if cmd2: + cmd = "%s_%s" % (cmd, cmd2) #print 'RUN', repr(params) fn = getattr(self, 'cmd_' + cmd, self.bad_cmd) - fn(params) + try: + fn(params) + print 'OK' + except Exception, ex: + print str(ex) def bad_cmd(self, params): print 'unimplemented command' @@ -623,6 +688,88 @@ class AdminConsole: skytools.db_install(curs, objs, None) print "%s installed" % mod_name + def cmd_show_queue(self, params): + queue = params.get('queue') + if queue is None: + queue = self.cur_queue + if not queue: + print 'No default queue' + return + curs = self.db.cursor() + fields = [ + "queue_name", + "queue_cur_table || '/' || queue_ntables as tables", + "queue_ticker_max_count as max_cnt", + "queue_ticker_max_lag as max_lag", + "queue_ticker_idle_period as idle_period", + "ticker_lag", + ] + pfx = "select " + ",".join(fields) + + if queue == '*': + q = pfx + " from pgq.get_queue_info()" + curs.execute(q) + else: + q = pfx + " from pgq.get_queue_info(%s)" + curs.execute(q, [queue]) + + display_result(curs, 'Queues') + + def cmd_show_batch(self, params): + batch_id = params.get('batch_id') + consumer = params.get('consumer') + queue = self.cur_queue + if not queue: + print 'No default queue' + return + curs = self.db.cursor() + if consumer: + q = "select current_batch from pgq.get_consumer_info(%s, %s)" + curs.execute(q, [queue, consumer]) + res = curs.fetchall() + if len != 1: + print 'no such consumer' + return + batch_id = res[0]['current_batch'] + if batch_id is None: + print 'consumer has no open batch' + return + + q = "select * from pgq.get_batch_events(%s)" + curs.execute(q, [batch_id]) + + display_result(curs, 'Batch events') + + def cmd_register_consumer(self, params): + queue = self.cur_queue + if not queue: + print 'No default queue' + return + consumer = params['consumer'] + curs = self.db.cursor() + q = "select * from pgq.register_consumer(%s, %s)" + curs.execute(q, [queue, consumer]) + + def cmd_unregister_consumer(self, params): + queue = self.cur_queue + if not queue: + print 'No default queue' + return + consumer = params['consumer'] + curs = self.db.cursor() + q = "select * from pgq.unregister_consumer(%s, %s)" + curs.execute(q, [queue, consumer]) + + def cmd_create_queue(self, params): + curs = self.db.cursor() + q = "select * from pgq.create_queue(%(queue)s)" + curs.execute(q, params) + + def cmd_drop_queue(self, params): + curs = self.db.cursor() + q = "select * from pgq.drop_queue(%(queue)s)" + curs.execute(q, params) + def main(): global script |