diff options
author | Marko Kreen | 2009-11-24 15:43:33 +0000 |
---|---|---|
committer | Marko Kreen | 2009-11-24 15:43:33 +0000 |
commit | 2fed3df04c7e9dac065976e5e0ee05baab63f6e5 (patch) | |
tree | 166cae677fc470840489cd9f8ff7996f85875693 /python/qadmin.py | |
parent | 2cd1e444a8ca3417ef0d5ca1d5141cde4122f01c (diff) |
qadmin: parser cleanup, make * optional.
Diffstat (limited to 'python/qadmin.py')
-rwxr-xr-x | python/qadmin.py | 226 |
1 files changed, 110 insertions, 116 deletions
diff --git a/python/qadmin.py b/python/qadmin.py index 0c8d625b..54abf872 100755 --- a/python/qadmin.py +++ b/python/qadmin.py @@ -3,23 +3,22 @@ """Commands that require only database connection: connect dbname=.. host=.. service=.. queue=..; - connect queue=.. [ node=.. ]; + connect [ queue=.. ] [ node=.. ]; install pgq | londiste; create queue <qname>; alter queue <qname | *> set param = , ...; drop queue <qname>; - show queue <qname | *>; + show queue [ <qname | *> ]; register consumer foo [on <qname> | at <tick_id> | copy <consumer> ]* ; unregister consumer foo [from <qname>]; - show consumer <cname | *> [on <qname>]; + show consumer [ <cname | *> [on <qname>] ]; - show node <node | *> [on <qname>]; + show node [ <node | *> [on <qname>] ]; Following commands expect default queue: - show queue; show batch <batch_id>; show batch <consumer>; """ @@ -30,6 +29,13 @@ create <root | branch | leaf> node <node> location <loc> [on <qname>]; drop node <node> [on <qname>]; alter node <node> [location=<loc>] show_queue_stats <q>; + +change provider +drop node +status +rename node + + """ __version__ = '0.1' @@ -53,11 +59,10 @@ General options: --version ''' -import sys, os, readline, getopt, re +import sys, os, readline, getopt, re, psycopg2 import pkgloader pkgloader.require('skytools', '3.0') - import skytools script = None @@ -92,7 +97,7 @@ def display_result(curs, desc, fields = []): if not fields: fields = [f[0] for f in curs.description] - widths = [15] * len(fields) + widths = [10] * len(fields) for i, f in enumerate(fields): rlen = len(f) if rlen > widths[i]: @@ -113,29 +118,37 @@ def display_result(curs, desc, fields = []): for row in rows: print(fmt % tuple([row[k] for k in fields])) - print('\n') + print('') + +## +## Base token classes +## -class Node: +class Token: + """Base class for tokens. + + The optional 'param' kwarg will set corresponding key in + 'params' dict to final token value. + """ def __init__(self, **kwargs): self.name = kwargs.get('name') def get_next(self, typ, word, params): + """Return next token if 'word' matches this token.""" return None def get_completions(self, params): + """Return list of all completions possible at this point.""" return [] -## -## Token classes -## - -class Proxy(Node): +class Proxy(Token): + """Forward def for Token, in case it needs to be referenced recursively.""" def set_real(self, node): self.get_next = node.get_next self.get_completions = node.get_completions -class WList(Node): - c_append = ' ' +class List(Token): + """List of Tokens, will be tried sequentially until one matches.""" def __init__(self, *args, **kwargs): - Node.__init__(self, **kwargs) + Token.__init__(self, **kwargs) self.tok_list = args def get_next(self, typ, word, params): @@ -143,7 +156,7 @@ class WList(Node): for w in self.tok_list: n = w.get_next(typ, word, params) if n: - if self.name: # and not self.name in params: + if self.name: params[self.name] = word return n return None @@ -154,11 +167,18 @@ class WList(Node): comp_list += w.get_completions(params) return comp_list -class Word(Node): +## +## Dynamic token classes +## + +class Word(Token): + """Single fixed word.""" + # token types to accept tk_type = ("ident",) + # string to append to completions c_append = ' ' def __init__(self, word, next, **kwargs): - Node.__init__(self, **kwargs) + Token.__init__(self, **kwargs) self.word = word self.next = next def get_wlist(self): @@ -177,14 +197,18 @@ class Word(Node): return comp_list class DynList(Word): + """Dynamically generated list of words.""" def __init__(self, next, **kwargs): Word.__init__(self, None, next, **kwargs) + def get_wlist(self): + return [] class DynIdentList(DynList): + """Accept quoted words.""" def get_next(self, typ, word, params): """Allow quoted queue names""" next = DynList.get_next(self, typ, word, params) - if next: + if next and self.name: params[self.name] = unquote_any(word) return next @@ -227,16 +251,16 @@ class Port(DynList): def get_wlist(self): return ['5432', '6432'] -class SWord(Word): - c_append = '=' - class Symbol(Word): tk_type = ("sym",) c_append = '' -class EQ(Symbol): - def __init__(self, next): - Symbol.__init__(self, '=', next) +class WordEQ(Word): + """Word that is followed by '='.""" + c_append = '=' + def __init__(self, word, next, **kwargs): + next = Symbol('=', next) + Word.__init__(self, word, next, **kwargs) class Value(DynList): tk_type = ("str", "num", "ident") @@ -255,43 +279,43 @@ eq_val = Symbol('=', Value(w_done, name = 'value')) w_connect = Proxy() w_connect.set_real( - WList( - SWord('dbname', EQ(Database(w_connect, name = 'dbname'))), - SWord('host', EQ(Host(w_connect, name = 'host'))), - SWord('port', EQ(Port(w_connect, name = 'port'))), - SWord('user', EQ(User(w_connect, name = 'user'))), - SWord('password', EQ(Value(w_connect, name = 'password'))), - SWord('queue', EQ(Queue(w_connect, name = 'queue'))), - SWord('node', EQ(DBNode(w_connect, name = 'node'))), + List( + WordEQ('dbname', Database(w_connect, name = 'dbname')), + WordEQ('host', Host(w_connect, name = 'host')), + WordEQ('port', Port(w_connect, name = 'port')), + WordEQ('user', User(w_connect, name = 'user')), + WordEQ('password', Value(w_connect, name = 'password')), + WordEQ('queue', Queue(w_connect, name = 'queue')), + WordEQ('node', DBNode(w_connect, name = 'node')), w_done)) -w_show_batch = WList( +w_show_batch = List( BatchId(w_done, name = 'batch_id'), Consumer(w_done, name = 'consumer')) -w_show_queue = WList( +w_show_queue = List( Symbol('*', w_done, name = 'queue'), Queue(w_done, name = 'queue'), w_done) -w_show_on_queue = WList( +w_show_on_queue = List( Symbol('*', w_done, name = 'queue'), Queue(w_done, name = 'queue'), ) -w_on_queue = WList(Word('on', w_show_on_queue), w_done) +w_on_queue = List(Word('on', w_show_on_queue), w_done) -w_show_consumer = WList( +w_show_consumer = List( Symbol('*', w_on_queue, name = 'consumer'), Consumer(w_on_queue, name = 'consumer'), - ) + w_done) -w_show_node = WList( +w_show_node = List( Symbol('*', w_on_queue, name = 'node'), DBNode(w_on_queue, name = 'node'), - ) + w_done) -w_show = WList( +w_show = List( Word('batch', w_show_batch), Word('help', w_done), Word('queue', w_show_queue), @@ -299,26 +323,26 @@ w_show = WList( Word('node', w_show_node), name = "cmd2") -w_install = WList( +w_install = List( Word('pgq', w_done), Word('londiste', w_done), name = 'module') w_qargs2 = Proxy() -w_qargs = WList( - SWord('idle_period', EQ(Value(w_qargs2, name = 'ticker_idle_period'))), - SWord('max_count', EQ(Value(w_qargs2, name = 'ticker_max_count'))), - SWord('max_lag', EQ(Value(w_qargs2, name = 'ticker_max_lag'))), - SWord('paused', EQ(Value(w_qargs2, name = 'ticker_paused')))) +w_qargs = List( + WordEQ('idle_period', Value(w_qargs2, name = 'ticker_idle_period')), + WordEQ('max_count', Value(w_qargs2, name = 'ticker_max_count')), + WordEQ('max_lag', Value(w_qargs2, name = 'ticker_max_lag')), + WordEQ('paused', Value(w_qargs2, name = 'ticker_paused'))) -w_qargs2.set_real(WList( +w_qargs2.set_real(List( w_done, Symbol(',', w_qargs))) w_set_q = Word('set', w_qargs) -w_alter_q = WList( +w_alter_q = List( Symbol('*', w_set_q, name = 'queue'), Queue(w_set_q, name = 'queue')) @@ -331,7 +355,7 @@ w_drop = Word('queue', Queue(w_done, name = 'queue'), name = 'cmd2') w_reg_target = Proxy() -w_reg_target.set_real(WList( +w_reg_target.set_real(List( Word('on', Queue(w_reg_target, name = 'queue')), Word('copy', Consumer(w_reg_target, name = 'copy_reg')), Word('at', TickId(w_reg_target, name = 'at_tick')), @@ -341,16 +365,13 @@ w_cons_on_queue = Word('consumer', Consumer(w_reg_target, name = 'consumer'), name = 'cmd2') -w_from_queue = Word('from', - Queue(w_done, name = 'queue'), - name = 'fromqueue') +w_from_queue = Word('from', Queue(w_done, name = 'queue')) w_cons_from_queue = Word('consumer', - Consumer(WList(w_done, w_from_queue), - name = 'consumer'), + Consumer(List(w_done, w_from_queue), name = 'consumer'), name = 'cmd2') -w_top = WList( +w_top = List( Word('alter', w_alter), Word('connect', w_connect), Word('create', w_create), @@ -387,6 +408,7 @@ class AdminConsole: initial_connstr = None rc_hosts = re.compile('\s+') + def get_queue_list(self): q = "select queue_name from pgq.queue order by 1" return self._ccache('queue_list', q, 'pgq') @@ -458,7 +480,7 @@ class AdminConsole: clist.append(h) clist.sort() self.comp_cache['host_list'] = clist - except: + except IOError: clist = [] return clist @@ -526,6 +548,7 @@ class AdminConsole: curs = db.cursor() curs.execute(q) res = curs.fetchone() + print 'Server version', res[1] self.cur_database = res[0] return db @@ -551,7 +574,11 @@ class AdminConsole: if self.cmd_file: cmd_str = open(self.cmd_file, "r").read() - self.db = self.db_connect(self.initial_connstr) + try: + self.db = self.db_connect(self.initial_connstr) + except psycopg2.Error, d: + print str(d).strip() + sys.exit(1) if cmd_str: self.exec_string(cmd_str) @@ -562,6 +589,7 @@ class AdminConsole: readline.parse_and_bind('tab: complete') readline.set_completer(self.rl_completer_safe) #print 'delims: ', repr(readline.get_completer_delims()) + hist_file = os.path.expanduser("~/.qadmin_history") try: readline.read_history_file(hist_file) @@ -572,7 +600,6 @@ class AdminConsole: while 1: try: ln = self.line_input() - #print 'line:', repr(ln) self.exec_string(ln) except KeyboardInterrupt: print @@ -580,7 +607,11 @@ class AdminConsole: print break self.reset_comp_cache() - readline.write_history_file(hist_file) + + try: + readline.write_history_file(hist_file) + except IOError: + pass def rl_completer(self, curword, state): curline = readline.get_line_buffer() @@ -686,7 +717,7 @@ class AdminConsole: print 'unimplemented command' def cmd_connect(self, params): - qname = params.get('queue') + qname = params.get('queue', self.cur_queue) if 'node' in params and not qname: print 'node= needs a queue also' @@ -731,7 +762,7 @@ class AdminConsole: # set default queue if 'queue' in params: self.cur_queue = qname - + print "CONNECT" def cmd_install(self, params): @@ -775,6 +806,8 @@ class AdminConsole: "queue_ticker_idle_period as idle_period", "queue_ticker_paused as paused", "ticker_lag", + "ev_per_sec", + "ev_new", ] pfx = "select " + ",".join(fields) @@ -785,20 +818,12 @@ class AdminConsole: q = pfx + " from pgq.get_queue_info(%s)" curs.execute(q, [queue]) - display_result(curs, 'Queues') + display_result(curs, 'Queue: %s' % queue) def cmd_show_consumer(self, params): """Show consumer status""" - consumer = params.get('consumer') - queue = params.get('queue') - - if not queue: - # queue not provided, see if we have default. - queue = self.cur_queue - - if not consumer: - print 'Consumer not specified' - return + consumer = params.get('consumer', '*') + queue = params.get('queue', '*') q_queue = (queue != '*' and queue or None) q_consumer = (consumer != '*' and consumer or None) @@ -807,36 +832,29 @@ class AdminConsole: q = "select * from pgq.get_consumer_info(%s, %s)" curs.execute(q, [q_queue, q_consumer]) - display_result(curs, 'Consumers') + display_result(curs, 'Consumer "%s" on queue "%s"' % (consumer, queue)) def cmd_show_node(self, params): """Show node information.""" - # TODO: This should additionally show node roles, lags and hierarchy. + # TODO: This should additionally show node roles, lags and hierarchy. # Similar to londiste "status". - node = params.get('node') - queue = params.get('queue') - - if not queue: - # queue not provided, see if we have default. - queue = self.cur_queue - - if not node: - print 'Node not specified' - return + node = params.get('node', '*') + queue = params.get('queue', '*') q_queue = (queue != '*' and queue or None) q_node = (node != '*' and node or None) curs = self.db.cursor() - q = """select node_name, node_name, node_location, dead, queue_name + q = """select queue_name, node_name, node_location, dead from pgq_node.node_location where node_name = coalesce(%s, node_name) - and queue_name = coalesce(%s, queue_name)""" + and queue_name = coalesce(%s, queue_name) + order by 1,2""" curs.execute(q, [q_node, q_queue]) - display_result(curs, 'Database nodes') + display_result(curs, 'Node "%s" on queue "%s"' % (node, queue)) def cmd_show_batch(self, params): batch_id = params.get('batch_id') @@ -960,33 +978,9 @@ class AdminConsole: def main(): global script - script = AdminConsole() script.run(sys.argv[1:]) -def test(pfx, curword): - global script - params = {} - script = AdminConsole() - sgs = script.find_suggestions(pfx, curword, params) - #print repr(pfx), repr(curword), repr(sgs), repr(params) - -def sgtest(): - global script - script = AdminConsole() - test('', '') - test('', 'se') - test('', 'cr') - test('set ', '') - test('set ', 'q') - test('set queue = blah;', '') - test('set queue = ', '') - - script.exec_string('create queue foo;') - script.exec_string('create queue "foo";') - script.exec_string('create queue \'foo\';') - if __name__ == '__main__': - #sgtest() main() |