summaryrefslogtreecommitdiff
path: root/python/qadmin.py
diff options
context:
space:
mode:
authorMarko Kreen2010-12-06 12:47:00 +0000
committerMarko Kreen2010-12-06 12:47:00 +0000
commit0fdf2efacceaa7db8bb3b238ff25af61bb666544 (patch)
treeb7b1bef4a965d2977bf5bbd822ca149306578180 /python/qadmin.py
parent1a5e15dac62c758d2b830994389417e7bdabaf86 (diff)
qadmin: sql passthrough, show table/sequence, cleanups
Diffstat (limited to 'python/qadmin.py')
-rwxr-xr-xpython/qadmin.py296
1 files changed, 203 insertions, 93 deletions
diff --git a/python/qadmin.py b/python/qadmin.py
index 9a705c15..6be44d0c 100755
--- a/python/qadmin.py
+++ b/python/qadmin.py
@@ -10,6 +10,8 @@
alter queue <qname | *> set param = , ...;
drop queue <qname>;
show queue [ <qname | *> ];
+ show table <tbl>;
+ show sequence <seq>;
register consumer foo [on <qname> | at <tick_id> | copy <consumer> ]* ;
unregister consumer foo [from <qname>];
@@ -23,18 +25,22 @@ Following commands expect default queue:
Londiste commands:
londiste add table <tbl> [ , ... ]
- with skip_truncate, tgflags=urlenc,
- expect_sync, skip_trigger;
+ with skip_truncate, tgflags='UIDBAQL',
+ expect_sync, no_triggers,
+ -- pass trigger args:
+ backup, skip, when='EXPR', ev_XX='EXPR';
londiste add sequence <seq>;
- londiste remove table <tbl>;
- londiste remove sequence <seq>;
+ londiste remove table <tbl> [ , ... ];
+ londiste remove sequence <seq> [ , ... ];
londiste tables;
londiste seqs;
londiste missing;
Other commands:
- exit; (or press ^D)
+ exit; - quit program
+ ^D - quit program
+ ^C - clear current buffer
"""
# unimplemented:
@@ -108,11 +114,17 @@ IGNORE_HOSTS = {
'ip6-mcastprefix': 1,
}
-def normalize_any(typ, s):
- if typ == 'str':
+def unquote_any(typ, s):
+ if typ == 'ident':
+ ps = [skytools.unquote_ident(p) for p in s.split('.')]
+ s = '.'.join(ps)
+ elif typ == 'str' or typ == 'dolq':
s = skytools.unquote_literal(s, stdstr = True)
- elif typ == 'ident':
- s = skytools.unquote_fqident(s)
+ return s
+
+def normalize_any(typ, s):
+ if typ == 'ident' and s.find('"') < 0:
+ s = s.lower()
return s
def display_result(curs, desc, fields = []):
@@ -160,8 +172,8 @@ class Token:
c_append = ' '
# token type to accept
- tk_type = ("ident", "qident", "dolq", "str", "num")
- # skipped: numarg, pyold, pynew, sym
+ tk_type = ("ident", "dolq", "str", "num", "sym")
+ # skipped: numarg, pyold, pynew
def __init__(self, next = None, name = None, append = 0):
self.next = next
@@ -174,7 +186,7 @@ class Token:
"""Return next token if 'word' matches this token."""
if not self.is_acceptable(typ, word):
return None
- self.set_param(word, params)
+ self.set_param(typ, word, params)
return self.next
def get_completions(self, params):
@@ -189,37 +201,49 @@ class Token:
"""Return list of potential words at this point."""
return []
- def set_param(self, word, params):
+ def set_param(self, typ, word, params):
+ # now set special param
if not self.name:
return
+ uw = unquote_any(typ, word)
if self._append:
lst = params.setdefault(self.name, [])
- lst.append(word)
+ lst.append(uw)
else:
- params[self.name] = word
+ params[self.name] = uw
def is_acceptable(self, tok, word):
if tok not in self.tk_type:
return False
return True
-class Proxy(Token):
- """Forward def for Token, in case it needs to be referenced recursively."""
- def set_real(self, node):
- self.get_next = node.get_next
- self.get_completions = node.get_completions
+class Exact(Token):
+ """Single fixed token."""
+ def __init__(self, value, next, **kwargs):
+ Token.__init__(self, next, **kwargs)
+ self.value = value
+ def get_wlist(self):
+ return [self.value]
+ def is_acceptable(self, typ, word):
+ if not Token.is_acceptable(self, typ, word):
+ return False
+ return word == self.value
class List(Token):
"""List of Tokens, will be tried sequentially until one matches."""
def __init__(self, *args, **kwargs):
Token.__init__(self, **kwargs)
- self.tok_list = args
+ self.tok_list = list(args)
+
+ def add(self, *args):
+ for a in args:
+ self.tok_list.append(a)
def get_next(self, typ, word, params):
for w in self.tok_list:
n = w.get_next(typ, word, params)
if n:
- self.set_param(word, params)
+ self.set_param(typ, word, params)
return n
return None
@@ -242,68 +266,71 @@ class StrValue(Token):
class NumValue(Token):
tk_type = ("num",)
-class Word(Token):
+class Word(Exact):
"""Single fixed keyword."""
- # token types to accept
tk_type = ("ident",)
- def __init__(self, word, next, **kwargs):
- Token.__init__(self, next, **kwargs)
- self.word = word
- def get_wlist(self):
- return [self.word]
- def is_acceptable(self, typ, word):
- if not Token.is_acceptable(self, typ, word):
- return False
- return word == self.word
-class Symbol(Word):
+class Name(Token):
+ """Dynamically generated list of idents."""
+ tk_type = ("ident")
+
+class Symbol(Exact):
"""Single fixed symbol."""
tk_type = ("sym",)
c_append = ''
-class DynIdent(Token):
- """Dynamically generated list of words."""
- tk_type = ("ident", "qident")
+class XSymbol(Symbol):
+ """Symbol that is not shown in completion."""
+ def get_wlist(self):
+ return []
# data-dependant completions
-class Queue(DynIdent):
+class Queue(Name):
def get_wlist(self):
return script.get_queue_list()
-class Consumer(DynIdent):
+class Consumer(Name):
def get_wlist(self):
return script.get_consumer_list()
-class DBNode(DynIdent):
+class DBNode(Name):
def get_wlist(self):
return script.get_node_list()
-class Database(DynIdent):
+class Database(Name):
def get_wlist(self):
return script.get_database_list()
-class Host(DynIdent):
+class Host(Name):
def get_wlist(self):
return script.get_host_list()
-class User(DynIdent):
+class User(Name):
def get_wlist(self):
return script.get_user_list()
-class NewTable(DynIdent):
+class NewTable(Name):
def get_wlist(self):
return script.get_new_table_list()
-class KnownTable(DynIdent):
+class KnownTable(Name):
def get_wlist(self):
return script.get_known_table_list()
-class NewSeq(DynIdent):
+class PlainTable(Name):
+ def get_wlist(self):
+ return script.get_plain_table_list()
+
+class PlainSequence(Name):
+ def get_wlist(self):
+ return script.get_plain_seq_list()
+
+class NewSeq(Name):
def get_wlist(self):
return script.get_new_seq_list()
-class KnownSeq(DynIdent):
+class KnownSeq(Name):
def get_wlist(self):
return script.get_known_seq_list()
@@ -339,13 +366,16 @@ class WordEQQ(Word):
## Now describe the syntax.
##
-top_level = Proxy()
+top_level = List(name = 'cmd')
w_done = Symbol(';', top_level)
+w_xdone = XSymbol(';', top_level)
+
+w_sql = List(w_done)
+w_sql.add(Token(w_sql))
-w_connect = Proxy()
-w_connect.set_real(
- List(
+w_connect = List()
+w_connect.add(
WordEQ('dbname', Database(w_connect, name = 'dbname')),
WordEQ('host', Host(w_connect, name = 'host')),
WordEQ('port', Port(w_connect, name = 'port')),
@@ -353,7 +383,7 @@ w_connect.set_real(
WordEQ('password', ConnstrPassword(w_connect, name = 'password')),
WordEQ('queue', Queue(w_connect, name = 'queue')),
WordEQ('node', DBNode(w_connect, name = 'node')),
- w_done))
+ w_done)
w_show_batch = List(
BatchId(w_done, name = 'batch_id'),
@@ -381,12 +411,18 @@ w_show_node = List(
DBNode(w_on_queue, name = 'node'),
w_done)
+w_show_table = PlainTable(w_done, name = 'table')
+
+w_show_seq = PlainSequence(w_done, name = 'seq')
+
w_show = List(
Word('batch', w_show_batch),
Word('help', w_done),
Word('queue', w_show_queue),
Word('consumer', w_show_consumer),
Word('node', w_show_node),
+ Word('table', w_show_table),
+ Word('sequence', w_show_seq),
name = "cmd2")
w_install = List(
@@ -394,7 +430,8 @@ w_install = List(
Word('londiste', w_done),
name = 'module')
-w_qargs2 = Proxy()
+# alter queue
+w_qargs2 = List()
w_qargs = List(
WordEQQ('idle_period', StrValue(w_qargs2, name = 'ticker_idle_period')),
@@ -402,9 +439,8 @@ w_qargs = List(
WordEQQ('max_lag', StrValue(w_qargs2, name = 'ticker_max_lag')),
WordEQ('paused', NumValue(w_qargs2, name = 'ticker_paused')))
-w_qargs2.set_real(List(
- w_done,
- Symbol(',', w_qargs)))
+w_qargs2.add(w_done)
+w_qargs2.add(Symbol(',', w_qargs))
w_set_q = Word('set', w_qargs)
@@ -412,32 +448,45 @@ w_alter_q = List(
Symbol('*', w_set_q, name = 'queue'),
Queue(w_set_q, name = 'queue'))
-w_alter = Word('queue', w_alter_q, name = 'cmd2')
+# alter
+w_alter = List(
+ Word('queue', w_alter_q),
+ w_sql,
+ name = 'cmd2')
-w_create = Word('queue', Queue(w_done, name = 'queue'),
+# create
+w_create = List(
+ Word('queue', Queue(w_done, name = 'queue')),
+ w_sql,
name = 'cmd2')
-w_drop = Word('queue', Queue(w_done, name = 'queue'),
+# drop
+w_drop = List(
+ Word('queue', Queue(w_done, name = 'queue')),
+ w_sql,
name = 'cmd2')
-w_reg_target = Proxy()
-w_reg_target.set_real(List(
+# register
+w_reg_target = List()
+w_reg_target.add(
Word('on', Queue(w_reg_target, name = 'queue')),
Word('copy', Consumer(w_reg_target, name = 'copy_reg')),
Word('at', TickId(w_reg_target, name = 'at_tick')),
- w_done))
+ w_done)
w_cons_on_queue = Word('consumer',
Consumer(w_reg_target, name = 'consumer'),
name = 'cmd2')
+# unregister
w_from_queue = Word('from', Queue(w_done, name = 'queue'))
w_cons_from_queue = Word('consumer',
Consumer(List(w_done, w_from_queue), name = 'consumer'),
name = 'cmd2')
-w_table_with2 = Proxy()
+# londiste add table
+w_table_with2 = List()
w_table_with = List(
Word('skip_truncate', w_table_with2, name = 'skip_truncate'),
Word('expect_sync', w_table_with2, name = 'expect_sync'),
@@ -456,52 +505,62 @@ w_table_with = List(
WordEQQ('tgflags', StrValue(w_table_with2, name = 'tgflags'))
)
-w_table_with2.set_real(List(
- w_done,
- Symbol(',', w_table_with)))
+w_table_with2.add(w_done)
+w_table_with2.add(Symbol(',', w_table_with))
-w_londiste_add_table = Proxy()
+w_londiste_add_table = List()
w_londiste_add_table2 = List(
Symbol(',', w_londiste_add_table),
Word('with', w_table_with),
w_done)
-w_londiste_add_table.set_real(
+w_londiste_add_table.add(
NewTable(w_londiste_add_table2,
name = 'tables', append = 1))
-w_londiste_add_seq = Proxy()
+# londiste add seq
+w_londiste_add_seq = List()
w_londiste_add_seq2 = List(
Symbol(',', w_londiste_add_seq),
w_done)
-w_londiste_add_seq.set_real(
+w_londiste_add_seq.add(
NewSeq(w_londiste_add_seq2, name = 'seqs', append = 1))
-w_londiste_remove_table = Proxy()
+# londiste remove table
+w_londiste_remove_table = List()
w_londiste_remove_table2 = List(
Symbol(',', w_londiste_remove_table),
w_done)
-w_londiste_remove_table.set_real(
+w_londiste_remove_table.add(
KnownTable(w_londiste_remove_table2, name = 'tables', append = 1))
-w_londiste_remove_seq = KnownSeq(w_done, name = 'seqs', append = 1)
-w_londiste_remove_seq = Proxy()
+# londiste remove sequence
+w_londiste_remove_seq = List()
w_londiste_remove_seq2 = List(
Symbol(',', w_londiste_remove_seq),
w_done)
-w_londiste_remove_seq.set_real(
+w_londiste_remove_seq.add(
KnownSeq(w_londiste_remove_seq2, name = 'seqs', append = 1))
+w_londiste_add = List(
+ Word('table', w_londiste_add_table),
+ Word('sequence', w_londiste_add_seq),
+ name = 'cmd3')
+
+w_londiste_remove = List(
+ Word('table', w_londiste_remove_table),
+ Word('sequence', w_londiste_remove_seq),
+ name = 'cmd3')
+
+# londiste
w_londiste = List(
- Word('add_table', w_londiste_add_table),
- Word('add_seq', w_londiste_add_seq),
- Word('remove_table', w_londiste_remove_table),
- Word('remove_seq', w_londiste_remove_seq),
+ Word('add', w_londiste_add),
+ Word('remove', w_londiste_remove),
Word('missing', w_done),
Word('tables', w_done),
Word('seqs', w_done),
name = "cmd2")
-w_top = List(
+top_level.add(
Word('alter', w_alter),
Word('connect', w_connect),
Word('create', w_create),
@@ -512,9 +571,9 @@ w_top = List(
Word('show', w_show),
Word('exit', w_done),
Word('londiste', w_londiste),
- name = "cmd")
-top_level.set_real(w_top)
+ Word('select', w_sql),
+ w_sql)
##
## Main class for keeping the state.
@@ -614,6 +673,23 @@ class AdminConsole:
" where queue_name = %s order by 1" % qname
return self._ccache('known_seq_list', q, 'londiste')
+ def get_plain_table_list(self):
+ q = "select quote_ident(n.nspname) || '.' || quote_ident(r.relname)"\
+ " from pg_class r join pg_namespace n on (n.oid = r.relnamespace)"\
+ " where r.relkind = 'r' "\
+ " and n.nspname not in ('pg_catalog', 'information_schema', 'pgq', 'londiste', 'pgq_node', 'pgq_ext') "\
+ " and n.nspname !~ 'pg_.*' "\
+ " order by 1"
+ return self._ccache('plain_table_list', q)
+
+ def get_plain_seq_list(self):
+ q = "select quote_ident(n.nspname) || '.' || quote_ident(r.relname)"\
+ " from pg_class r join pg_namespace n on (n.oid = r.relnamespace)"\
+ " where r.relkind = 'S' "\
+ " and n.nspname not in ('pg_catalog', 'information_schema', 'pgq', 'londiste', 'pgq_node', 'pgq_ext') "\
+ " order by 1"
+ return self._ccache('plain_seq_list', q)
+
def get_batch_list(self):
if not self.cur_queue:
return []
@@ -793,6 +869,8 @@ class AdminConsole:
except EOFError:
print
break
+ except psycopg2.Error, d:
+ print 'ERROR:', str(d).strip()
self.reset_comp_cache()
try:
@@ -829,7 +907,6 @@ class AdminConsole:
standard_quoting = True,
fqident = True,
show_location = True,
- use_qident = True,
ignore_whitespace = True)
def reset_comp_cache(self):
@@ -891,9 +968,10 @@ class AdminConsole:
def exec_string(self, ln, eof = False):
node = top_level
params = {}
+ self.tokens = []
for typ, w, pos in self.sql_words(ln):
+ self.tokens.append((typ, w))
w = normalize_any(typ, w)
- #print repr(typ), repr(w)
if typ == 'error':
print 'syntax error 1:', repr(ln)
return
@@ -905,6 +983,7 @@ class AdminConsole:
if node == top_level:
self.exec_params(params)
params = {}
+ self.tokens = []
if eof:
if params:
self.exec_params(params)
@@ -915,21 +994,17 @@ class AdminConsole:
#print 'RUN', params
cmd = params.get('cmd')
cmd2 = params.get('cmd2')
+ cmd3 = params.get('cmd3')
if not cmd:
print 'parse error: no command found'
return
if cmd2:
cmd = "%s_%s" % (cmd, cmd2)
+ if cmd3:
+ cmd = "%s_%s" % (cmd, cmd3)
#print 'RUN', repr(params)
- fn = getattr(self, 'cmd_' + cmd, self.bad_cmd)
- try:
- fn(params)
- except Exception, ex:
- print "ERROR: %s" % str(ex).strip()
- raise
-
- def bad_cmd(self, params):
- print 'unimplemented command'
+ fn = getattr(self, 'cmd_' + cmd, self.execute_sql)
+ fn(params)
def cmd_connect(self, params):
qname = params.get('queue', self.cur_queue)
@@ -1285,6 +1360,41 @@ class AdminConsole:
print res[0], res[1]
print 'REMOVE_SEQ:', res[0], res[1]
+ ## generic info
+
+ def cmd_show_table(self, params):
+ print '-' * 64
+ tbl = params['table']
+ curs = self.db.cursor()
+ s = skytools.TableStruct(curs, tbl)
+ s.create(fakecurs(), skytools.T_ALL)
+ print '-' * 64
+
+ def cmd_show_sequence(self, params):
+ print '-' * 64
+ seq = params['seq']
+ curs = self.db.cursor()
+ s = skytools.SeqStruct(curs, seq)
+ s.create(fakecurs(), skytools.T_ALL)
+ print '-' * 64
+
+ ## sql pass-through
+
+ def execute_sql(self, params):
+ tks = [tk[1] for tk in self.tokens]
+ sql = ' '.join(tks)
+
+ curs = self.db.cursor()
+ curs.execute(sql)
+
+ if curs.description:
+ display_result(curs, None)
+ print curs.statusmessage
+
+class fakecurs:
+ def execute(self, sql):
+ print sql
+
def main():
global script
script = AdminConsole()