diff options
author | Marko Kreen | 2010-11-25 09:53:17 +0000 |
---|---|---|
committer | Marko Kreen | 2010-11-25 09:53:17 +0000 |
commit | 5924197a0ab167d65b2aa316e514f49c666b0e5b (patch) | |
tree | d00c5b75fcbe36fbf14c342a4add6519ddd77fe5 /python/qadmin.py | |
parent | 2d169f74668a0bd6e3019971235ba7f9e9f325f2 (diff) |
qadmin: londiste commands
add_table, remove_table, tables,
add_seq, remove_seq, seqs,
missing
Diffstat (limited to 'python/qadmin.py')
-rwxr-xr-x | python/qadmin.py | 172 |
1 files changed, 170 insertions, 2 deletions
diff --git a/python/qadmin.py b/python/qadmin.py index bdc47cc2..a31e2a75 100755 --- a/python/qadmin.py +++ b/python/qadmin.py @@ -22,6 +22,16 @@ Following commands expect default queue: show batch <batch_id>; show batch <consumer>; +Londiste commands: + + londiste add_table <tbl>; + londiste remove_table <tbl>; + londiste add_seq <seq>; + londiste remove_seq <seq>; + londiste tables; + londiste seqs; + londiste missing; + Other commands: exit; (or press ^D) @@ -240,6 +250,22 @@ class User(DynList): def get_wlist(self): return script.get_user_list() +class NewTable(DynList): + def get_wlist(self): + return script.get_new_table_list() + +class KnownTable(DynList): + def get_wlist(self): + return script.get_known_table_list() + +class NewSeq(DynList): + def get_wlist(self): + return script.get_new_seq_list() + +class KnownSeq(DynList): + def get_wlist(self): + return script.get_known_seq_list() + class BatchId(DynList): tk_type = ("num",) def get_wlist(self): @@ -375,6 +401,21 @@ w_cons_from_queue = Word('consumer', Consumer(List(w_done, w_from_queue), name = 'consumer'), name = 'cmd2') +w_londiste_add_table = NewTable(w_done, name = 'table') +w_londiste_add_seq = NewSeq(w_done, name = 'seq') +w_londiste_remove_table = KnownTable(w_done, name = 'table') +w_londiste_remove_seq = KnownSeq(w_done, name = 'seq') + +w_londiste = List( + Word('add_table', w_londiste_add_table), + Word('add_seq', w_londiste_add_seq), + Word('remove_table', w_londiste_remove_table), + Word('remove_seq', w_londiste_remove_seq), + Word('missing', w_done), + Word('tables', w_done), + Word('seqs', w_done), + name = "cmd2") + w_top = List( Word('alter', w_alter), Word('connect', w_connect), @@ -385,6 +426,7 @@ w_top = List( Word('unregister', w_cons_from_queue), Word('show', w_show), Word('exit', w_done), + Word('londiste', w_londiste), name = "cmd") top_level.set_real(w_top) @@ -434,6 +476,52 @@ class AdminConsole: q = "select distinct node_name from pgq_node.node_location order by 1" return self._ccache('node_list', q, 'pgq_node') + def get_new_table_list(self): + if not self.cur_queue: + return [] + qname = skytools.quote_literal(self.cur_queue) + q = """select n.nspname || '.' || r.relname + from pg_catalog.pg_class r + join pg_catalog.pg_namespace n on (n.oid = r.relnamespace) + left join londiste.table_info i on (i.queue_name = %s and i.table_name = (n.nspname || '.' || r.relname)) + where r.relkind='r' + and n.nspname not in ('pg_catalog', 'information_schema', 'pgq', 'londiste', 'pgq_node', 'pgq_ext') + and n.nspname !~ 'pg_.*' + and i.table_name is null + order by 1 """ % qname + return self._ccache('new_table_list', q, 'londiste') + + def get_known_table_list(self): + if not self.cur_queue: + return [] + qname = skytools.quote_literal(self.cur_queue) + q = "select table_name from londiste.table_info"\ + " where queue_name = %s order by 1" % qname + return self._ccache('known_table_list', q, 'londiste') + + def get_new_seq_list(self): + if not self.cur_queue: + return [] + qname = skytools.quote_literal(self.cur_queue) + q = """select n.nspname || '.' || r.relname + from pg_catalog.pg_class r + join pg_catalog.pg_namespace n on (n.oid = r.relnamespace) + left join londiste.seq_info i on (i.queue_name = %s and i.seq_name = (n.nspname || '.' || r.relname)) + where r.relkind='S' + and n.nspname not in ('pg_catalog', 'information_schema', 'pgq', 'londiste', 'pgq_node', 'pgq_ext') + and n.nspname !~ 'pg_.*' + and i.seq_name is null + order by 1 """ % qname + return self._ccache('new_seq_list', q, 'londiste') + + def get_known_seq_list(self): + if not self.cur_queue: + return [] + qname = skytools.quote_literal(self.cur_queue) + q = "select seq_name from londiste.seq_info"\ + " where queue_name = %s order by 1" % qname + return self._ccache('known_seq_list', q, 'londiste') + def get_batch_list(self): if not self.cur_queue: return [] @@ -645,6 +733,7 @@ class AdminConsole: def sql_words(self, sql): return skytools.sql_tokenizer(sql, standard_quoting = True, + fqident = True, ignore_whitespace = True) def reset_comp_cache(self): @@ -687,11 +776,12 @@ class AdminConsole: w = w.lower() #print repr(typ), repr(w) if typ == 'error': - print 'syntax error:', repr(ln) + print 'syntax error 1:', repr(ln) return + onode = node node = node.get_next(typ, w, params) if not node: - print "syntax error:", repr(ln) + print "syntax error 2:", repr(ln), repr(typ), repr(w), repr(params) return if node == top_level: self.exec_params(params) @@ -717,6 +807,7 @@ class AdminConsole: fn(params) except Exception, ex: print "ERROR: %s" % str(ex).strip() + raise def bad_cmd(self, params): print 'unimplemented command' @@ -984,6 +1075,83 @@ class AdminConsole: def cmd_exit(self, params): sys.exit(0) + ## + ## Londiste + ## + + def cmd_londiste_missing(self, params): + """Show missing objects.""" + + queue = self.cur_queue + + curs = self.db.cursor() + q = """select * from londiste.local_show_missing(%s)""" + curs.execute(q, [queue]) + + display_result(curs, 'Missing objects queue "%s"' % (queue)) + + def cmd_londiste_tables(self, params): + """Show local tables.""" + + queue = self.cur_queue + + curs = self.db.cursor() + q = """select * from londiste.get_table_list(%s) where local""" + curs.execute(q, [queue]) + + display_result(curs, 'Local tables on queue "%s"' % (queue)) + + def cmd_londiste_seqs(self, params): + """Show local seqs.""" + + queue = self.cur_queue + + curs = self.db.cursor() + q = """select * from londiste.get_seq_list(%s) where local""" + curs.execute(q, [queue]) + + display_result(curs, 'Sequences on queue "%s"' % (queue)) + + def cmd_londiste_add_table(self, params): + """Add table.""" + + curs = self.db.cursor() + q = """select * from londiste.local_add_table(%s, %s)""" + curs.execute(q, [self.cur_queue, params['table']]) + + res = curs.fetchone() + print 'ADD_TABLE:', res[0], res[1] + + def cmd_londiste_remove_table(self, params): + """Remove table.""" + + curs = self.db.cursor() + q = """select * from londiste.local_remove_table(%s, %s)""" + curs.execute(q, [self.cur_queue, params['table']]) + + res = curs.fetchone() + print 'REMOVE_TABLE:', res[0], res[1] + + def cmd_londiste_add_seq(self, params): + """Add seq.""" + + curs = self.db.cursor() + q = """select * from londiste.local_add_seq(%s, %s)""" + curs.execute(q, [self.cur_queue, params['seq']]) + + res = curs.fetchone() + print 'ADD_SEQ:', res[0], res[1] + + def cmd_londiste_remove_seq(self, params): + """Remove seq.""" + + curs = self.db.cursor() + q = """select * from londiste.local_remove_seq(%s, %s)""" + curs.execute(q, [self.cur_queue, params['seq']]) + + res = curs.fetchone() + print 'REMOVE_SEQ:', res[0], res[1] + def main(): global script script = AdminConsole() |