summaryrefslogtreecommitdiff
path: root/python/qadmin.py
diff options
context:
space:
mode:
authorMarko Kreen2009-11-24 15:43:33 +0000
committerMarko Kreen2009-11-24 15:43:33 +0000
commit2fed3df04c7e9dac065976e5e0ee05baab63f6e5 (patch)
tree166cae677fc470840489cd9f8ff7996f85875693 /python/qadmin.py
parent2cd1e444a8ca3417ef0d5ca1d5141cde4122f01c (diff)
qadmin: parser cleanup, make * optional.
Diffstat (limited to 'python/qadmin.py')
-rwxr-xr-xpython/qadmin.py226
1 files changed, 110 insertions, 116 deletions
diff --git a/python/qadmin.py b/python/qadmin.py
index 0c8d625b..54abf872 100755
--- a/python/qadmin.py
+++ b/python/qadmin.py
@@ -3,23 +3,22 @@
"""Commands that require only database connection:
connect dbname=.. host=.. service=.. queue=..;
- connect queue=.. [ node=.. ];
+ connect [ queue=.. ] [ node=.. ];
install pgq | londiste;
create queue <qname>;
alter queue <qname | *> set param = , ...;
drop queue <qname>;
- show queue <qname | *>;
+ show queue [ <qname | *> ];
register consumer foo [on <qname> | at <tick_id> | copy <consumer> ]* ;
unregister consumer foo [from <qname>];
- show consumer <cname | *> [on <qname>];
+ show consumer [ <cname | *> [on <qname>] ];
- show node <node | *> [on <qname>];
+ show node [ <node | *> [on <qname>] ];
Following commands expect default queue:
- show queue;
show batch <batch_id>;
show batch <consumer>;
"""
@@ -30,6 +29,13 @@ create <root | branch | leaf> node <node> location <loc> [on <qname>];
drop node <node> [on <qname>];
alter node <node> [location=<loc>]
show_queue_stats <q>;
+
+change provider
+drop node
+status
+rename node
+
+
"""
__version__ = '0.1'
@@ -53,11 +59,10 @@ General options:
--version
'''
-import sys, os, readline, getopt, re
+import sys, os, readline, getopt, re, psycopg2
import pkgloader
pkgloader.require('skytools', '3.0')
-
import skytools
script = None
@@ -92,7 +97,7 @@ def display_result(curs, desc, fields = []):
if not fields:
fields = [f[0] for f in curs.description]
- widths = [15] * len(fields)
+ widths = [10] * len(fields)
for i, f in enumerate(fields):
rlen = len(f)
if rlen > widths[i]:
@@ -113,29 +118,37 @@ def display_result(curs, desc, fields = []):
for row in rows:
print(fmt % tuple([row[k] for k in fields]))
- print('\n')
+ print('')
+
+##
+## Base token classes
+##
-class Node:
+class Token:
+ """Base class for tokens.
+
+ The optional 'param' kwarg will set corresponding key in
+ 'params' dict to final token value.
+ """
def __init__(self, **kwargs):
self.name = kwargs.get('name')
def get_next(self, typ, word, params):
+ """Return next token if 'word' matches this token."""
return None
def get_completions(self, params):
+ """Return list of all completions possible at this point."""
return []
-##
-## Token classes
-##
-
-class Proxy(Node):
+class Proxy(Token):
+ """Forward def for Token, in case it needs to be referenced recursively."""
def set_real(self, node):
self.get_next = node.get_next
self.get_completions = node.get_completions
-class WList(Node):
- c_append = ' '
+class List(Token):
+ """List of Tokens, will be tried sequentially until one matches."""
def __init__(self, *args, **kwargs):
- Node.__init__(self, **kwargs)
+ Token.__init__(self, **kwargs)
self.tok_list = args
def get_next(self, typ, word, params):
@@ -143,7 +156,7 @@ class WList(Node):
for w in self.tok_list:
n = w.get_next(typ, word, params)
if n:
- if self.name: # and not self.name in params:
+ if self.name:
params[self.name] = word
return n
return None
@@ -154,11 +167,18 @@ class WList(Node):
comp_list += w.get_completions(params)
return comp_list
-class Word(Node):
+##
+## Dynamic token classes
+##
+
+class Word(Token):
+ """Single fixed word."""
+ # token types to accept
tk_type = ("ident",)
+ # string to append to completions
c_append = ' '
def __init__(self, word, next, **kwargs):
- Node.__init__(self, **kwargs)
+ Token.__init__(self, **kwargs)
self.word = word
self.next = next
def get_wlist(self):
@@ -177,14 +197,18 @@ class Word(Node):
return comp_list
class DynList(Word):
+ """Dynamically generated list of words."""
def __init__(self, next, **kwargs):
Word.__init__(self, None, next, **kwargs)
+ def get_wlist(self):
+ return []
class DynIdentList(DynList):
+ """Accept quoted words."""
def get_next(self, typ, word, params):
"""Allow quoted queue names"""
next = DynList.get_next(self, typ, word, params)
- if next:
+ if next and self.name:
params[self.name] = unquote_any(word)
return next
@@ -227,16 +251,16 @@ class Port(DynList):
def get_wlist(self):
return ['5432', '6432']
-class SWord(Word):
- c_append = '='
-
class Symbol(Word):
tk_type = ("sym",)
c_append = ''
-class EQ(Symbol):
- def __init__(self, next):
- Symbol.__init__(self, '=', next)
+class WordEQ(Word):
+ """Word that is followed by '='."""
+ c_append = '='
+ def __init__(self, word, next, **kwargs):
+ next = Symbol('=', next)
+ Word.__init__(self, word, next, **kwargs)
class Value(DynList):
tk_type = ("str", "num", "ident")
@@ -255,43 +279,43 @@ eq_val = Symbol('=', Value(w_done, name = 'value'))
w_connect = Proxy()
w_connect.set_real(
- WList(
- SWord('dbname', EQ(Database(w_connect, name = 'dbname'))),
- SWord('host', EQ(Host(w_connect, name = 'host'))),
- SWord('port', EQ(Port(w_connect, name = 'port'))),
- SWord('user', EQ(User(w_connect, name = 'user'))),
- SWord('password', EQ(Value(w_connect, name = 'password'))),
- SWord('queue', EQ(Queue(w_connect, name = 'queue'))),
- SWord('node', EQ(DBNode(w_connect, name = 'node'))),
+ List(
+ WordEQ('dbname', Database(w_connect, name = 'dbname')),
+ WordEQ('host', Host(w_connect, name = 'host')),
+ WordEQ('port', Port(w_connect, name = 'port')),
+ WordEQ('user', User(w_connect, name = 'user')),
+ WordEQ('password', Value(w_connect, name = 'password')),
+ WordEQ('queue', Queue(w_connect, name = 'queue')),
+ WordEQ('node', DBNode(w_connect, name = 'node')),
w_done))
-w_show_batch = WList(
+w_show_batch = List(
BatchId(w_done, name = 'batch_id'),
Consumer(w_done, name = 'consumer'))
-w_show_queue = WList(
+w_show_queue = List(
Symbol('*', w_done, name = 'queue'),
Queue(w_done, name = 'queue'),
w_done)
-w_show_on_queue = WList(
+w_show_on_queue = List(
Symbol('*', w_done, name = 'queue'),
Queue(w_done, name = 'queue'),
)
-w_on_queue = WList(Word('on', w_show_on_queue), w_done)
+w_on_queue = List(Word('on', w_show_on_queue), w_done)
-w_show_consumer = WList(
+w_show_consumer = List(
Symbol('*', w_on_queue, name = 'consumer'),
Consumer(w_on_queue, name = 'consumer'),
- )
+ w_done)
-w_show_node = WList(
+w_show_node = List(
Symbol('*', w_on_queue, name = 'node'),
DBNode(w_on_queue, name = 'node'),
- )
+ w_done)
-w_show = WList(
+w_show = List(
Word('batch', w_show_batch),
Word('help', w_done),
Word('queue', w_show_queue),
@@ -299,26 +323,26 @@ w_show = WList(
Word('node', w_show_node),
name = "cmd2")
-w_install = WList(
+w_install = List(
Word('pgq', w_done),
Word('londiste', w_done),
name = 'module')
w_qargs2 = Proxy()
-w_qargs = WList(
- SWord('idle_period', EQ(Value(w_qargs2, name = 'ticker_idle_period'))),
- SWord('max_count', EQ(Value(w_qargs2, name = 'ticker_max_count'))),
- SWord('max_lag', EQ(Value(w_qargs2, name = 'ticker_max_lag'))),
- SWord('paused', EQ(Value(w_qargs2, name = 'ticker_paused'))))
+w_qargs = List(
+ WordEQ('idle_period', Value(w_qargs2, name = 'ticker_idle_period')),
+ WordEQ('max_count', Value(w_qargs2, name = 'ticker_max_count')),
+ WordEQ('max_lag', Value(w_qargs2, name = 'ticker_max_lag')),
+ WordEQ('paused', Value(w_qargs2, name = 'ticker_paused')))
-w_qargs2.set_real(WList(
+w_qargs2.set_real(List(
w_done,
Symbol(',', w_qargs)))
w_set_q = Word('set', w_qargs)
-w_alter_q = WList(
+w_alter_q = List(
Symbol('*', w_set_q, name = 'queue'),
Queue(w_set_q, name = 'queue'))
@@ -331,7 +355,7 @@ w_drop = Word('queue', Queue(w_done, name = 'queue'),
name = 'cmd2')
w_reg_target = Proxy()
-w_reg_target.set_real(WList(
+w_reg_target.set_real(List(
Word('on', Queue(w_reg_target, name = 'queue')),
Word('copy', Consumer(w_reg_target, name = 'copy_reg')),
Word('at', TickId(w_reg_target, name = 'at_tick')),
@@ -341,16 +365,13 @@ w_cons_on_queue = Word('consumer',
Consumer(w_reg_target, name = 'consumer'),
name = 'cmd2')
-w_from_queue = Word('from',
- Queue(w_done, name = 'queue'),
- name = 'fromqueue')
+w_from_queue = Word('from', Queue(w_done, name = 'queue'))
w_cons_from_queue = Word('consumer',
- Consumer(WList(w_done, w_from_queue),
- name = 'consumer'),
+ Consumer(List(w_done, w_from_queue), name = 'consumer'),
name = 'cmd2')
-w_top = WList(
+w_top = List(
Word('alter', w_alter),
Word('connect', w_connect),
Word('create', w_create),
@@ -387,6 +408,7 @@ class AdminConsole:
initial_connstr = None
rc_hosts = re.compile('\s+')
+
def get_queue_list(self):
q = "select queue_name from pgq.queue order by 1"
return self._ccache('queue_list', q, 'pgq')
@@ -458,7 +480,7 @@ class AdminConsole:
clist.append(h)
clist.sort()
self.comp_cache['host_list'] = clist
- except:
+ except IOError:
clist = []
return clist
@@ -526,6 +548,7 @@ class AdminConsole:
curs = db.cursor()
curs.execute(q)
res = curs.fetchone()
+ print 'Server version', res[1]
self.cur_database = res[0]
return db
@@ -551,7 +574,11 @@ class AdminConsole:
if self.cmd_file:
cmd_str = open(self.cmd_file, "r").read()
- self.db = self.db_connect(self.initial_connstr)
+ try:
+ self.db = self.db_connect(self.initial_connstr)
+ except psycopg2.Error, d:
+ print str(d).strip()
+ sys.exit(1)
if cmd_str:
self.exec_string(cmd_str)
@@ -562,6 +589,7 @@ class AdminConsole:
readline.parse_and_bind('tab: complete')
readline.set_completer(self.rl_completer_safe)
#print 'delims: ', repr(readline.get_completer_delims())
+
hist_file = os.path.expanduser("~/.qadmin_history")
try:
readline.read_history_file(hist_file)
@@ -572,7 +600,6 @@ class AdminConsole:
while 1:
try:
ln = self.line_input()
- #print 'line:', repr(ln)
self.exec_string(ln)
except KeyboardInterrupt:
print
@@ -580,7 +607,11 @@ class AdminConsole:
print
break
self.reset_comp_cache()
- readline.write_history_file(hist_file)
+
+ try:
+ readline.write_history_file(hist_file)
+ except IOError:
+ pass
def rl_completer(self, curword, state):
curline = readline.get_line_buffer()
@@ -686,7 +717,7 @@ class AdminConsole:
print 'unimplemented command'
def cmd_connect(self, params):
- qname = params.get('queue')
+ qname = params.get('queue', self.cur_queue)
if 'node' in params and not qname:
print 'node= needs a queue also'
@@ -731,7 +762,7 @@ class AdminConsole:
# set default queue
if 'queue' in params:
self.cur_queue = qname
-
+
print "CONNECT"
def cmd_install(self, params):
@@ -775,6 +806,8 @@ class AdminConsole:
"queue_ticker_idle_period as idle_period",
"queue_ticker_paused as paused",
"ticker_lag",
+ "ev_per_sec",
+ "ev_new",
]
pfx = "select " + ",".join(fields)
@@ -785,20 +818,12 @@ class AdminConsole:
q = pfx + " from pgq.get_queue_info(%s)"
curs.execute(q, [queue])
- display_result(curs, 'Queues')
+ display_result(curs, 'Queue: %s' % queue)
def cmd_show_consumer(self, params):
"""Show consumer status"""
- consumer = params.get('consumer')
- queue = params.get('queue')
-
- if not queue:
- # queue not provided, see if we have default.
- queue = self.cur_queue
-
- if not consumer:
- print 'Consumer not specified'
- return
+ consumer = params.get('consumer', '*')
+ queue = params.get('queue', '*')
q_queue = (queue != '*' and queue or None)
q_consumer = (consumer != '*' and consumer or None)
@@ -807,36 +832,29 @@ class AdminConsole:
q = "select * from pgq.get_consumer_info(%s, %s)"
curs.execute(q, [q_queue, q_consumer])
- display_result(curs, 'Consumers')
+ display_result(curs, 'Consumer "%s" on queue "%s"' % (consumer, queue))
def cmd_show_node(self, params):
"""Show node information."""
- # TODO: This should additionally show node roles, lags and hierarchy.
+ # TODO: This should additionally show node roles, lags and hierarchy.
# Similar to londiste "status".
- node = params.get('node')
- queue = params.get('queue')
-
- if not queue:
- # queue not provided, see if we have default.
- queue = self.cur_queue
-
- if not node:
- print 'Node not specified'
- return
+ node = params.get('node', '*')
+ queue = params.get('queue', '*')
q_queue = (queue != '*' and queue or None)
q_node = (node != '*' and node or None)
curs = self.db.cursor()
- q = """select node_name, node_name, node_location, dead, queue_name
+ q = """select queue_name, node_name, node_location, dead
from pgq_node.node_location
where node_name = coalesce(%s, node_name)
- and queue_name = coalesce(%s, queue_name)"""
+ and queue_name = coalesce(%s, queue_name)
+ order by 1,2"""
curs.execute(q, [q_node, q_queue])
- display_result(curs, 'Database nodes')
+ display_result(curs, 'Node "%s" on queue "%s"' % (node, queue))
def cmd_show_batch(self, params):
batch_id = params.get('batch_id')
@@ -960,33 +978,9 @@ class AdminConsole:
def main():
global script
-
script = AdminConsole()
script.run(sys.argv[1:])
-def test(pfx, curword):
- global script
- params = {}
- script = AdminConsole()
- sgs = script.find_suggestions(pfx, curword, params)
- #print repr(pfx), repr(curword), repr(sgs), repr(params)
-
-def sgtest():
- global script
- script = AdminConsole()
- test('', '')
- test('', 'se')
- test('', 'cr')
- test('set ', '')
- test('set ', 'q')
- test('set queue = blah;', '')
- test('set queue = ', '')
-
- script.exec_string('create queue foo;')
- script.exec_string('create queue "foo";')
- script.exec_string('create queue \'foo\';')
-
if __name__ == '__main__':
- #sgtest()
main()