summaryrefslogtreecommitdiff
path: root/python/pgq
diff options
context:
space:
mode:
authorMarko Kreen2013-03-21 21:16:07 +0000
committerMarko Kreen2013-03-22 08:50:23 +0000
commit1d535a88c28f9a8ff8b6977340a3e4c0b553f92c (patch)
treea8ec77fb2fcfc5833af08f88e88652ef7dbbf891 /python/pgq
parent5868d74d1ba77f9fae4bbbdd668b1e4e4f45f867 (diff)
londiste: connection profiles
Way to add extra connect string parameters for non-config connection strings (public node locations).
Diffstat (limited to 'python/pgq')
-rw-r--r--python/pgq/cascade/admin.py24
-rw-r--r--python/pgq/cascade/consumer.py4
-rw-r--r--python/pgq/cascade/worker.py2
3 files changed, 19 insertions, 11 deletions
diff --git a/python/pgq/cascade/admin.py b/python/pgq/cascade/admin.py
index 8b10584e..58b8e5d0 100644
--- a/python/pgq/cascade/admin.py
+++ b/python/pgq/cascade/admin.py
@@ -246,7 +246,7 @@ class CascadeAdmin(skytools.AdminScript):
combined_set = None
- provider_db = self.get_database('provider_db', connstr = provider_loc)
+ provider_db = self.get_database('provider_db', connstr = provider_loc, profile = 'remote')
q = "select node_type, node_name from pgq_node.get_node_info(%s)"
res = self.exec_query(provider_db, q, [self.queue_name])
row = res[0]
@@ -297,7 +297,7 @@ class CascadeAdmin(skytools.AdminScript):
def check_public_connstr(self, db, pub_connstr):
"""Look if public and local connect strings point to same db's.
"""
- pub_db = self.get_database("pub_db", connstr = pub_connstr)
+ pub_db = self.get_database("pub_db", connstr = pub_connstr, profile = 'remote')
curs1 = db.cursor()
curs2 = pub_db.cursor()
q = "select oid, datname, txid_current() as txid, txid_current_snapshot() as snap"\
@@ -339,12 +339,12 @@ class CascadeAdmin(skytools.AdminScript):
"""Find root node, having start point."""
if initial_loc:
loc = initial_loc
+ db = self.get_database('root_db', connstr = loc, profile = 'remote')
else:
loc = self.cf.get(self.initial_db_name)
-
- while 1:
db = self.get_database('root_db', connstr = loc)
+ while 1:
# query current status
res = self.exec_query(db, "select * from pgq_node.get_node_info(%s)", [self.queue_name])
info = res[0]
@@ -367,6 +367,9 @@ class CascadeAdmin(skytools.AdminScript):
if loc is None:
self.log.error("Sub node provider not initialized?")
sys.exit(1)
+
+ db = self.get_database('root_db', connstr = loc, profile = 'remote')
+
raise Exception('process canceled')
def find_root_node(self):
@@ -430,6 +433,8 @@ class CascadeAdmin(skytools.AdminScript):
"""Show set status."""
self.load_local_info()
+ cstr_extra = self.cf.get('remote_extra_connstr', '')
+
# prepare structs for workers
members = Queue.Queue()
for m in self.queue_info.member_map.itervalues():
@@ -441,7 +446,7 @@ class CascadeAdmin(skytools.AdminScript):
num_threads = max (min (num_nodes / 4, 100), 1)
tlist = []
for i in range(num_threads):
- t = threading.Thread (target = self._cmd_status_worker, args = (members, nodes))
+ t = threading.Thread (target = self._cmd_status_worker, args = (members, nodes, cstr_extra))
t.daemon = True
t.start()
tlist.append(t)
@@ -458,14 +463,17 @@ class CascadeAdmin(skytools.AdminScript):
self.queue_info.print_tree()
- def _cmd_status_worker (self, members, nodes):
+ def _cmd_status_worker (self, members, nodes, cstr_extra):
# members in, nodes out, both thread-safe
while True:
try:
m = members.get_nowait()
except Queue.Empty:
break
- node = self.load_node_status (m.name, m.location)
+ loc = m.location
+ if cstr_extra:
+ loc = loc + ' ' + cstr_extra
+ node = self.load_node_status (m.name, loc)
nodes.put(node)
members.task_done()
@@ -1362,7 +1370,7 @@ class CascadeAdmin(skytools.AdminScript):
if m.dead:
return None
loc = m.location
- db = self.get_database('node.' + node_name, connstr = loc)
+ db = self.get_database('node.' + node_name, connstr = loc, profile = 'remote')
return db
def node_alive(self, node_name):
diff --git a/python/pgq/cascade/consumer.py b/python/pgq/cascade/consumer.py
index cc1bd7a9..68e949ef 100644
--- a/python/pgq/cascade/consumer.py
+++ b/python/pgq/cascade/consumer.py
@@ -64,7 +64,7 @@ class CascadedConsumer(BaseConsumer):
dst_db = self.get_database(self.target_db)
dst_curs = dst_db.cursor()
- src_db = self.get_database(PDB, connstr = provider_loc)
+ src_db = self.get_database(PDB, connstr = provider_loc, profile = 'remote')
src_curs = src_db.cursor()
# check target info
@@ -99,7 +99,7 @@ class CascadedConsumer(BaseConsumer):
def get_provider_db(self, state):
provider_loc = state['provider_location']
- return self.get_database(PDB, connstr = provider_loc)
+ return self.get_database(PDB, connstr = provider_loc, profile = 'remote')
def unregister_consumer(self):
dst_db = self.get_database(self.target_db)
diff --git a/python/pgq/cascade/worker.py b/python/pgq/cascade/worker.py
index dbe6ba3c..d6b1fa96 100644
--- a/python/pgq/cascade/worker.py
+++ b/python/pgq/cascade/worker.py
@@ -263,7 +263,7 @@ class CascadedWorker(CascadedConsumer):
if n['dead']:
# ignore dead nodes
continue
- wmdb = self.get_database('wmdb', connstr = n['node_location'], autocommit = 1)
+ wmdb = self.get_database('wmdb', connstr = n['node_location'], autocommit = 1, profile = 'remote')
wmcurs = wmdb.cursor()
q = 'select local_watermark from pgq_node.get_node_info(%s)'
wmcurs.execute(q, [self.queue_name])