diff options
author | Martin Pihlak | 2009-04-30 09:10:22 +0000 |
---|---|---|
committer | Martin Pihlak | 2009-04-30 09:10:22 +0000 |
commit | 70f33611747f55d99cdcc0e27a13cf77f68aa6eb (patch) | |
tree | 12ba153d366aa549aa52bb0b38fd006c2041f94e /python/qadmin.py | |
parent | 24601482a366c54e14e9ba32adca92995c8f752e (diff) |
"newadm" renamed to "qadmin"
Documentation and install scripts modified accordingly.
Diffstat (limited to 'python/qadmin.py')
-rwxr-xr-x | python/qadmin.py | 950 |
1 files changed, 950 insertions, 0 deletions
diff --git a/python/qadmin.py b/python/qadmin.py new file mode 100755 index 00000000..5aaada51 --- /dev/null +++ b/python/qadmin.py @@ -0,0 +1,950 @@ +#! /usr/bin/env python + +"""Commands that require only database connection: + + connect dbname=.. host=.. service=.. queue=..; + connect queue=.. [ node=.. ]; + install pgq | londiste; + + create queue <qname>; + alter queue <qname | *> set param = , ...; + drop queue <qname>; + show queue <qname | *>; + + register consumer foo [on <qname>]; + unregister consumer foo [from <qname>]; + show consumer <cname | *> [on <qname>]; + + show node <node | *> [on <qname>]; + +Following commands expect default queue: + show queue; + show batch <batch_id>; + show batch <consumer>; +""" + +# unimplemented: +""" +create <root | branch | leaf> node <node> location <loc> [on <qname>]; +drop node <node> [on <qname>]; +alter node <node> [location=<loc>] +show_queue_stats <q>; +""" + +__version__ = '0.1' + +cmdline_usage = '''\ +Usage: newadm [switches] + +Initial connection options: + -h host + -p port + -U user + -d dbname + -Q queuename + +Command options: + -c cmd_string + -f execfile + +General options: + --help + --version +''' + +import sys, os, readline, skytools, getopt, re + +script = None + +IGNORE_HOSTS = { + 'ip6-allhosts': 1, + 'ip6-allnodes': 1, + 'ip6-allrouters': 1, + #'ip6-localhost': 1, + 'ip6-localnet': 1, + 'ip6-loopback': 1, + 'ip6-mcastprefix': 1, +} + +def unquote_any(self, s): + if s: + c = s[0] + if c == "'": + s = skytools.unquote_literal(s, stdstr = True) + elif c == '"': + s = skytools.unquote_ident(s) + # extquote? + else: + s = s.lower() + return s + +def display_result(curs, desc, fields = []): + """Display multirow query as a table.""" + + rows = curs.fetchall() + + if not fields: + fields = [f[0] for f in curs.description] + + widths = [15] * len(fields) + for i, f in enumerate(fields): + rlen = len(f) + if rlen > widths[i]: + widths[i] = rlen + for row in rows: + for i, k in enumerate(fields): + rlen = row[k] and len(str(row[k])) or 0 + if rlen > widths[i]: + widths[i] = rlen + widths = [w + 2 for w in widths] + + fmt = '%%-%ds' * (len(widths) - 1) + '%%s' + fmt = fmt % tuple(widths[:-1]) + if desc: + print(desc) + print(fmt % tuple(fields)) + print(fmt % tuple([ '-' * (w - 2) for w in widths ])) + + for row in rows: + print(fmt % tuple([row[k] for k in fields])) + print('\n') + +class Node: + def __init__(self, **kwargs): + self.name = kwargs.get('name') + def get_next(self, typ, word, params): + return None + def get_completions(self, params): + return [] + +## +## Token classes +## + +class Proxy(Node): + def set_real(self, node): + self.get_next = node.get_next + self.get_completions = node.get_completions + +class WList(Node): + c_append = ' ' + def __init__(self, *args, **kwargs): + Node.__init__(self, **kwargs) + self.tok_list = args + + def get_next(self, typ, word, params): + cw = word.lower() + for w in self.tok_list: + n = w.get_next(typ, word, params) + if n: + if self.name: # and not self.name in params: + params[self.name] = word + return n + return None + + def get_completions(self, params): + comp_list = [] + for w in self.tok_list: + comp_list += w.get_completions(params) + return comp_list + +class Word(Node): + tk_type = ("ident",) + c_append = ' ' + def __init__(self, word, next, **kwargs): + Node.__init__(self, **kwargs) + self.word = word + self.next = next + def get_wlist(self): + return [self.word] + def get_next(self, typ, word, params): + if typ not in self.tk_type: + return None + if self.word and word != self.word: + return None + if self.name: # and not self.name in params: + params[self.name] = word + return self.next + def get_completions(self, params): + wlist = self.get_wlist() + comp_list = [w + self.c_append for w in wlist] + return comp_list + +class DynList(Word): + def __init__(self, next, **kwargs): + Word.__init__(self, None, next, **kwargs) + +class DynIdentList(DynList): + def get_next(self, typ, word, params): + """Allow quoted queue names""" + next = DynList.get_next(self, typ, word, params) + if next: + params[self.name] = unquote_any(self, word) + return next + +class Queue(DynIdentList): + def get_wlist(self): + return script.get_queue_list() + +class Consumer(DynIdentList): + def get_wlist(self): + return script.get_consumer_list() + +class DBNode(DynList): + def get_wlist(self): + return script.get_node_list() + +class Database(DynList): + def get_wlist(self): + return script.get_database_list() + +class Host(DynList): + def get_wlist(self): + return script.get_host_list() + +class User(DynList): + def get_wlist(self): + return script.get_user_list() + +class BatchId(DynList): + tk_type = ("num",) + def get_wlist(self): + return script.get_batch_list() + +class Port(DynList): + tk_type = ("num",) + def get_wlist(self): + return ['5432', '6432'] + +class SWord(Word): + c_append = '=' + +class Symbol(Word): + tk_type = ("sym",) + c_append = '' + +class EQ(Symbol): + def __init__(self, next): + Symbol.__init__(self, '=', next) + +class Value(DynList): + tk_type = ("str", "num", "ident") + def get_wlist(self): + return [] + +## +## Now describe the syntax. +## + +top_level = Proxy() + +w_done = Symbol(';', top_level) + +eq_val = Symbol('=', Value(w_done, name = 'value')) + +w_connect = Proxy() +w_connect.set_real( + WList( + SWord('dbname', EQ(Database(w_connect, name = 'dbname'))), + SWord('host', EQ(Host(w_connect, name = 'host'))), + SWord('port', EQ(Port(w_connect, name = 'port'))), + SWord('user', EQ(User(w_connect, name = 'user'))), + SWord('password', EQ(Value(w_connect, name = 'password'))), + SWord('queue', EQ(Queue(w_connect, name = 'queue'))), + SWord('node', EQ(DBNode(w_connect, name = 'node'))), + w_done)) + +w_show_batch = WList( + BatchId(w_done, name = 'batch_id'), + Consumer(w_done, name = 'consumer')) + +w_show_queue = WList( + Symbol('*', w_done, name = 'queue'), + Queue(w_done, name = 'queue'), + w_done) + +w_show_on_queue = WList( + Symbol('*', w_done, name = 'queue'), + Queue(w_done, name = 'queue'), + ) + +w_on_queue = WList(Word('on', w_show_on_queue), w_done) + +w_show_consumer = WList( + Symbol('*', w_on_queue, name = 'consumer'), + Consumer(w_on_queue, name = 'consumer'), + ) + +w_show_node = WList( + Symbol('*', w_on_queue, name = 'node'), + DBNode(w_on_queue, name = 'node'), + ) + +w_show = WList( + Word('batch', w_show_batch), + Word('help', w_done), + Word('queue', w_show_queue), + Word('consumer', w_show_consumer), + Word('node', w_show_node), + name = "cmd2") + +w_install = WList( + Word('pgq', w_done), + Word('londiste', w_done), + name = 'module') + +w_qargs2 = Proxy() + +w_qargs = WList( + SWord('idle_period', EQ(Value(w_qargs2, name = 'ticker_idle_period'))), + SWord('max_count', EQ(Value(w_qargs2, name = 'ticker_max_count'))), + SWord('max_lag', EQ(Value(w_qargs2, name = 'ticker_max_lag'))), + SWord('paused', EQ(Value(w_qargs2, name = 'ticker_paused')))) + +w_qargs2.set_real(WList( + w_done, + Symbol(',', w_qargs))) + +w_set_q = Word('set', w_qargs) + +w_alter_q = WList( + Symbol('*', w_set_q, name = 'queue'), + Queue(w_set_q, name = 'queue')) + +w_alter = Word('queue', w_alter_q, name = 'cmd2') + +w_create = Word('queue', Queue(w_done, name = 'queue'), + name = 'cmd2') + +w_drop = Word('queue', Queue(w_done, name = 'queue'), + name = 'cmd2') + +w_on_queue = Word('on', + Queue(w_done, name = 'queue'), + name = 'onqueue') + +w_cons_on_queue = Word('consumer', + Consumer(WList(w_done, w_on_queue), + name = 'consumer'), + name = 'cmd2') + +w_from_queue = Word('from', + Queue(w_done, name = 'queue'), + name = 'fromqueue') + +w_cons_from_queue = Word('consumer', + Consumer(WList(w_done, w_from_queue), + name = 'consumer'), + name = 'cmd2') + +w_top = WList( + Word('alter', w_alter), + Word('connect', w_connect), + Word('create', w_create), + Word('drop', w_drop), + Word('install', w_install), + Word('register', w_cons_on_queue), + Word('unregister', w_cons_from_queue), + Word('show', w_show), + name = "cmd") + +top_level.set_real(w_top) + +## +## Main class for keeping the state. +## + +class AdminConsole: + cur_queue = None + cur_database = None + + cmd_file = None + cmd_str = None + + comp_cache = { + 'comp_pfx': None, + 'comp_list': None, + 'queue_list': None, + 'database_list': None, + 'consumer_list': None, + 'host_list': None, + 'user_list': None, + } + db = None + + rc_hosts = re.compile('\s+') + def get_queue_list(self): + q = "select queue_name from pgq.queue order by 1" + return self._ccache('queue_list', q, 'pgq') + + def get_database_list(self): + q = "select datname from pg_catalog.pg_database order by 1" + return self._ccache('database_list', q) + + def get_user_list(self): + q = "select usename from pg_catalog.pg_user order by 1" + return self._ccache('user_list', q) + + def get_consumer_list(self): + q = "select co_name from pgq.consumer order by 1" + return self._ccache('consumer_list', q, 'pgq') + + def get_node_list(self): + q = "select distinct node_name from pgq_node.node_location order by 1" + return self._ccache('node_list', q, 'pgq_node') + + def get_batch_list(self): + if not self.cur_queue: + return [] + qname = skytools.quote_literal(self.cur_queue) + q = "select current_batch::text from pgq.get_consumer_info(%s)"\ + " where current_batch is not null order by 1" % qname + return self._ccache('batch_list', q, 'pgq') + + def _ccache(self, cname, q, req_schema = None): + if not self.db: + return [] + + # check if schema exists + if req_schema: + k = "schema_exists_%s" % req_schema + ok = self.comp_cache.get(k) + if ok is None: + curs = self.db.cursor() + ok = skytools.exists_schema(curs, req_schema) + self.comp_cache[k] = ok + if not ok: + return [] + + # actual completion + clist = self.comp_cache.get(cname) + if clist is None: + curs = self.db.cursor() + curs.execute(q) + clist = [r[0] for r in curs.fetchall()] + self.comp_cache[cname] = clist + return clist + + def get_host_list(self): + clist = self.comp_cache.get('host_list') + if clist is None: + try: + f = open('/etc/hosts', 'r') + clist = [] + while 1: + ln = f.readline() + if not ln: + break + ln = ln.strip() + if ln == '' or ln[0] == '#': + continue + lst = self.rc_hosts.split(ln) + for h in lst[1:]: + if h not in IGNORE_HOSTS: + clist.append(h) + clist.sort() + self.comp_cache['host_list'] = clist + except: + clist = [] + return clist + + def parse_cmdline(self, argv): + switches = "c:h:p:d:U:f:Q:" + lswitches = ['help', 'version'] + try: + opts, args = getopt.getopt(argv, switches, lswitches) + except getopt.GetoptError, ex: + print str(ex) + print "Use --help to see command line options" + sys.exit(1) + + cstr_map = { + 'dbname': None, + 'host': None, + 'port': None, + 'user': None, + 'password': None, + } + cmd_file = cmd_str = None + for o, a in opts: + if o == "--help": + print cmdline_usage + sys.exit(0) + elif o == "--version": + print "newadm version %s" % __version__ + sys.exit(0) + elif o == "-h": + cstr_map['host'] = a + elif o == "-p": + cstr_map['port'] = a + elif o == "-d": + cstr_map['dbname'] = a + elif o == "-U": + cstr_map['user'] = a + elif o == "-Q": + self.cur_queue = a + elif o == "-c": + self.cmd_str = a + elif o == "-f": + self.cmd_file = a + + cstr_list = [] + for k, v in cstr_map.items(): + if v is not None: + cstr_list.append("%s=%s" % (k, v)) + if len(args) == 1: + a = args[0] + if a.find('=') >= 0: + cstr_list.append(a) + else: + cstr_list.append("dbname=%s" % a) + elif len(args) > 1: + print "too many arguments, use --help to see syntax" + sys.exit(1) + + self.initial_connstr = " ".join(cstr_list) + + def db_connect(self, connstr): + db = skytools.connect_database(connstr) + db.set_isolation_level(0) # autocommit + + q = "select current_database(), current_setting('server_version')" + curs = db.cursor() + curs.execute(q) + res = curs.fetchone() + self.cur_database = res[0] + return db + + #print res + #print dir(self.db) + #print dir(self.db.cursor()) + #print self.db.status + #print "connected to", repr(self.initial_connstr) + + + def run(self, argv): + self.parse_cmdline(argv) + + if self.cmd_file is not None and self.cmd_str is not None: + print "cannot handle -c and -f together" + sys.exit(1) + + # append ; to cmd_str if needed + if self.cmd_str and not self.cmd_str.rstrip().endswith(';'): + self.cmd_str += ';' + + cmd_str = self.cmd_str + if self.cmd_file: + cmd_str = open(self.cmd_file, "r").read() + + self.db = self.db_connect(self.initial_connstr) + + if cmd_str: + self.exec_string(cmd_str) + else: + self.main_loop() + + def main_loop(self): + readline.parse_and_bind('tab: complete') + readline.set_completer(self.rl_completer_safe) + #print 'delims: ', repr(readline.get_completer_delims()) + hist_file = os.path.expanduser("~/.newadm_history") + try: + readline.read_history_file(hist_file) + except IOError: + pass + + print "Use 'show help;' to see available commands." + while 1: + try: + ln = self.line_input() + #print 'line:', repr(ln) + self.exec_string(ln) + except KeyboardInterrupt: + print + except EOFError: + print + break + self.reset_comp_cache() + readline.write_history_file(hist_file) + + def rl_completer(self, curword, state): + curline = readline.get_line_buffer() + start = readline.get_begidx() + end = readline.get_endidx() + + pfx = curline[:start] + sglist = self.find_suggestions(pfx, curword) + if state < len(sglist): + return sglist[state] + return None + + def rl_completer_safe(self, curword, state): + try: + return self.rl_completer(curword, state) + except BaseException, det: + print 'got some error', str(det) + + def line_input(self): + qname = "(noqueue)" + if self.cur_queue: + qname = self.cur_queue + p = "%s@%s> " % (qname, self.cur_database) + return raw_input(p) + + def sql_words(self, sql): + return skytools.sql_tokenizer(sql, + standard_quoting = True, + ignore_whitespace = True) + + def reset_comp_cache(self): + self.comp_cache = {} + + def find_suggestions(self, pfx, curword, params = {}): + c_pfx = self.comp_cache.get('comp_pfx') + c_list = self.comp_cache.get('comp_list', []) + if c_pfx != pfx: + c_list = self.find_suggestions_real(pfx, params) + self.comp_cache['comp_pfx'] = pfx + self.comp_cache['comp_list'] = c_list + + wlen = len(curword) + res = [] + for cword in c_list: + if curword == cword[:wlen]: + res.append(cword) + return res + + def find_suggestions_real(self, pfx, params): + # find level + node = top_level + for typ, w in self.sql_words(pfx): + w = w.lower() + node = node.get_next(typ, w, params) + if not node: + break + + # find possible matches + if node: + return node.get_completions(params) + else: + return [] + + def exec_string(self, ln, eof = False): + node = top_level + params = {} + for typ, w in self.sql_words(ln): + w = w.lower() + #print repr(typ), repr(w) + if typ == 'error': + print 'syntax error:', repr(ln) + return + node = node.get_next(typ, w, params) + if not node: + print "syntax error:", repr(ln) + return + if node == top_level: + self.exec_params(params) + params = {} + if eof: + if params: + self.exec_params(params) + elif node != top_level: + print "multi-line commands not supported:", repr(ln) + + def exec_params(self, params): + #print 'RUN', params + cmd = params.get('cmd') + cmd2 = params.get('cmd2') + if not cmd: + print 'parse error: no command found' + return + if cmd2: + cmd = "%s_%s" % (cmd, cmd2) + #print 'RUN', repr(params) + fn = getattr(self, 'cmd_' + cmd, self.bad_cmd) + try: + fn(params) + except Exception, ex: + print "ERROR: %s" % str(ex).strip() + + def bad_cmd(self, params): + print 'unimplemented command' + + def cmd_connect(self, params): + qname = params.get('queue') + if not qname: + qname = self.cur_queue + if 'node' in params and not qname: + print 'node= needs a queue also' + return + + # load raw connection params + cdata = [] + for k in ('dbname', 'host', 'port', 'user', 'password'): + if k in params: + arg = "%s=%s" % (k, params[k]) + cdata.append(arg) + + # raw connect + if cdata: + if 'node' in params: + print 'node= cannot be used together with raw params' + return + cstr = " ".join(cdata) + self.db = self.db_connect(cstr) + + # connect to node + if 'node' in params: + curs = self.db.cursor() + q = "select node_location from pgq_node.get_queue_locations(%s)"\ + " where node_name = %s" + curs.execute(q, [qname, params['node']]) + res = curs.fetchall() + if len(res) == 0: + print "node not found" + return + cstr = res[0]['node_location'] + self.db = self.db_connect(cstr) + + # set default queue + if 'queue' in params: + self.cur_queue = qname + + print "CONNECT" + + def cmd_install(self, params): + pgq_objs = [ + skytools.DBLanguage("plpgsql"), + skytools.DBFunction("txid_current_snapshot", 0, sql_file="txid.sql"), + skytools.DBSchema("pgq", sql_file="pgq.sql"), + skytools.DBSchema("pgq_ext", sql_file="pgq_ext.sql"), + skytools.DBSchema("pgq_node", sql_file="pgq_node.sql"), + ] + londiste_objs = pgq_objs + [ + skytools.DBSchema("londiste", sql_file="londiste.sql"), + ] + mod_map = { + 'londiste': londiste_objs, + 'pgq': pgq_objs, + } + mod_name = params['module'] + objs = mod_map[mod_name] + if not self.db: + print "no db?" + return + curs = self.db.cursor() + skytools.db_install(curs, objs, None) + print "INSTALL" + + def cmd_show_queue(self, params): + queue = params.get('queue') + if queue is None: + queue = self.cur_queue + if not queue: + print 'No default queue' + return + curs = self.db.cursor() + fields = [ + "queue_name", + "queue_cur_table || '/' || queue_ntables as tables", + "queue_ticker_max_count as max_cnt", + "queue_ticker_max_lag as max_lag", + "queue_ticker_idle_period as idle_period", + "queue_ticker_paused as paused", + "ticker_lag", + ] + pfx = "select " + ",".join(fields) + + if queue == '*': + q = pfx + " from pgq.get_queue_info()" + curs.execute(q) + else: + q = pfx + " from pgq.get_queue_info(%s)" + curs.execute(q, [queue]) + + display_result(curs, 'Queues') + + def cmd_show_consumer(self, params): + """Show consumer status""" + consumer = params.get('consumer') + queue = params.get('queue') + + if not queue: + # queue not provided, see if we have default. + queue = self.cur_queue + + if not consumer: + print 'Consumer not specified' + return + + q_queue = (queue != '*' and queue or None) + q_consumer = (consumer != '*' and consumer or None) + + curs = self.db.cursor() + q = "select * from pgq.get_consumer_info(%s, %s)" + curs.execute(q, [q_queue, q_consumer]) + + display_result(curs, 'Consumers') + + def cmd_show_node(self, params): + """Show node information.""" + + # TODO: This should additionally show node roles, lags and hierarchy. + # Similar to londiste "status". + + node = params.get('node') + queue = params.get('queue') + + if not queue: + # queue not provided, see if we have default. + queue = self.cur_queue + + if not node: + print 'Node not specified' + return + + q_queue = (queue != '*' and queue or None) + q_node = (node != '*' and node or None) + + curs = self.db.cursor() + q = """select node_name, node_name, node_location, dead, queue_name + from pgq_node.node_location + where node_name = coalesce(%s, node_name) + and queue_name = coalesce(%s, queue_name)""" + curs.execute(q, [q_node, q_queue]) + + display_result(curs, 'Database nodes') + + def cmd_show_batch(self, params): + batch_id = params.get('batch_id') + consumer = params.get('consumer') + queue = self.cur_queue + if not queue: + print 'No default queue' + return + curs = self.db.cursor() + if consumer: + q = "select current_batch from pgq.get_consumer_info(%s, %s)" + curs.execute(q, [queue, consumer]) + res = curs.fetchall() + if len(res) != 1: + print 'no such consumer' + return + batch_id = res[0]['current_batch'] + if batch_id is None: + print 'consumer has no open batch' + return + + q = "select * from pgq.get_batch_events(%s)" + curs.execute(q, [batch_id]) + + display_result(curs, 'Batch events') + + def cmd_register_consumer(self, params): + queue = params.get("queue", self.cur_queue) + if not queue: + print 'No queue specified' + return + consumer = params['consumer'] + curs = self.db.cursor() + q = "select * from pgq.get_consumer_info(%s, %s)" + curs.execute(q, [queue, consumer]) + if curs.fetchone(): + print 'Consumer already registered' + return + q = "select * from pgq.register_consumer(%s, %s)" + curs.execute(q, [queue, consumer]) + print "REGISTER" + + def cmd_unregister_consumer(self, params): + queue = params.get("queue", self.cur_queue) + if not queue: + print 'No queue specified' + return + consumer = params['consumer'] + curs = self.db.cursor() + q = "select * from pgq.unregister_consumer(%s, %s)" + curs.execute(q, [queue, consumer]) + print "UNREGISTER" + + def cmd_create_queue(self, params): + curs = self.db.cursor() + q = "select * from pgq.get_queue_info(%(queue)s)" + curs.execute(q, params) + if curs.fetchone(): + print "Queue already exists" + return + q = "select * from pgq.create_queue(%(queue)s)" + curs.execute(q, params) + print "CREATE" + + def cmd_drop_queue(self, params): + curs = self.db.cursor() + q = "select * from pgq.drop_queue(%(queue)s)" + curs.execute(q, params) + print "DROP" + + def cmd_alter_queue(self, params): + """Alter queue parameters, accepts * for all queues""" + queue = params.get('queue') + curs = self.db.cursor() + if queue == '*': + # operate on list of queues + q = "select queue_name from pgq.get_queue_info()" + curs.execute(q) + qlist = [ r[0] for r in curs.fetchall() ] + else: + # just single queue specified + qlist = [ queue ] + + for qname in qlist: + params['queue'] = qname + + # loop through the parameters, passing any unrecognized + # key down pgq.set_queue_config + for k in params: + if k in ('queue', 'cmd', 'cmd2'): + continue + + q = "select * from pgq.set_queue_config" \ + "(%%(queue)s, '%s', %%(%s)s)" % (k, k) + + curs.execute(q, params) + print "ALTER" + + def cmd_show_help(self, params): + print __doc__ + +def main(): + global script + + script = AdminConsole() + script.run(sys.argv[1:]) + +def test(pfx, curword): + global script + params = {} + script = AdminConsole() + sgs = script.find_suggestions(pfx, curword, params) + #print repr(pfx), repr(curword), repr(sgs), repr(params) + +def sgtest(): + global script + script = AdminConsole() + test('', '') + test('', 'se') + test('', 'cr') + test('set ', '') + test('set ', 'q') + test('set queue = blah;', '') + test('set queue = ', '') + + script.exec_string('create queue foo;') + script.exec_string('create queue "foo";') + script.exec_string('create queue \'foo\';') + +if __name__ == '__main__': + #sgtest() + main() + |