summaryrefslogtreecommitdiff
path: root/python/pgq/setadmin.py
diff options
context:
space:
mode:
authorMarko Kreen2008-04-15 13:02:47 +0000
committerMarko Kreen2008-04-15 13:02:47 +0000
commit329cae98ec32cd9369cab54a3b597d50fd280562 (patch)
tree123d6cb60abbc445926956e77d2dc0318483e36e /python/pgq/setadmin.py
parent3bb252361d1595e2b882c3fae9602d866a8c553b (diff)
more setconsumer/londiste work.
simple init/event processing/copy seems to work.
Diffstat (limited to 'python/pgq/setadmin.py')
-rw-r--r--python/pgq/setadmin.py32
1 files changed, 25 insertions, 7 deletions
diff --git a/python/pgq/setadmin.py b/python/pgq/setadmin.py
index a389f389..d1b10e33 100644
--- a/python/pgq/setadmin.py
+++ b/python/pgq/setadmin.py
@@ -40,9 +40,9 @@ class SetAdmin(skytools.DBScript):
g = optparse.OptionGroup(p, "actual setadm options")
g.add_option("--connstr", action="store_true",
- help = "add: ignore table differences, repair: ignore lag")
+ help = "initial connect string")
g.add_option("--provider",
- help = "add: ignore table differences, repair: ignore lag")
+ help = "init: connect string for provider")
p.add_option_group(g)
return p
@@ -71,6 +71,8 @@ class SetAdmin(skytools.DBScript):
return True
def init_node(self, node_type, node_name, node_location):
+ provider_loc = self.options.provider
+
# connect to database
db = self.get_database("new_node", connstr = node_location)
@@ -98,8 +100,9 @@ class SetAdmin(skytools.DBScript):
[self.set_name, node_name, node_location])
self.exec_sql(db, "select pgq_set.create_node(%s, %s, %s, %s, %s, %s, %s)",
[self.set_name, node_type, node_name, worker_name, provider_name, global_watermark, combined_set])
+ provider_db = None
else:
- root_db = self.find_root_db()
+ root_db = self.find_root_db(provider_loc)
set = self.load_root_info(root_db)
# check if member already exists
@@ -109,7 +112,15 @@ class SetAdmin(skytools.DBScript):
global_watermark = set.global_watermark
combined_set = None
- provider_name = self.options.provider
+
+ provider_db = self.get_database('provider_db', connstr = provider_loc)
+ curs = provider_db.cursor()
+ curs.execute("select node_type, node_name from pgq_set.get_node_info(%s)", [self.set_name])
+ provider_db.commit()
+ row = curs.fetchone()
+ if not row:
+ raise Exceotion("provider node not found")
+ provider_name = row['node_name']
# register member on root
self.exec_sql(root_db, "select pgq_set.add_member(%s, %s, %s, false)",
@@ -123,7 +134,6 @@ class SetAdmin(skytools.DBScript):
sys.exit(1)
# register on provider
- provider_db = self.get_database('provider_db', connstr = provider.location)
self.exec_sql(provider_db, "select pgq_set.add_member(%s, %s, %s, false)",
[self.set_name, node_name, node_location])
self.exec_sql(provider_db, "select pgq_set.subscribe_node(%s, %s, %s)",
@@ -140,10 +150,18 @@ class SetAdmin(skytools.DBScript):
global_watermark, combined_set])
db.commit()
+ self.extra_init(node_type, db, provider_db)
+
self.log.info("Done")
- def find_root_db(self):
- loc = self.cf.get(self.initial_db_name)
+ def extra_init(self, node_type, node_db, provider_db):
+ pass
+
+ def find_root_db(self, initial_loc):
+ if initial_loc:
+ loc = initial_loc
+ else:
+ loc = self.cf.get(self.initial_db_name)
while 1:
db = self.get_database('root_db', connstr = loc)