summaryrefslogtreecommitdiff
path: root/python/newadm.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/newadm.py')
-rwxr-xr-xpython/newadm.py329
1 files changed, 238 insertions, 91 deletions
diff --git a/python/newadm.py b/python/newadm.py
index 00af5bf6..54fa1fef 100755
--- a/python/newadm.py
+++ b/python/newadm.py
@@ -9,32 +9,29 @@ connect queue=.. node=..;
install pgq;
install londiste;
----------------------
-show all queues | consumers;
-
-show queue ..;
-show consumer ..;
-show batch events <>;
-show batch info <>;
+create queue <qname>;
+drop queue <qname>;
-show_queue_info <q>;
-show_queue_stats <q>;
-show_consumer_batch <cons>;
-show_batch_info <bid>;
-show_batch_events <bid> [ev_id];
+show queue *;
----------------------
-alter queue <qname> set param = , ...;
+// following cmds expect default queue
-create queue <qname>; // db
register consumer foo;
-unregister consumer foo
-drop queue <q>;
+unregister consumer foo;
+
+show queue <qname>;
+show batch <batch_id>;
+show batch <consumer>;
+
+// only syntax
+
+alter queue <qname> set param = , ...;
-------------
+---------------------
+show consumers;
+show_queue_stats <q>;
create node <foo>; //
create node <qname>.<foo>; //
-
add location <node> <loc>; // db, queue
----------------
@@ -82,8 +79,39 @@ def unquote_any(self, s):
elif c == '"':
s = skytools.unquote_ident(c)
# extquote?
+ else:
+ s = s.lower()
return s
+def display_result(curs, desc, fields = []):
+ """Display multirow query as a table."""
+
+ rows = curs.fetchall()
+
+ if not fields:
+ fields = [f[0] for f in curs.description]
+
+ widths = [15] * len(fields)
+ for i, f in enumerate(fields):
+ rlen = len(f)
+ widths[i] = widths[i] > rlen and widths[i] or rlen
+ for row in rows:
+ for i, k in enumerate(fields):
+ rlen = row[k] and len(row) or 0
+ widths[i] = widths[i] > rlen and widths[i] or rlen
+ widths = [w + 2 for w in widths]
+
+ fmt = '%%-%ds' * (len(widths) - 1) + '%%s'
+ fmt = fmt % tuple(widths[:-1])
+ if desc:
+ print(desc)
+ print(fmt % tuple(fields))
+ print(fmt % tuple(['-'*15] * len(fields)))
+
+ for row in rows:
+ print(fmt % tuple([row[k] for k in fields]))
+ print('\n')
+
class Node:
def __init__(self, **kwargs):
self.name = kwargs.get('name')
@@ -92,6 +120,10 @@ class Node:
def get_completions(self, params):
return []
+##
+## Token classes
+##
+
class Proxy(Node):
def set_real(self, node):
self.get_next = node.get_next
@@ -101,49 +133,50 @@ class WList(Node):
c_append = ' '
def __init__(self, *args, **kwargs):
Node.__init__(self, **kwargs)
- self.wlist = args
-
- def get_wlist(self):
- return self.wlist
+ self.tok_list = args
def get_next(self, typ, word, params):
cw = word.lower()
- for w in self.get_wlist():
- if w.word == cw:
- if self.name:
- params[self.name] = cw
- return w.next
+ for w in self.tok_list:
+ n = w.get_next(typ, word, params)
+ if n:
+ if self.name: # and not self.name in params:
+ params[self.name] = word
+ return n
return None
def get_completions(self, params):
- wlist = self.get_wlist()
comp_list = []
- for w in wlist:
+ for w in self.tok_list:
comp_list += w.get_completions(params)
return comp_list
-class DynList(Node):
- tk_type = ('ident',)
+class Word(Node):
+ tk_type = ("ident",)
c_append = ' '
- def __init__(self, next, **kwargs):
+ def __init__(self, word, next, **kwargs):
Node.__init__(self, **kwargs)
+ self.word = word
self.next = next
-
def get_wlist(self):
- return []
-
+ return [self.word]
def get_next(self, typ, word, params):
if typ not in self.tk_type:
return None
- if self.name:
+ if self.word and word != self.word:
+ return None
+ if self.name: # and not self.name in params:
params[self.name] = word
return self.next
-
def get_completions(self, params):
wlist = self.get_wlist()
comp_list = [w + self.c_append for w in wlist]
return comp_list
+class DynList(Word):
+ def __init__(self, next, **kwargs):
+ Word.__init__(self, None, next, **kwargs)
+
class Queue(DynList):
def get_wlist(self):
return script.get_queue_list()
@@ -164,25 +197,20 @@ class User(DynList):
def get_wlist(self):
return script.get_user_list()
+class Consumer(DynList):
+ def get_wlist(self):
+ return script.get_consumer_list()
+
+class BatchId(DynList):
+ tk_type = ("num",)
+ def get_wlist(self):
+ return script.get_batch_list()
+
class Port(DynList):
tk_type = ("num",)
def get_wlist(self):
return ['5432', '6432']
-class Word(Node):
- tk_type = ("ident",)
- c_append = ' '
- def __init__(self, word, next, **kwargs):
- Node.__init__(self, **kwargs)
- self.word = word
- self.next = next
- def get_next(self, typ, word, params):
- if typ in self.tk_type and word == self.word:
- return self.next
- return None
- def get_completions(self, params):
- return [self.word + self.c_append]
-
class SWord(Word):
c_append = '='
@@ -194,18 +222,9 @@ class EQ(Symbol):
def __init__(self, next):
Symbol.__init__(self, '=', next)
-class Value(Node):
+class Value(DynList):
tk_type = ("str", "num", "ident")
- def __init__(self, next, **kwargs):
- Node.__init__(self, **kwargs)
- self.next = next
- def get_next(self, typ, word, params):
- if typ not in self.tk_type:
- return None
- if self.name:
- params[self.name] = word
- return self.next
- def get_completions(self, params):
+ def get_wlist(self):
return []
##
@@ -230,30 +249,58 @@ w_connect.set_real(
SWord('node', EQ(DBNode(w_connect, name = 'node'))),
w_done))
-w_set = WList(
- SWord('queue', EQ(Queue(w_done, name = 'value'))),
- SWord('consumer', EQ(Value(w_done, name = 'value'))),
- name = "param")
+w_show_batch = WList(
+ BatchId(w_done, name = 'batch_id'),
+ Consumer(w_done, name = 'consumer'))
+
+w_show_queue = WList(
+ Symbol('*', w_done, name = 'queue'),
+ Queue(w_done, name = 'queue'),
+ w_done)
w_show = WList(
- Word('queues', w_done),
- Word('databases', w_done),
- Word('consumers', w_done),
- Word('stats', w_done),
- name = "show")
+ Word('batch', w_show_batch),
+ Word('queue', w_show_queue),
+ name = "cmd2")
w_install = WList(
Word('pgq', w_done),
Word('londiste', w_done),
name = 'module')
-w_create = Word('queue', Value(w_done, name = 'queue'))
+w_qargs2 = Proxy()
+
+w_qargs = WList(
+ SWord('idle_period', EQ(Value(w_qargs2, name = 'idle_period'))),
+ SWord('max_count', EQ(Value(w_qargs2, name = 'max_count'))),
+ SWord('max_lag', EQ(Value(w_qargs2, name = 'max_lag'))))
+
+w_qargs2.set_real(WList(
+ w_done,
+ Symbol(',', w_qargs)))
+
+w_alter_q = Queue(Word('set', w_qargs), name = 'queue')
+
+w_alter = Word('queue', w_alter_q, name = 'cmd2')
+
+w_create = Word('queue', Queue(w_done, name = 'queue'),
+ name = 'cmd2')
+
+w_drop = Word('queue', Queue(w_done, name = 'queue'),
+ name = 'cmd2')
+
+w_cons_name = Word('consumer',
+ Consumer(w_done, name = 'consumer'),
+ name = 'cmd2')
w_top = WList(
+ Word('alter', w_alter),
Word('connect', w_connect),
Word('create', w_create),
+ Word('drop', w_drop),
Word('install', w_install),
- Word('set', w_set),
+ Word('register', w_cons_name),
+ Word('unregister', w_cons_name),
Word('show', w_show),
name = "cmd")
@@ -302,6 +349,14 @@ class AdminConsole:
q = "select distinct node_name from pgq_node.node_location order by 1"
return self._ccache('node_list', q, 'pgq_node')
+ def get_batch_list(self):
+ if not self.cur_queue:
+ return []
+ qname = skytools.quote_literal(self.cur_queue)
+ q = "select current_batch::text from pgq.get_consumer_info(%s)"\
+ " where current_batch is not null order by 1" % qname
+ return self._ccache('batch_list', q, 'pgq')
+
def _ccache(self, cname, q, req_schema = None):
if not self.db:
return []
@@ -350,7 +405,7 @@ class AdminConsole:
return clist
def parse_cmdline(self, argv):
- switches = "c:h:p:d:U:f:"
+ switches = "c:h:p:d:U:f:Q:"
lswitches = ['help', 'version']
try:
opts, args = getopt.getopt(argv, switches, lswitches)
@@ -382,6 +437,8 @@ class AdminConsole:
cstr_map['dbname'] = a
elif o == "-U":
cstr_map['user'] = a
+ elif o == "-Q":
+ self.cur_queue = a
elif o == "-c":
self.cmd_str = a
elif o == "-f":
@@ -494,21 +551,6 @@ class AdminConsole:
def reset_comp_cache(self):
self.comp_cache = {}
- def find_suggestions_real(self, pfx, params):
- # find level
- node = top_level
- for typ, w in self.sql_words(pfx):
- w = w.lower()
- node = node.get_next(typ, w, params)
- if not node:
- break
-
- # find possible matches
- if node:
- return node.get_completions(params)
- else:
- return []
-
def find_suggestions(self, pfx, curword, params = {}):
c_pfx = self.comp_cache.get('comp_pfx')
c_list = self.comp_cache.get('comp_list', [])
@@ -524,6 +566,21 @@ class AdminConsole:
res.append(cword)
return res
+ def find_suggestions_real(self, pfx, params):
+ # find level
+ node = top_level
+ for typ, w in self.sql_words(pfx):
+ w = w.lower()
+ node = node.get_next(typ, w, params)
+ if not node:
+ break
+
+ # find possible matches
+ if node:
+ return node.get_completions(params)
+ else:
+ return []
+
def exec_string(self, ln, eof = False):
node = top_level
params = {}
@@ -547,13 +604,21 @@ class AdminConsole:
print "multi-line commands not supported:", repr(ln)
def exec_params(self, params):
+ print 'RUN', params
cmd = params.get('cmd')
+ cmd2 = params.get('cmd2')
if not cmd:
print 'parse error: no command found'
return
+ if cmd2:
+ cmd = "%s_%s" % (cmd, cmd2)
#print 'RUN', repr(params)
fn = getattr(self, 'cmd_' + cmd, self.bad_cmd)
- fn(params)
+ try:
+ fn(params)
+ print 'OK'
+ except Exception, ex:
+ print str(ex)
def bad_cmd(self, params):
print 'unimplemented command'
@@ -623,6 +688,88 @@ class AdminConsole:
skytools.db_install(curs, objs, None)
print "%s installed" % mod_name
+ def cmd_show_queue(self, params):
+ queue = params.get('queue')
+ if queue is None:
+ queue = self.cur_queue
+ if not queue:
+ print 'No default queue'
+ return
+ curs = self.db.cursor()
+ fields = [
+ "queue_name",
+ "queue_cur_table || '/' || queue_ntables as tables",
+ "queue_ticker_max_count as max_cnt",
+ "queue_ticker_max_lag as max_lag",
+ "queue_ticker_idle_period as idle_period",
+ "ticker_lag",
+ ]
+ pfx = "select " + ",".join(fields)
+
+ if queue == '*':
+ q = pfx + " from pgq.get_queue_info()"
+ curs.execute(q)
+ else:
+ q = pfx + " from pgq.get_queue_info(%s)"
+ curs.execute(q, [queue])
+
+ display_result(curs, 'Queues')
+
+ def cmd_show_batch(self, params):
+ batch_id = params.get('batch_id')
+ consumer = params.get('consumer')
+ queue = self.cur_queue
+ if not queue:
+ print 'No default queue'
+ return
+ curs = self.db.cursor()
+ if consumer:
+ q = "select current_batch from pgq.get_consumer_info(%s, %s)"
+ curs.execute(q, [queue, consumer])
+ res = curs.fetchall()
+ if len != 1:
+ print 'no such consumer'
+ return
+ batch_id = res[0]['current_batch']
+ if batch_id is None:
+ print 'consumer has no open batch'
+ return
+
+ q = "select * from pgq.get_batch_events(%s)"
+ curs.execute(q, [batch_id])
+
+ display_result(curs, 'Batch events')
+
+ def cmd_register_consumer(self, params):
+ queue = self.cur_queue
+ if not queue:
+ print 'No default queue'
+ return
+ consumer = params['consumer']
+ curs = self.db.cursor()
+ q = "select * from pgq.register_consumer(%s, %s)"
+ curs.execute(q, [queue, consumer])
+
+ def cmd_unregister_consumer(self, params):
+ queue = self.cur_queue
+ if not queue:
+ print 'No default queue'
+ return
+ consumer = params['consumer']
+ curs = self.db.cursor()
+ q = "select * from pgq.unregister_consumer(%s, %s)"
+ curs.execute(q, [queue, consumer])
+
+ def cmd_create_queue(self, params):
+ curs = self.db.cursor()
+ q = "select * from pgq.create_queue(%(queue)s)"
+ curs.execute(q, params)
+
+ def cmd_drop_queue(self, params):
+ curs = self.db.cursor()
+ q = "select * from pgq.drop_queue(%(queue)s)"
+ curs.execute(q, params)
+
def main():
global script