diff options
author | Marko Kreen | 2010-12-06 12:47:00 +0000 |
---|---|---|
committer | Marko Kreen | 2010-12-06 12:47:00 +0000 |
commit | 0fdf2efacceaa7db8bb3b238ff25af61bb666544 (patch) | |
tree | b7b1bef4a965d2977bf5bbd822ca149306578180 /python/qadmin.py | |
parent | 1a5e15dac62c758d2b830994389417e7bdabaf86 (diff) |
qadmin: sql passthrough, show table/sequence, cleanups
Diffstat (limited to 'python/qadmin.py')
-rwxr-xr-x | python/qadmin.py | 296 |
1 files changed, 203 insertions, 93 deletions
diff --git a/python/qadmin.py b/python/qadmin.py index 9a705c15..6be44d0c 100755 --- a/python/qadmin.py +++ b/python/qadmin.py @@ -10,6 +10,8 @@ alter queue <qname | *> set param = , ...; drop queue <qname>; show queue [ <qname | *> ]; + show table <tbl>; + show sequence <seq>; register consumer foo [on <qname> | at <tick_id> | copy <consumer> ]* ; unregister consumer foo [from <qname>]; @@ -23,18 +25,22 @@ Following commands expect default queue: Londiste commands: londiste add table <tbl> [ , ... ] - with skip_truncate, tgflags=urlenc, - expect_sync, skip_trigger; + with skip_truncate, tgflags='UIDBAQL', + expect_sync, no_triggers, + -- pass trigger args: + backup, skip, when='EXPR', ev_XX='EXPR'; londiste add sequence <seq>; - londiste remove table <tbl>; - londiste remove sequence <seq>; + londiste remove table <tbl> [ , ... ]; + londiste remove sequence <seq> [ , ... ]; londiste tables; londiste seqs; londiste missing; Other commands: - exit; (or press ^D) + exit; - quit program + ^D - quit program + ^C - clear current buffer """ # unimplemented: @@ -108,11 +114,17 @@ IGNORE_HOSTS = { 'ip6-mcastprefix': 1, } -def normalize_any(typ, s): - if typ == 'str': +def unquote_any(typ, s): + if typ == 'ident': + ps = [skytools.unquote_ident(p) for p in s.split('.')] + s = '.'.join(ps) + elif typ == 'str' or typ == 'dolq': s = skytools.unquote_literal(s, stdstr = True) - elif typ == 'ident': - s = skytools.unquote_fqident(s) + return s + +def normalize_any(typ, s): + if typ == 'ident' and s.find('"') < 0: + s = s.lower() return s def display_result(curs, desc, fields = []): @@ -160,8 +172,8 @@ class Token: c_append = ' ' # token type to accept - tk_type = ("ident", "qident", "dolq", "str", "num") - # skipped: numarg, pyold, pynew, sym + tk_type = ("ident", "dolq", "str", "num", "sym") + # skipped: numarg, pyold, pynew def __init__(self, next = None, name = None, append = 0): self.next = next @@ -174,7 +186,7 @@ class Token: """Return next token if 'word' matches this token.""" if not self.is_acceptable(typ, word): return None - self.set_param(word, params) + self.set_param(typ, word, params) return self.next def get_completions(self, params): @@ -189,37 +201,49 @@ class Token: """Return list of potential words at this point.""" return [] - def set_param(self, word, params): + def set_param(self, typ, word, params): + # now set special param if not self.name: return + uw = unquote_any(typ, word) if self._append: lst = params.setdefault(self.name, []) - lst.append(word) + lst.append(uw) else: - params[self.name] = word + params[self.name] = uw def is_acceptable(self, tok, word): if tok not in self.tk_type: return False return True -class Proxy(Token): - """Forward def for Token, in case it needs to be referenced recursively.""" - def set_real(self, node): - self.get_next = node.get_next - self.get_completions = node.get_completions +class Exact(Token): + """Single fixed token.""" + def __init__(self, value, next, **kwargs): + Token.__init__(self, next, **kwargs) + self.value = value + def get_wlist(self): + return [self.value] + def is_acceptable(self, typ, word): + if not Token.is_acceptable(self, typ, word): + return False + return word == self.value class List(Token): """List of Tokens, will be tried sequentially until one matches.""" def __init__(self, *args, **kwargs): Token.__init__(self, **kwargs) - self.tok_list = args + self.tok_list = list(args) + + def add(self, *args): + for a in args: + self.tok_list.append(a) def get_next(self, typ, word, params): for w in self.tok_list: n = w.get_next(typ, word, params) if n: - self.set_param(word, params) + self.set_param(typ, word, params) return n return None @@ -242,68 +266,71 @@ class StrValue(Token): class NumValue(Token): tk_type = ("num",) -class Word(Token): +class Word(Exact): """Single fixed keyword.""" - # token types to accept tk_type = ("ident",) - def __init__(self, word, next, **kwargs): - Token.__init__(self, next, **kwargs) - self.word = word - def get_wlist(self): - return [self.word] - def is_acceptable(self, typ, word): - if not Token.is_acceptable(self, typ, word): - return False - return word == self.word -class Symbol(Word): +class Name(Token): + """Dynamically generated list of idents.""" + tk_type = ("ident") + +class Symbol(Exact): """Single fixed symbol.""" tk_type = ("sym",) c_append = '' -class DynIdent(Token): - """Dynamically generated list of words.""" - tk_type = ("ident", "qident") +class XSymbol(Symbol): + """Symbol that is not shown in completion.""" + def get_wlist(self): + return [] # data-dependant completions -class Queue(DynIdent): +class Queue(Name): def get_wlist(self): return script.get_queue_list() -class Consumer(DynIdent): +class Consumer(Name): def get_wlist(self): return script.get_consumer_list() -class DBNode(DynIdent): +class DBNode(Name): def get_wlist(self): return script.get_node_list() -class Database(DynIdent): +class Database(Name): def get_wlist(self): return script.get_database_list() -class Host(DynIdent): +class Host(Name): def get_wlist(self): return script.get_host_list() -class User(DynIdent): +class User(Name): def get_wlist(self): return script.get_user_list() -class NewTable(DynIdent): +class NewTable(Name): def get_wlist(self): return script.get_new_table_list() -class KnownTable(DynIdent): +class KnownTable(Name): def get_wlist(self): return script.get_known_table_list() -class NewSeq(DynIdent): +class PlainTable(Name): + def get_wlist(self): + return script.get_plain_table_list() + +class PlainSequence(Name): + def get_wlist(self): + return script.get_plain_seq_list() + +class NewSeq(Name): def get_wlist(self): return script.get_new_seq_list() -class KnownSeq(DynIdent): +class KnownSeq(Name): def get_wlist(self): return script.get_known_seq_list() @@ -339,13 +366,16 @@ class WordEQQ(Word): ## Now describe the syntax. ## -top_level = Proxy() +top_level = List(name = 'cmd') w_done = Symbol(';', top_level) +w_xdone = XSymbol(';', top_level) + +w_sql = List(w_done) +w_sql.add(Token(w_sql)) -w_connect = Proxy() -w_connect.set_real( - List( +w_connect = List() +w_connect.add( WordEQ('dbname', Database(w_connect, name = 'dbname')), WordEQ('host', Host(w_connect, name = 'host')), WordEQ('port', Port(w_connect, name = 'port')), @@ -353,7 +383,7 @@ w_connect.set_real( WordEQ('password', ConnstrPassword(w_connect, name = 'password')), WordEQ('queue', Queue(w_connect, name = 'queue')), WordEQ('node', DBNode(w_connect, name = 'node')), - w_done)) + w_done) w_show_batch = List( BatchId(w_done, name = 'batch_id'), @@ -381,12 +411,18 @@ w_show_node = List( DBNode(w_on_queue, name = 'node'), w_done) +w_show_table = PlainTable(w_done, name = 'table') + +w_show_seq = PlainSequence(w_done, name = 'seq') + w_show = List( Word('batch', w_show_batch), Word('help', w_done), Word('queue', w_show_queue), Word('consumer', w_show_consumer), Word('node', w_show_node), + Word('table', w_show_table), + Word('sequence', w_show_seq), name = "cmd2") w_install = List( @@ -394,7 +430,8 @@ w_install = List( Word('londiste', w_done), name = 'module') -w_qargs2 = Proxy() +# alter queue +w_qargs2 = List() w_qargs = List( WordEQQ('idle_period', StrValue(w_qargs2, name = 'ticker_idle_period')), @@ -402,9 +439,8 @@ w_qargs = List( WordEQQ('max_lag', StrValue(w_qargs2, name = 'ticker_max_lag')), WordEQ('paused', NumValue(w_qargs2, name = 'ticker_paused'))) -w_qargs2.set_real(List( - w_done, - Symbol(',', w_qargs))) +w_qargs2.add(w_done) +w_qargs2.add(Symbol(',', w_qargs)) w_set_q = Word('set', w_qargs) @@ -412,32 +448,45 @@ w_alter_q = List( Symbol('*', w_set_q, name = 'queue'), Queue(w_set_q, name = 'queue')) -w_alter = Word('queue', w_alter_q, name = 'cmd2') +# alter +w_alter = List( + Word('queue', w_alter_q), + w_sql, + name = 'cmd2') -w_create = Word('queue', Queue(w_done, name = 'queue'), +# create +w_create = List( + Word('queue', Queue(w_done, name = 'queue')), + w_sql, name = 'cmd2') -w_drop = Word('queue', Queue(w_done, name = 'queue'), +# drop +w_drop = List( + Word('queue', Queue(w_done, name = 'queue')), + w_sql, name = 'cmd2') -w_reg_target = Proxy() -w_reg_target.set_real(List( +# register +w_reg_target = List() +w_reg_target.add( Word('on', Queue(w_reg_target, name = 'queue')), Word('copy', Consumer(w_reg_target, name = 'copy_reg')), Word('at', TickId(w_reg_target, name = 'at_tick')), - w_done)) + w_done) w_cons_on_queue = Word('consumer', Consumer(w_reg_target, name = 'consumer'), name = 'cmd2') +# unregister w_from_queue = Word('from', Queue(w_done, name = 'queue')) w_cons_from_queue = Word('consumer', Consumer(List(w_done, w_from_queue), name = 'consumer'), name = 'cmd2') -w_table_with2 = Proxy() +# londiste add table +w_table_with2 = List() w_table_with = List( Word('skip_truncate', w_table_with2, name = 'skip_truncate'), Word('expect_sync', w_table_with2, name = 'expect_sync'), @@ -456,52 +505,62 @@ w_table_with = List( WordEQQ('tgflags', StrValue(w_table_with2, name = 'tgflags')) ) -w_table_with2.set_real(List( - w_done, - Symbol(',', w_table_with))) +w_table_with2.add(w_done) +w_table_with2.add(Symbol(',', w_table_with)) -w_londiste_add_table = Proxy() +w_londiste_add_table = List() w_londiste_add_table2 = List( Symbol(',', w_londiste_add_table), Word('with', w_table_with), w_done) -w_londiste_add_table.set_real( +w_londiste_add_table.add( NewTable(w_londiste_add_table2, name = 'tables', append = 1)) -w_londiste_add_seq = Proxy() +# londiste add seq +w_londiste_add_seq = List() w_londiste_add_seq2 = List( Symbol(',', w_londiste_add_seq), w_done) -w_londiste_add_seq.set_real( +w_londiste_add_seq.add( NewSeq(w_londiste_add_seq2, name = 'seqs', append = 1)) -w_londiste_remove_table = Proxy() +# londiste remove table +w_londiste_remove_table = List() w_londiste_remove_table2 = List( Symbol(',', w_londiste_remove_table), w_done) -w_londiste_remove_table.set_real( +w_londiste_remove_table.add( KnownTable(w_londiste_remove_table2, name = 'tables', append = 1)) -w_londiste_remove_seq = KnownSeq(w_done, name = 'seqs', append = 1) -w_londiste_remove_seq = Proxy() +# londiste remove sequence +w_londiste_remove_seq = List() w_londiste_remove_seq2 = List( Symbol(',', w_londiste_remove_seq), w_done) -w_londiste_remove_seq.set_real( +w_londiste_remove_seq.add( KnownSeq(w_londiste_remove_seq2, name = 'seqs', append = 1)) +w_londiste_add = List( + Word('table', w_londiste_add_table), + Word('sequence', w_londiste_add_seq), + name = 'cmd3') + +w_londiste_remove = List( + Word('table', w_londiste_remove_table), + Word('sequence', w_londiste_remove_seq), + name = 'cmd3') + +# londiste w_londiste = List( - Word('add_table', w_londiste_add_table), - Word('add_seq', w_londiste_add_seq), - Word('remove_table', w_londiste_remove_table), - Word('remove_seq', w_londiste_remove_seq), + Word('add', w_londiste_add), + Word('remove', w_londiste_remove), Word('missing', w_done), Word('tables', w_done), Word('seqs', w_done), name = "cmd2") -w_top = List( +top_level.add( Word('alter', w_alter), Word('connect', w_connect), Word('create', w_create), @@ -512,9 +571,9 @@ w_top = List( Word('show', w_show), Word('exit', w_done), Word('londiste', w_londiste), - name = "cmd") -top_level.set_real(w_top) + Word('select', w_sql), + w_sql) ## ## Main class for keeping the state. @@ -614,6 +673,23 @@ class AdminConsole: " where queue_name = %s order by 1" % qname return self._ccache('known_seq_list', q, 'londiste') + def get_plain_table_list(self): + q = "select quote_ident(n.nspname) || '.' || quote_ident(r.relname)"\ + " from pg_class r join pg_namespace n on (n.oid = r.relnamespace)"\ + " where r.relkind = 'r' "\ + " and n.nspname not in ('pg_catalog', 'information_schema', 'pgq', 'londiste', 'pgq_node', 'pgq_ext') "\ + " and n.nspname !~ 'pg_.*' "\ + " order by 1" + return self._ccache('plain_table_list', q) + + def get_plain_seq_list(self): + q = "select quote_ident(n.nspname) || '.' || quote_ident(r.relname)"\ + " from pg_class r join pg_namespace n on (n.oid = r.relnamespace)"\ + " where r.relkind = 'S' "\ + " and n.nspname not in ('pg_catalog', 'information_schema', 'pgq', 'londiste', 'pgq_node', 'pgq_ext') "\ + " order by 1" + return self._ccache('plain_seq_list', q) + def get_batch_list(self): if not self.cur_queue: return [] @@ -793,6 +869,8 @@ class AdminConsole: except EOFError: print break + except psycopg2.Error, d: + print 'ERROR:', str(d).strip() self.reset_comp_cache() try: @@ -829,7 +907,6 @@ class AdminConsole: standard_quoting = True, fqident = True, show_location = True, - use_qident = True, ignore_whitespace = True) def reset_comp_cache(self): @@ -891,9 +968,10 @@ class AdminConsole: def exec_string(self, ln, eof = False): node = top_level params = {} + self.tokens = [] for typ, w, pos in self.sql_words(ln): + self.tokens.append((typ, w)) w = normalize_any(typ, w) - #print repr(typ), repr(w) if typ == 'error': print 'syntax error 1:', repr(ln) return @@ -905,6 +983,7 @@ class AdminConsole: if node == top_level: self.exec_params(params) params = {} + self.tokens = [] if eof: if params: self.exec_params(params) @@ -915,21 +994,17 @@ class AdminConsole: #print 'RUN', params cmd = params.get('cmd') cmd2 = params.get('cmd2') + cmd3 = params.get('cmd3') if not cmd: print 'parse error: no command found' return if cmd2: cmd = "%s_%s" % (cmd, cmd2) + if cmd3: + cmd = "%s_%s" % (cmd, cmd3) #print 'RUN', repr(params) - fn = getattr(self, 'cmd_' + cmd, self.bad_cmd) - try: - fn(params) - except Exception, ex: - print "ERROR: %s" % str(ex).strip() - raise - - def bad_cmd(self, params): - print 'unimplemented command' + fn = getattr(self, 'cmd_' + cmd, self.execute_sql) + fn(params) def cmd_connect(self, params): qname = params.get('queue', self.cur_queue) @@ -1285,6 +1360,41 @@ class AdminConsole: print res[0], res[1] print 'REMOVE_SEQ:', res[0], res[1] + ## generic info + + def cmd_show_table(self, params): + print '-' * 64 + tbl = params['table'] + curs = self.db.cursor() + s = skytools.TableStruct(curs, tbl) + s.create(fakecurs(), skytools.T_ALL) + print '-' * 64 + + def cmd_show_sequence(self, params): + print '-' * 64 + seq = params['seq'] + curs = self.db.cursor() + s = skytools.SeqStruct(curs, seq) + s.create(fakecurs(), skytools.T_ALL) + print '-' * 64 + + ## sql pass-through + + def execute_sql(self, params): + tks = [tk[1] for tk in self.tokens] + sql = ' '.join(tks) + + curs = self.db.cursor() + curs.execute(sql) + + if curs.description: + display_result(curs, None) + print curs.statusmessage + +class fakecurs: + def execute(self, sql): + print sql + def main(): global script script = AdminConsole() |