diff options
| author | Marko Kreen | 2008-04-22 12:46:13 +0000 |
|---|---|---|
| committer | Marko Kreen | 2008-04-22 12:46:13 +0000 |
| commit | b39310d82f8203ce02280043744a500ca68b6e7a (patch) | |
| tree | 8d85313017dc8f7d762e1b6dd62da4b5c9357b61 /python/londiste/setup.py | |
| parent | 0f571a13fed0fe8adffba9ee545b9add46075f34 (diff) | |
more pgq_set/londiste cleanup
Diffstat (limited to 'python/londiste/setup.py')
| -rw-r--r-- | python/londiste/setup.py | 149 |
1 files changed, 36 insertions, 113 deletions
diff --git a/python/londiste/setup.py b/python/londiste/setup.py index 15991311..7ab6093d 100644 --- a/python/londiste/setup.py +++ b/python/londiste/setup.py @@ -5,86 +5,19 @@ import sys, os, skytools +import pgq.setadmin + __all__ = ['LondisteSetup'] -class LondisteSetup(skytools.DBScript): +class LondisteSetup(pgq.setadmin.SetAdmin): + initial_db_name = 'node_db' + extra_objs = [ skytools.DBSchema("londiste", sql_file="londiste.sql") ] def __init__(self, args): - skytools.DBScript.__init__(self, 'londiste', args) - self.set_single_loop(1) - self.pidfile = self.pidfile + ".setup" - + pgq.setadmin.SetAdmin.__init__(self, 'londiste', args) self.set_name = self.cf.get("set_name") - self.consumer_id = self.cf.get("pgq_consumer_id", self.job_name) - - if len(self.args) < 2: - self.log.error("need command") - sys.exit(1) - - def run(self): - cmd = self.args[1] - fname = "cmd_" + cmd.replace('-', '_') - if hasattr(self, fname): - getattr(self, fname)(self.args[2:]) - else: - self.log.error('bad subcommand') - sys.exit(1) - - def fetch_list(self, curs, sql, args, keycol = None): - curs.execute(sql, args) - rows = curs.dictfetchall() - if not keycol: - res = rows - else: - res = [r[keycol] for r in rows] - return res - - def db_fetch_list(self, sql, args, keycol = None): - db = self.get_database('node_db') - curs = db.cursor() - res = self.fetch_list(curs, sql, keycol) - db.commit() - return res - - def display_table(self, desc, curs, sql, args = [], fields = []): - """Display multirow query as a table.""" - - curs.execute(sql, args) - rows = curs.fetchall() - if len(rows) == 0: - return 0 - - if not fields: - fields = [f[0] for f in curs.description] - - widths = [15] * len(fields) - for row in rows: - for i, k in enumerate(fields): - rlen = row[k] and len(row) or 0 - widths[i] = widths[i] > rlen and widths[i] or rlen - widths = [w + 2 for w in widths] - - fmt = '%%-%ds' * (len(widths) - 1) + '%%s' - fmt = fmt % tuple(widths[:-1]) - if desc: - print desc - print fmt % tuple(fields) - print fmt % tuple(['-'*15] * len(fields)) - - for row in rows: - print fmt % tuple([row[k] for k in fields]) - print '\n' - return 1 - - def db_display_table(self, desc, sql, args = [], fields = []): - db = self.get_database('node_db') - curs = db.cursor() - res = self.display_table(desc, curs, sql, args, fields) - db.commit() - return res - def init_optparse(self, parser=None): - p = skytools.DBScript.init_optparse(self, parser) + p = pgq.setadmin.SetAdmin.init_optparse(self, parser) p.add_option("--expect-sync", action="store_true", dest="expect_sync", help = "no copy needed", default=False) p.add_option("--skip-truncate", action="store_true", dest="skip_truncate", @@ -93,71 +26,61 @@ class LondisteSetup(skytools.DBScript): help="force", default=False) p.add_option("--all", action="store_true", help="include all tables", default=False) - p.add_option("--provider", - help="init: upstream node temp connect string", default=None) return p - def exec_checked(self, curs, sql, args): - curs.execute(sql, args) - ok = True - for row in curs.fetchall(): - if (row[0] / 100) == 2: - self.log.info("%d %s" % (row[0], row[1])) - else: - self.log.error("%d %s" % (row[0], row[1])) - ok = False - return ok - - def exec_many(self, curs, sql, baseargs, extra_list): - res = True - for a in extra_list: - ok = self.exec_checked(curs, sql, baseargs + [a]) - if not ok: - res = False - return res - - def db_exec_many(self, sql, baseargs, extra_list): - db = self.get_database('node_db') - curs = db.cursor() - ok = self.exec_many(curs, sql, baseargs, extra_list) - if ok: - self.log.info("COMMIT") - db.commit() - else: - self.log.info("ROLLBACK") - db.rollback() + def extra_init(self, node_type, node_db, provider_db): + if not provider_db: + return + pcurs = provider_db.cursor() + ncurs = node_db.cursor() + q = "select table_name from londiste.set_get_table_list(%s)" + pcurs.execute(q, [self.set_name]) + for row in pcurs.fetchall(): + tbl = row['table_name'] + q = "select * from londiste.set_add_table(%s, %s)" + ncurs.execute(q, [self.set_name, tbl]) + node_db.commit() + provider_db.commit() def cmd_add(self, args = []): q = "select * from londiste.node_add_table(%s, %s)" - self.db_exec_many(q, [self.set_name], args) + db = self.get_database('node_db') + self.db_cmd_many(db, q, [self.set_name], args) def cmd_remove(self, args = []): q = "select * from londiste.node_remove_table(%s, %s)" - self.db_exec_many(q, [self.set_name], args) + db = self.get_database('node_db') + self.db_cmd_many(db, q, [self.set_name], args) def cmd_add_seq(self, args = []): q = "select * from londiste.node_add_seq(%s, %s)" - self.db_exec_many(q, [self.set_name], args) + db = self.get_database('node_db') + self.db_cmd_many(db, q, [self.set_name], args) def cmd_remove_seq(self, args = []): q = "select * from londiste.node_remove_seq(%s, %s)" - self.db_exec_many(q, [self.set_name], args) + db = self.get_database('node_db') + self.db_cmd_many(db, q, [self.set_name], args) def cmd_resync(self, args = []): q = "select * from londiste.node_resync_table(%s, %s)" - self.db_exec_many(q, [self.set_name], args) + db = self.get_database('node_db') + self.db_cmd_many(db, q, [self.set_name], args) def cmd_tables(self, args = []): q = "select table_name, merge_state from londiste.node_get_table_list(%s)" - self.db_display_table("Tables on node", q, [self.set_name]) + db = self.get_database('node_db') + self.db_display_table(db, "Tables on node", q, [self.set_name]) def cmd_seqs(self, args = []): q = "select seq_namefrom londiste.node_get_seq_list(%s)" - self.db_display_table("Sequences on node", q, [self.set_name]) + db = self.get_database('node_db') + self.db_display_table(db, "Sequences on node", q, [self.set_name]) def cmd_missing(self, args = []): q = "select * from londiste.node_show_missing(%s)" - self.db_display_table("MIssing objects on node", q, [self.set_name]) + db = self.get_database('node_db') + self.db_display_table(db, "Missing objects on node", q, [self.set_name]) def cmd_check(self, args = []): pass |
