#! /usr/bin/env python import sys, optparse, skytools from pgq.setconsumer import MemberInfo, NodeInfo class SetInfo: def __init__(self, set_name, info_row, member_rows): self.root_info = info_row self.set_name = set_name self.member_map = {} self.root_name = info_row['node_name'] self.root_type = info_row['node_type'] self.global_watermark = info_row['global_watermark'] for r in member_rows: n = MemberInfo(r) self.member_map[n.name] = n def get_member(self, name): return self.member_map.get(name) command_usage = """ %prog [options] INI CMD [subcmd args] commands: """ class SetAdmin(skytools.DBScript): root_name = None root_info = None member_map = {} set_name = None extra_objs = [] initial_db_name = 'node_db' def init_optparse(self, parser = None): p = skytools.DBScript.init_optparse(self, parser) p.set_usage(command_usage.strip()) g = optparse.OptionGroup(p, "actual setadm options") g.add_option("--connstr", action="store_true", help = "initial connect string") g.add_option("--provider", help = "init: connect string for provider") p.add_option_group(g) return p def work(self): self.set_single_loop(1) self.set_name = self.cf.get('set_name') if self.is_cmd("init-root", 2): self.init_node("root", self.args[2], self.args[3]) elif self.is_cmd("init-branch", 2): self.init_node("branch", self.args[2], self.args[3]) elif self.is_cmd("init-leaf", 2): self.init_node("leaf", self.args[2], self.args[3]) else: self.log.info("need command") def is_cmd(self, name, argcnt): if len(self.args) < 2: return False if self.args[1] != name: return False if len(self.args) != argcnt + 2: self.log.error("cmd %s needs %d args" % (name, argcnt)) sys.exit(1) return True def init_node(self, node_type, node_name, node_location): provider_loc = self.options.provider # connect to database db = self.get_database("new_node", connstr = node_location) # check if code is installed self.install_code(db) # query current status res = self.exec_query(db, "select * from pgq_set.get_node_info(%s)", [self.set_name]) info = res[0] if info['node_type'] is not None: self.log.info("Node is already initialized as %s" % info['node_type']) return self.log.info("Initializing node") # fixme worker_name = "%s_%s_worker" % (self.set_name, node_name) # register member if node_type in ('root', 'combined-root'): global_watermark = None combined_set = None provider_name = None self.exec_sql(db, "select pgq_set.add_member(%s, %s, %s, false)", [self.set_name, node_name, node_location]) self.exec_sql(db, "select pgq_set.create_node(%s, %s, %s, %s, %s, %s, %s)", [self.set_name, node_type, node_name, worker_name, provider_name, global_watermark, combined_set]) provider_db = None else: root_db = self.find_root_db(provider_loc) set = self.load_root_info(root_db) # check if member already exists if set.get_member(node_name) is not None: self.log.error("Node '%s' already exists" % node_name) sys.exit(1) global_watermark = set.global_watermark combined_set = None provider_db = self.get_database('provider_db', connstr = provider_loc) curs = provider_db.cursor() curs.execute("select node_type, node_name from pgq_set.get_node_info(%s)", [self.set_name]) provider_db.commit() row = curs.fetchone() if not row: raise Exceotion("provider node not found") provider_name = row['node_name'] # register member on root self.exec_sql(root_db, "select pgq_set.add_member(%s, %s, %s, false)", [self.set_name, node_name, node_location]) root_db.commit() # lookup provider provider = set.get_member(provider_name) if not provider: self.log.error("Node %s does not exist" % provider_name) sys.exit(1) # register on provider self.exec_sql(provider_db, "select pgq_set.add_member(%s, %s, %s, false)", [self.set_name, node_name, node_location]) self.exec_sql(provider_db, "select pgq_set.subscribe_node(%s, %s, %s)", [self.set_name, node_name, worker_name]) provider_db.commit() # initialize node itself self.exec_sql(db, "select pgq_set.add_member(%s, %s, %s, false)", [self.set_name, node_name, node_location]) self.exec_sql(db, "select pgq_set.add_member(%s, %s, %s, false)", [self.set_name, provider_name, provider.location]) self.exec_sql(db, "select pgq_set.create_node(%s, %s, %s, %s, %s, %s, %s)", [self.set_name, node_type, node_name, worker_name, provider_name, global_watermark, combined_set]) db.commit() self.extra_init(node_type, db, provider_db) self.log.info("Done") def extra_init(self, node_type, node_db, provider_db): pass def find_root_db(self, initial_loc): if initial_loc: loc = initial_loc else: loc = self.cf.get(self.initial_db_name) while 1: db = self.get_database('root_db', connstr = loc) # query current status res = self.exec_query(db, "select * from pgq_set.get_node_info(%s)", [self.set_name]) info = res[0] type = info['node_type'] if type is None: self.log.info("Root node not initialized?") sys.exit(1) # configured db may not be root anymore, walk upwards then if type in ('root', 'combined-root'): db.commit() return db self.close_connection() loc = info['provider_location'] if loc is None: self.log.info("Sub node provider not initialized?") sys.exit(1) def load_root_info(self, db): res = self.exec_query(db, "select * from pgq_set.get_node_info(%s)", [self.set_name]) info = res[0] q = "select * from pgq_set.get_member_info(%s)" node_list = self.exec_query(db, q, [self.set_name]) db.commit() return SetInfo(self.set_name, info, node_list) def exec_sql(self, db, q, args): self.log.debug(q) curs = db.cursor() curs.execute(q, args) db.commit() def exec_query(self, db, q, args): self.log.debug(q) curs = db.cursor() curs.execute(q, args) res = curs.dictfetchall() db.commit() return res def install_code(self, db): objs = [ skytools.DBLanguage("plpgsql"), skytools.DBFunction("txid_current_snapshot", 0, sql_file="txid.sql"), skytools.DBSchema("pgq", sql_file="pgq.sql"), skytools.DBSchema("pgq_ext", sql_file="pgq_ext.sql"), skytools.DBSchema("pgq_set", sql_file="pgq_set.sql"), ] objs += self.extra_objs skytools.db_install(db.cursor(), objs, self.log) db.commit() if __name__ == '__main__': script = SetAdmin('set_admin', sys.argv[1:]) script.start()