diff options
author | Marko Kreen | 2009-02-13 12:16:59 +0000 |
---|---|---|
committer | Marko Kreen | 2009-02-13 13:20:31 +0000 |
commit | 411f237ebfdce9532bbf01f51d46cdb91f7fe283 (patch) | |
tree | 851a378f1234629cf2f15d903ecea4ae88df126f /python/pgq/setadmin.py | |
parent | c8198c637b777296ec49b845395487242d730394 (diff) |
python/pgq update
- remove the obsolete setconsumer stuff
- New CascadedConsumer / CascadedWorker classes,
that are based on regular pgq.Consumer
- move RemoteConsumer / SerialConsumer out of pgq/consumer.py
pgq.Consumer:
- rename conf params and instance variables:
pgq_queue_name -> queue_name
pgq_consumer_id -> consumer_name
- disable automatic registration on the queue,
now script needs to be called with switches --register / --unregister
- drop code to read from multiple-queues at once
pgq.ticker:
- drop event seq tracking code, this is now done in db
Diffstat (limited to 'python/pgq/setadmin.py')
-rw-r--r-- | python/pgq/setadmin.py | 463 |
1 files changed, 0 insertions, 463 deletions
diff --git a/python/pgq/setadmin.py b/python/pgq/setadmin.py deleted file mode 100644 index 0c500468..00000000 --- a/python/pgq/setadmin.py +++ /dev/null @@ -1,463 +0,0 @@ -#! /usr/bin/env python - -import sys, time, optparse, skytools - -from pgq.setinfo import * - -__all__ = ['SetAdmin'] - -command_usage = """\ -%prog [options] INI CMD [subcmd args] - -Node Initialization: - init-root NODE_NAME NODE_CONSTR - init-branch NODE_NAME NODE_CONSTR --provider=<constr> - init-leaf NODE_NAME NODE_CONSTR --provider=<constr> - Initializes node. Given connstr is kept as global connstring - for that node. Those commands ignore node_db in .ini. - The --provider connstr is used only for initial set info - fetching, later actual provider's connect string is used. - -Node Administration: - status Show set state - members Show members in set - rename-node OLD NEW Rename a node - change-provider NODE NEWSRC - pause NODE - resume NODE - - switchover NEWROOT - failover NEWROOT - tag-dead NODE .. Tag node as dead - tag-alive NODE .. Tag node as alive -""" - -class SetAdmin(skytools.AdminScript): - set_name = None - extra_objs = [] - initial_db_name = 'node_db' - - def init_optparse(self, parser = None): - p = skytools.AdminScript.init_optparse(self, parser) - p.set_usage(command_usage.strip()) - - g = optparse.OptionGroup(p, "actual setadm options") - g.add_option("--connstr", action="store_true", - help = "initial connect string") - g.add_option("--provider", - help = "init: connect string for provider") - p.add_option_group(g) - return p - - def reload(self): - skytools.AdminScript.reload(self) - self.set_name = self.cf.get('set_name') - - # - # Node initialization. - # - - def cmd_init_root(self, node_name, node_location): - self.init_node('root', node_name, node_location) - - def cmd_init_branch(self, node_name, node_location): - if len(args) != 2: - raise Exception('init-branch needs 2 args') - self.init_node('branch', node_name, node_location) - - def cmd_init_leaf(self, node_name, node_location): - self.init_node('leaf', node_name, node_location) - - def init_node(self, node_type, node_name, node_location): - provider_loc = self.options.provider - - # connect to database - db = self.get_database("new_node", connstr = node_location) - - # check if code is installed - self.install_code(db) - - # query current status - res = self.exec_query(db, "select * from pgq_set.get_node_info(%s)", [self.set_name]) - info = res[0] - if info['node_type'] is not None: - self.log.info("Node is already initialized as %s" % info['node_type']) - return - - self.log.info("Initializing node") - - # register member - if node_type in ('root', 'combined-root'): - global_watermark = None - combined_set = None - provider_name = None - self.exec_cmd(db, "select * from pgq_set.add_member(%s, %s, %s, false)", - [self.set_name, node_name, node_location]) - self.exec_cmd(db, "select * from pgq_set.create_node(%s, %s, %s, %s, %s, %s)", - [self.set_name, node_type, node_name, provider_name, global_watermark, combined_set]) - provider_db = None - else: - root_db = self.find_root_db(provider_loc) - set = self.load_set_info(root_db) - - # check if member already exists - if set.get_member(node_name) is not None: - self.log.error("Node '%s' already exists" % node_name) - sys.exit(1) - - combined_set = None - - provider_db = self.get_database('provider_db', connstr = provider_loc) - q = "select node_type, node_name from pgq_set.get_node_info(%s)" - res = self.exec_query(provider_db, q, [self.set_name]) - row = res[0] - if not row['node_name']: - raise Exception("provider node not found") - provider_name = row['node_name'] - - # register member on root - self.exec_cmd(root_db, "select * from pgq_set.add_member(%s, %s, %s, false)", - [self.set_name, node_name, node_location]) - - # lookup provider - provider = set.get_member(provider_name) - if not provider: - self.log.error("Node %s does not exist" % provider_name) - sys.exit(1) - - # register on provider - self.exec_cmd(provider_db, "select * from pgq_set.add_member(%s, %s, %s, false)", - [self.set_name, node_name, node_location]) - rows = self.exec_cmd(provider_db, "select * from pgq_set.subscribe_node(%s, %s)", - [self.set_name, node_name]) - global_watermark = rows[0]['global_watermark'] - - # initialize node itself - - # insert members - self.exec_cmd(db, "select * from pgq_set.add_member(%s, %s, %s, false)", - [self.set_name, node_name, node_location]) - for m in set.member_map.values(): - self.exec_cmd(db, "select * from pgq_set.add_member(%s, %s, %s, %s)", - [self.set_name, m.name, m.location, m.dead]) - - # real init - self.exec_cmd(db, "select * from pgq_set.create_node(%s, %s, %s, %s, %s, %s)", - [self.set_name, node_type, node_name, provider_name, - global_watermark, combined_set]) - - - self.extra_init(node_type, db, provider_db) - - self.log.info("Done") - - def extra_init(self, node_type, node_db, provider_db): - pass - - def find_root_db(self, initial_loc = None): - if initial_loc: - loc = initial_loc - else: - loc = self.cf.get(self.initial_db_name) - - while 1: - db = self.get_database('root_db', connstr = loc) - - - # query current status - res = self.exec_query(db, "select * from pgq_set.get_node_info(%s)", [self.set_name]) - info = res[0] - type = info['node_type'] - if type is None: - self.log.info("Root node not initialized?") - sys.exit(1) - - self.log.debug("db='%s' -- type='%s' provider='%s'" % (loc, type, info['provider_location'])) - # configured db may not be root anymore, walk upwards then - if type in ('root', 'combined-root'): - db.commit() - return db - - self.close_database('root_db') - if loc == info['provider_location']: - raise Exception("find_root_db: got loop: %s" % loc) - loc = info['provider_location'] - if loc is None: - self.log.error("Sub node provider not initialized?") - sys.exit(1) - - def install_code(self, db): - objs = [ - skytools.DBLanguage("plpgsql"), - skytools.DBFunction("txid_current_snapshot", 0, sql_file="txid.sql"), - skytools.DBSchema("pgq", sql_file="pgq.sql"), - skytools.DBSchema("pgq_ext", sql_file="pgq_ext.sql"), - skytools.DBSchema("pgq_set", sql_file="pgq_set.sql"), - ] - objs += self.extra_objs - skytools.db_install(db.cursor(), objs, self.log) - db.commit() - - # - # Print status of whole set. - # - - def cmd_status(self): - root_db = self.find_root_db() - sinf = self.load_set_info(root_db) - - for mname, minf in sinf.member_map.iteritems(): - db = self.get_database('look_db', connstr = minf.location, autocommit = 1) - curs = db.cursor() - curs.execute("select * from pgq_set.get_node_info(%s)", [self.set_name]) - node = NodeInfo(self.set_name, curs.fetchone()) - node.load_status(curs) - self.load_extra_status(curs, node) - sinf.add_node(node) - self.close_database('look_db') - - sinf.print_tree() - - def load_extra_status(self, curs, node): - pass - - # - # Normal commands. - # - - def cmd_change_provider(self, node_name, new_provider): - old_provider = None - - self.load_local_info() - node_location = self.set_info.get_member(node_name).location - node_db = self.get_node_database(node_name) - node_set_info = self.load_set_info(node_db) - node = node_set_info.local_node - old_provider = node.provider_node - - if old_provider == new_provider: - self.log.info("Node %s has already %s as provider" % (node_name, new_provider)) - - # pause target node - self.pause_node(node_name) - - # reload node info - node_set_info = self.load_set_info(node_db) - node = node_set_info.local_node - - # subscribe on new provider - q = "select * from pgq_set.add_member(%s, %s, %s, false)" - self.node_cmd(new_provider, q, [self.set_name, node_name, node_location]) - q = 'select * from pgq_set.subscribe_node(%s, %s, %s)' - self.node_cmd(new_provider, q, [self.set_name, node_name, node.completed_tick]) - - # change provider on node - q = 'select * from pgq_set.change_provider(%s, %s)' - self.node_cmd(node_name, q, [self.set_name, new_provider]) - - # unsubscribe from old provider - q = "select * from pgq_set.unsubscribe_node(%s, %s)" - self.node_cmd(old_provider, q, [self.set_name, node_name]) - - # resume node - self.resume_node(node_name) - - def cmd_rename_node(self, old_name, new_name): - - self.load_local_info() - - root_db = self.find_root_db() - - # pause target node - self.pause_node(old_name) - node = self.load_node_info(old_name) - provider_node = node.provider_node - subscriber_list = self.get_node_subscriber_list(old_name) - - - # create copy of member info / subscriber+queue info - step1 = 'select * from pgq_set.rename_node_step1(%s, %s, %s)' - # rename node itself, drop copies - step2 = 'select * from pgq_set.rename_node_step2(%s, %s, %s)' - - # step1 - self.exec_cmd(root_db, step1, [self.set_name, old_name, new_name]) - self.node_cmd(provider_node, step1, [self.set_name, old_name, new_name]) - self.node_cmd(old_name, step1, [self.set_name, old_name, new_name]) - for child in subscriber_list: - self.node_cmd(child, step1, [self.set_name, old_name, new_name]) - - # step1 - self.node_cmd(old_name, step2, [self.set_name, old_name, new_name]) - self.node_cmd(provider_node, step1, [self.set_name, old_name, new_name]) - for child in subscriber_list: - self.node_cmd(child, step2, [self.set_name, old_name, new_name]) - self.exec_cmd(root_db, step2, [self.set_name, old_name, new_name]) - - # resume node - self.resume_node(old_name) - - def switchover_nonroot(self, old_node, new_node): - # see if we need to change new nodes' provider - tmp_node = new_node - while 1: - if tmp_node.is_root(): - break - if tmp_node.name == old_node: - # yes, old_node is new_nodes provider, - # switch it around - self.change_provider(new_node, old_node.parent_node) - break - self.change_provider(old_node.name, new_node.name) - - def switchover_root(self, old_node, new_node): - self.pause_node(old_node.name) - self.extra_lockdown(old_node) - - self.wait_for_catchup(new_node, old_node) - self.pause_node(new_node.name) - self.promote_node(new_node.name) - self.subscribe_node(new_node.name, old_node.name, tick_pos) - self.unsubscribe_node(new_node.parent_node, new_node.name) - self.resume_node(new_node.name) - - # demote & set provider on node - q = 'select * from pgq_set.demote_root(%s, %s)' - self.node_cmd(old_node.name, q, [self.set_name, new_node.name]) - - self.resume_node(old_node.name) - - def cmd_switchover(self, old_node_name, new_node_name): - self.load_local_info() - old_node = self.get_node_info(old_node_name) - new_node = self.get_node_info(new_node_name) - if old_node.name == new_node.name: - self.log.info("same node?") - return - - if old_node.is_root(): - self.switchover_root(old_node, new_node) - else: - self.switchover_nonroot(old_node, new_node) - - # switch subscribers around - if self.options.all: - for n in self.get_node_subscriber_list(old_node.name): - self.change_provider(n, new_node.name) - - def cmd_pause(self, node_name): - self.load_local_info() - self.pause_node(node_name) - - def cmd_resume(self, node_name): - self.load_local_info() - self.resume_node(node_name) - - def cmd_members(self): - db = self.get_database(self.initial_db_name) - q = "select node_name from pgq_set.get_node_info(%s)" - rows = self.exec_query(db, q, [self.set_name]) - - desc = 'Member info on %s:' % rows[0]['node_name'] - q = "select node_name, dead, node_location"\ - " from pgq_set.get_member_info(%s) order by 1" - self.display_table(db, desc, q, [self.set_name]) - - # - # Shortcuts for operating on nodes. - # - - def load_local_info(self): - """fetch set info from local node.""" - db = self.get_database(self.initial_db_name) - self.set_info = self.load_set_info(db) - - def get_node_database(self, node_name): - """Connect to node.""" - if node_name == self.set_info.local_node.name: - db = self.get_database(self.initial_db_name) - else: - m = self.set_info.get_member(node_name) - if not m: - self.log.error("cannot resolve %s" % node_name) - sys.exit(1) - loc = m.location - db = self.get_database('node.' + node_name, connstr = loc) - return db - - def close_node_database(self, node_name): - """Disconnect node's connection.""" - if node_name == self.set_info.local_node.name: - self.close_database(self.initial_db_name) - else: - self.close_database("node." + node_name) - - def node_cmd(self, node_name, sql, args): - """Execute SQL command on particular node.""" - db = self.get_node_database(node_name) - return self.exec_cmd(db, sql, args) - - # - # Various operation on nodes. - # - - def set_paused(self, db, pause_flag): - q = "select * from pgq_set.set_node_paused(%s, %s)" - self.exec_cmd(db, q, [self.set_name, pause_flag]) - - self.log.info('Waiting for worker to accept') - while 1: - q = "select * from pgq_set.get_node_info(%s)" - stat = self.exec_query(db, q, [self.set_name])[0] - if stat['paused'] != pause_flag: - raise Exception('operation canceled? %s <> %s' % (repr(stat['paused']), repr(pause_flag))) - - if stat['uptodate']: - break - time.sleep(1) - - op = pause_flag and "paused" or "resumed" - - self.log.info("Node %s %s" % (stat['node_name'], op)) - - def pause_node(self, node_name): - db = self.get_node_database(node_name) - self.set_paused(db, True) - - def resume_node(self, node_name): - db = self.get_node_database(node_name) - self.set_paused(db, False) - - def subscribe_node(self, target_node, subscriber_node, tick_pos): - q = "select * from pgq_set.subscribe_node(%s, %s, %s)" - self.node_cmd(target_node, q, [self.set_name, subscribe_node, tick_pos]) - - def unsubscribe_node(self, target_node, subscriber_node): - q = "select * from pgq_set.unsubscribe_node(%s, %s)" - self.node_cmd(target_node, q, [self.set_name, subscribe_node]) - - def load_node_info(self, node_name): - db = self.get_node_database(node_name) - q = "select * from pgq_set.get_node_info(%s)" - rows = self.exec_query(db, q, [self.set_name]) - return NodeInfo(self.set_name, rows[0]) - - def load_set_info(self, db): - res = self.exec_query(db, "select * from pgq_set.get_node_info(%s)", [self.set_name]) - info = res[0] - - q = "select * from pgq_set.get_member_info(%s)" - member_list = self.exec_query(db, q, [self.set_name]) - - return SetInfo(self.set_name, info, member_list) - - def get_node_subscriber_list(self, node_name): - q = "select node_name, local_watermark from pgq_set.get_subscriber_info(%s)" - db = self.get_node_database(node_name) - rows = self.exec_query(db, q, [self.set_name]) - return [r['node_name'] for r in rows] - -if __name__ == '__main__': - script = SetAdmin('set_admin', sys.argv[1:]) - script.start() - |