summaryrefslogtreecommitdiff
path: root/python/londiste/setup.py
diff options
context:
space:
mode:
authorMarko Kreen2008-04-22 12:46:13 +0000
committerMarko Kreen2008-04-22 12:46:13 +0000
commitb39310d82f8203ce02280043744a500ca68b6e7a (patch)
tree8d85313017dc8f7d762e1b6dd62da4b5c9357b61 /python/londiste/setup.py
parent0f571a13fed0fe8adffba9ee545b9add46075f34 (diff)
more pgq_set/londiste cleanup
Diffstat (limited to 'python/londiste/setup.py')
-rw-r--r--python/londiste/setup.py149
1 files changed, 36 insertions, 113 deletions
diff --git a/python/londiste/setup.py b/python/londiste/setup.py
index 15991311..7ab6093d 100644
--- a/python/londiste/setup.py
+++ b/python/londiste/setup.py
@@ -5,86 +5,19 @@
import sys, os, skytools
+import pgq.setadmin
+
__all__ = ['LondisteSetup']
-class LondisteSetup(skytools.DBScript):
+class LondisteSetup(pgq.setadmin.SetAdmin):
+ initial_db_name = 'node_db'
+ extra_objs = [ skytools.DBSchema("londiste", sql_file="londiste.sql") ]
def __init__(self, args):
- skytools.DBScript.__init__(self, 'londiste', args)
- self.set_single_loop(1)
- self.pidfile = self.pidfile + ".setup"
-
+ pgq.setadmin.SetAdmin.__init__(self, 'londiste', args)
self.set_name = self.cf.get("set_name")
- self.consumer_id = self.cf.get("pgq_consumer_id", self.job_name)
-
- if len(self.args) < 2:
- self.log.error("need command")
- sys.exit(1)
-
- def run(self):
- cmd = self.args[1]
- fname = "cmd_" + cmd.replace('-', '_')
- if hasattr(self, fname):
- getattr(self, fname)(self.args[2:])
- else:
- self.log.error('bad subcommand')
- sys.exit(1)
-
- def fetch_list(self, curs, sql, args, keycol = None):
- curs.execute(sql, args)
- rows = curs.dictfetchall()
- if not keycol:
- res = rows
- else:
- res = [r[keycol] for r in rows]
- return res
-
- def db_fetch_list(self, sql, args, keycol = None):
- db = self.get_database('node_db')
- curs = db.cursor()
- res = self.fetch_list(curs, sql, keycol)
- db.commit()
- return res
-
- def display_table(self, desc, curs, sql, args = [], fields = []):
- """Display multirow query as a table."""
-
- curs.execute(sql, args)
- rows = curs.fetchall()
- if len(rows) == 0:
- return 0
-
- if not fields:
- fields = [f[0] for f in curs.description]
-
- widths = [15] * len(fields)
- for row in rows:
- for i, k in enumerate(fields):
- rlen = row[k] and len(row) or 0
- widths[i] = widths[i] > rlen and widths[i] or rlen
- widths = [w + 2 for w in widths]
-
- fmt = '%%-%ds' * (len(widths) - 1) + '%%s'
- fmt = fmt % tuple(widths[:-1])
- if desc:
- print desc
- print fmt % tuple(fields)
- print fmt % tuple(['-'*15] * len(fields))
-
- for row in rows:
- print fmt % tuple([row[k] for k in fields])
- print '\n'
- return 1
-
- def db_display_table(self, desc, sql, args = [], fields = []):
- db = self.get_database('node_db')
- curs = db.cursor()
- res = self.display_table(desc, curs, sql, args, fields)
- db.commit()
- return res
-
def init_optparse(self, parser=None):
- p = skytools.DBScript.init_optparse(self, parser)
+ p = pgq.setadmin.SetAdmin.init_optparse(self, parser)
p.add_option("--expect-sync", action="store_true", dest="expect_sync",
help = "no copy needed", default=False)
p.add_option("--skip-truncate", action="store_true", dest="skip_truncate",
@@ -93,71 +26,61 @@ class LondisteSetup(skytools.DBScript):
help="force", default=False)
p.add_option("--all", action="store_true",
help="include all tables", default=False)
- p.add_option("--provider",
- help="init: upstream node temp connect string", default=None)
return p
- def exec_checked(self, curs, sql, args):
- curs.execute(sql, args)
- ok = True
- for row in curs.fetchall():
- if (row[0] / 100) == 2:
- self.log.info("%d %s" % (row[0], row[1]))
- else:
- self.log.error("%d %s" % (row[0], row[1]))
- ok = False
- return ok
-
- def exec_many(self, curs, sql, baseargs, extra_list):
- res = True
- for a in extra_list:
- ok = self.exec_checked(curs, sql, baseargs + [a])
- if not ok:
- res = False
- return res
-
- def db_exec_many(self, sql, baseargs, extra_list):
- db = self.get_database('node_db')
- curs = db.cursor()
- ok = self.exec_many(curs, sql, baseargs, extra_list)
- if ok:
- self.log.info("COMMIT")
- db.commit()
- else:
- self.log.info("ROLLBACK")
- db.rollback()
+ def extra_init(self, node_type, node_db, provider_db):
+ if not provider_db:
+ return
+ pcurs = provider_db.cursor()
+ ncurs = node_db.cursor()
+ q = "select table_name from londiste.set_get_table_list(%s)"
+ pcurs.execute(q, [self.set_name])
+ for row in pcurs.fetchall():
+ tbl = row['table_name']
+ q = "select * from londiste.set_add_table(%s, %s)"
+ ncurs.execute(q, [self.set_name, tbl])
+ node_db.commit()
+ provider_db.commit()
def cmd_add(self, args = []):
q = "select * from londiste.node_add_table(%s, %s)"
- self.db_exec_many(q, [self.set_name], args)
+ db = self.get_database('node_db')
+ self.db_cmd_many(db, q, [self.set_name], args)
def cmd_remove(self, args = []):
q = "select * from londiste.node_remove_table(%s, %s)"
- self.db_exec_many(q, [self.set_name], args)
+ db = self.get_database('node_db')
+ self.db_cmd_many(db, q, [self.set_name], args)
def cmd_add_seq(self, args = []):
q = "select * from londiste.node_add_seq(%s, %s)"
- self.db_exec_many(q, [self.set_name], args)
+ db = self.get_database('node_db')
+ self.db_cmd_many(db, q, [self.set_name], args)
def cmd_remove_seq(self, args = []):
q = "select * from londiste.node_remove_seq(%s, %s)"
- self.db_exec_many(q, [self.set_name], args)
+ db = self.get_database('node_db')
+ self.db_cmd_many(db, q, [self.set_name], args)
def cmd_resync(self, args = []):
q = "select * from londiste.node_resync_table(%s, %s)"
- self.db_exec_many(q, [self.set_name], args)
+ db = self.get_database('node_db')
+ self.db_cmd_many(db, q, [self.set_name], args)
def cmd_tables(self, args = []):
q = "select table_name, merge_state from londiste.node_get_table_list(%s)"
- self.db_display_table("Tables on node", q, [self.set_name])
+ db = self.get_database('node_db')
+ self.db_display_table(db, "Tables on node", q, [self.set_name])
def cmd_seqs(self, args = []):
q = "select seq_namefrom londiste.node_get_seq_list(%s)"
- self.db_display_table("Sequences on node", q, [self.set_name])
+ db = self.get_database('node_db')
+ self.db_display_table(db, "Sequences on node", q, [self.set_name])
def cmd_missing(self, args = []):
q = "select * from londiste.node_show_missing(%s)"
- self.db_display_table("MIssing objects on node", q, [self.set_name])
+ db = self.get_database('node_db')
+ self.db_display_table(db, "Missing objects on node", q, [self.set_name])
def cmd_check(self, args = []):
pass