summaryrefslogtreecommitdiff
path: root/python/qadmin.py
diff options
context:
space:
mode:
authorMarko Kreen2010-11-25 09:53:17 +0000
committerMarko Kreen2010-11-25 09:53:17 +0000
commit5924197a0ab167d65b2aa316e514f49c666b0e5b (patch)
treed00c5b75fcbe36fbf14c342a4add6519ddd77fe5 /python/qadmin.py
parent2d169f74668a0bd6e3019971235ba7f9e9f325f2 (diff)
qadmin: londiste commands
add_table, remove_table, tables, add_seq, remove_seq, seqs, missing
Diffstat (limited to 'python/qadmin.py')
-rwxr-xr-xpython/qadmin.py172
1 files changed, 170 insertions, 2 deletions
diff --git a/python/qadmin.py b/python/qadmin.py
index bdc47cc2..a31e2a75 100755
--- a/python/qadmin.py
+++ b/python/qadmin.py
@@ -22,6 +22,16 @@ Following commands expect default queue:
show batch <batch_id>;
show batch <consumer>;
+Londiste commands:
+
+ londiste add_table <tbl>;
+ londiste remove_table <tbl>;
+ londiste add_seq <seq>;
+ londiste remove_seq <seq>;
+ londiste tables;
+ londiste seqs;
+ londiste missing;
+
Other commands:
exit; (or press ^D)
@@ -240,6 +250,22 @@ class User(DynList):
def get_wlist(self):
return script.get_user_list()
+class NewTable(DynList):
+ def get_wlist(self):
+ return script.get_new_table_list()
+
+class KnownTable(DynList):
+ def get_wlist(self):
+ return script.get_known_table_list()
+
+class NewSeq(DynList):
+ def get_wlist(self):
+ return script.get_new_seq_list()
+
+class KnownSeq(DynList):
+ def get_wlist(self):
+ return script.get_known_seq_list()
+
class BatchId(DynList):
tk_type = ("num",)
def get_wlist(self):
@@ -375,6 +401,21 @@ w_cons_from_queue = Word('consumer',
Consumer(List(w_done, w_from_queue), name = 'consumer'),
name = 'cmd2')
+w_londiste_add_table = NewTable(w_done, name = 'table')
+w_londiste_add_seq = NewSeq(w_done, name = 'seq')
+w_londiste_remove_table = KnownTable(w_done, name = 'table')
+w_londiste_remove_seq = KnownSeq(w_done, name = 'seq')
+
+w_londiste = List(
+ Word('add_table', w_londiste_add_table),
+ Word('add_seq', w_londiste_add_seq),
+ Word('remove_table', w_londiste_remove_table),
+ Word('remove_seq', w_londiste_remove_seq),
+ Word('missing', w_done),
+ Word('tables', w_done),
+ Word('seqs', w_done),
+ name = "cmd2")
+
w_top = List(
Word('alter', w_alter),
Word('connect', w_connect),
@@ -385,6 +426,7 @@ w_top = List(
Word('unregister', w_cons_from_queue),
Word('show', w_show),
Word('exit', w_done),
+ Word('londiste', w_londiste),
name = "cmd")
top_level.set_real(w_top)
@@ -434,6 +476,52 @@ class AdminConsole:
q = "select distinct node_name from pgq_node.node_location order by 1"
return self._ccache('node_list', q, 'pgq_node')
+ def get_new_table_list(self):
+ if not self.cur_queue:
+ return []
+ qname = skytools.quote_literal(self.cur_queue)
+ q = """select n.nspname || '.' || r.relname
+ from pg_catalog.pg_class r
+ join pg_catalog.pg_namespace n on (n.oid = r.relnamespace)
+ left join londiste.table_info i on (i.queue_name = %s and i.table_name = (n.nspname || '.' || r.relname))
+ where r.relkind='r'
+ and n.nspname not in ('pg_catalog', 'information_schema', 'pgq', 'londiste', 'pgq_node', 'pgq_ext')
+ and n.nspname !~ 'pg_.*'
+ and i.table_name is null
+ order by 1 """ % qname
+ return self._ccache('new_table_list', q, 'londiste')
+
+ def get_known_table_list(self):
+ if not self.cur_queue:
+ return []
+ qname = skytools.quote_literal(self.cur_queue)
+ q = "select table_name from londiste.table_info"\
+ " where queue_name = %s order by 1" % qname
+ return self._ccache('known_table_list', q, 'londiste')
+
+ def get_new_seq_list(self):
+ if not self.cur_queue:
+ return []
+ qname = skytools.quote_literal(self.cur_queue)
+ q = """select n.nspname || '.' || r.relname
+ from pg_catalog.pg_class r
+ join pg_catalog.pg_namespace n on (n.oid = r.relnamespace)
+ left join londiste.seq_info i on (i.queue_name = %s and i.seq_name = (n.nspname || '.' || r.relname))
+ where r.relkind='S'
+ and n.nspname not in ('pg_catalog', 'information_schema', 'pgq', 'londiste', 'pgq_node', 'pgq_ext')
+ and n.nspname !~ 'pg_.*'
+ and i.seq_name is null
+ order by 1 """ % qname
+ return self._ccache('new_seq_list', q, 'londiste')
+
+ def get_known_seq_list(self):
+ if not self.cur_queue:
+ return []
+ qname = skytools.quote_literal(self.cur_queue)
+ q = "select seq_name from londiste.seq_info"\
+ " where queue_name = %s order by 1" % qname
+ return self._ccache('known_seq_list', q, 'londiste')
+
def get_batch_list(self):
if not self.cur_queue:
return []
@@ -645,6 +733,7 @@ class AdminConsole:
def sql_words(self, sql):
return skytools.sql_tokenizer(sql,
standard_quoting = True,
+ fqident = True,
ignore_whitespace = True)
def reset_comp_cache(self):
@@ -687,11 +776,12 @@ class AdminConsole:
w = w.lower()
#print repr(typ), repr(w)
if typ == 'error':
- print 'syntax error:', repr(ln)
+ print 'syntax error 1:', repr(ln)
return
+ onode = node
node = node.get_next(typ, w, params)
if not node:
- print "syntax error:", repr(ln)
+ print "syntax error 2:", repr(ln), repr(typ), repr(w), repr(params)
return
if node == top_level:
self.exec_params(params)
@@ -717,6 +807,7 @@ class AdminConsole:
fn(params)
except Exception, ex:
print "ERROR: %s" % str(ex).strip()
+ raise
def bad_cmd(self, params):
print 'unimplemented command'
@@ -984,6 +1075,83 @@ class AdminConsole:
def cmd_exit(self, params):
sys.exit(0)
+ ##
+ ## Londiste
+ ##
+
+ def cmd_londiste_missing(self, params):
+ """Show missing objects."""
+
+ queue = self.cur_queue
+
+ curs = self.db.cursor()
+ q = """select * from londiste.local_show_missing(%s)"""
+ curs.execute(q, [queue])
+
+ display_result(curs, 'Missing objects queue "%s"' % (queue))
+
+ def cmd_londiste_tables(self, params):
+ """Show local tables."""
+
+ queue = self.cur_queue
+
+ curs = self.db.cursor()
+ q = """select * from londiste.get_table_list(%s) where local"""
+ curs.execute(q, [queue])
+
+ display_result(curs, 'Local tables on queue "%s"' % (queue))
+
+ def cmd_londiste_seqs(self, params):
+ """Show local seqs."""
+
+ queue = self.cur_queue
+
+ curs = self.db.cursor()
+ q = """select * from londiste.get_seq_list(%s) where local"""
+ curs.execute(q, [queue])
+
+ display_result(curs, 'Sequences on queue "%s"' % (queue))
+
+ def cmd_londiste_add_table(self, params):
+ """Add table."""
+
+ curs = self.db.cursor()
+ q = """select * from londiste.local_add_table(%s, %s)"""
+ curs.execute(q, [self.cur_queue, params['table']])
+
+ res = curs.fetchone()
+ print 'ADD_TABLE:', res[0], res[1]
+
+ def cmd_londiste_remove_table(self, params):
+ """Remove table."""
+
+ curs = self.db.cursor()
+ q = """select * from londiste.local_remove_table(%s, %s)"""
+ curs.execute(q, [self.cur_queue, params['table']])
+
+ res = curs.fetchone()
+ print 'REMOVE_TABLE:', res[0], res[1]
+
+ def cmd_londiste_add_seq(self, params):
+ """Add seq."""
+
+ curs = self.db.cursor()
+ q = """select * from londiste.local_add_seq(%s, %s)"""
+ curs.execute(q, [self.cur_queue, params['seq']])
+
+ res = curs.fetchone()
+ print 'ADD_SEQ:', res[0], res[1]
+
+ def cmd_londiste_remove_seq(self, params):
+ """Remove seq."""
+
+ curs = self.db.cursor()
+ q = """select * from londiste.local_remove_seq(%s, %s)"""
+ curs.execute(q, [self.cur_queue, params['seq']])
+
+ res = curs.fetchone()
+ print 'REMOVE_SEQ:', res[0], res[1]
+
def main():
global script
script = AdminConsole()