diff options
| author | Marko Kreen | 2013-03-21 21:16:07 +0000 |
|---|---|---|
| committer | Marko Kreen | 2013-03-22 08:50:23 +0000 |
| commit | 1d535a88c28f9a8ff8b6977340a3e4c0b553f92c (patch) | |
| tree | a8ec77fb2fcfc5833af08f88e88652ef7dbbf891 /python/pgq | |
| parent | 5868d74d1ba77f9fae4bbbdd668b1e4e4f45f867 (diff) | |
londiste: connection profiles
Way to add extra connect string parameters for non-config
connection strings (public node locations).
Diffstat (limited to 'python/pgq')
| -rw-r--r-- | python/pgq/cascade/admin.py | 24 | ||||
| -rw-r--r-- | python/pgq/cascade/consumer.py | 4 | ||||
| -rw-r--r-- | python/pgq/cascade/worker.py | 2 |
3 files changed, 19 insertions, 11 deletions
diff --git a/python/pgq/cascade/admin.py b/python/pgq/cascade/admin.py index 8b10584e..58b8e5d0 100644 --- a/python/pgq/cascade/admin.py +++ b/python/pgq/cascade/admin.py @@ -246,7 +246,7 @@ class CascadeAdmin(skytools.AdminScript): combined_set = None - provider_db = self.get_database('provider_db', connstr = provider_loc) + provider_db = self.get_database('provider_db', connstr = provider_loc, profile = 'remote') q = "select node_type, node_name from pgq_node.get_node_info(%s)" res = self.exec_query(provider_db, q, [self.queue_name]) row = res[0] @@ -297,7 +297,7 @@ class CascadeAdmin(skytools.AdminScript): def check_public_connstr(self, db, pub_connstr): """Look if public and local connect strings point to same db's. """ - pub_db = self.get_database("pub_db", connstr = pub_connstr) + pub_db = self.get_database("pub_db", connstr = pub_connstr, profile = 'remote') curs1 = db.cursor() curs2 = pub_db.cursor() q = "select oid, datname, txid_current() as txid, txid_current_snapshot() as snap"\ @@ -339,12 +339,12 @@ class CascadeAdmin(skytools.AdminScript): """Find root node, having start point.""" if initial_loc: loc = initial_loc + db = self.get_database('root_db', connstr = loc, profile = 'remote') else: loc = self.cf.get(self.initial_db_name) - - while 1: db = self.get_database('root_db', connstr = loc) + while 1: # query current status res = self.exec_query(db, "select * from pgq_node.get_node_info(%s)", [self.queue_name]) info = res[0] @@ -367,6 +367,9 @@ class CascadeAdmin(skytools.AdminScript): if loc is None: self.log.error("Sub node provider not initialized?") sys.exit(1) + + db = self.get_database('root_db', connstr = loc, profile = 'remote') + raise Exception('process canceled') def find_root_node(self): @@ -430,6 +433,8 @@ class CascadeAdmin(skytools.AdminScript): """Show set status.""" self.load_local_info() + cstr_extra = self.cf.get('remote_extra_connstr', '') + # prepare structs for workers members = Queue.Queue() for m in self.queue_info.member_map.itervalues(): @@ -441,7 +446,7 @@ class CascadeAdmin(skytools.AdminScript): num_threads = max (min (num_nodes / 4, 100), 1) tlist = [] for i in range(num_threads): - t = threading.Thread (target = self._cmd_status_worker, args = (members, nodes)) + t = threading.Thread (target = self._cmd_status_worker, args = (members, nodes, cstr_extra)) t.daemon = True t.start() tlist.append(t) @@ -458,14 +463,17 @@ class CascadeAdmin(skytools.AdminScript): self.queue_info.print_tree() - def _cmd_status_worker (self, members, nodes): + def _cmd_status_worker (self, members, nodes, cstr_extra): # members in, nodes out, both thread-safe while True: try: m = members.get_nowait() except Queue.Empty: break - node = self.load_node_status (m.name, m.location) + loc = m.location + if cstr_extra: + loc = loc + ' ' + cstr_extra + node = self.load_node_status (m.name, loc) nodes.put(node) members.task_done() @@ -1362,7 +1370,7 @@ class CascadeAdmin(skytools.AdminScript): if m.dead: return None loc = m.location - db = self.get_database('node.' + node_name, connstr = loc) + db = self.get_database('node.' + node_name, connstr = loc, profile = 'remote') return db def node_alive(self, node_name): diff --git a/python/pgq/cascade/consumer.py b/python/pgq/cascade/consumer.py index cc1bd7a9..68e949ef 100644 --- a/python/pgq/cascade/consumer.py +++ b/python/pgq/cascade/consumer.py @@ -64,7 +64,7 @@ class CascadedConsumer(BaseConsumer): dst_db = self.get_database(self.target_db) dst_curs = dst_db.cursor() - src_db = self.get_database(PDB, connstr = provider_loc) + src_db = self.get_database(PDB, connstr = provider_loc, profile = 'remote') src_curs = src_db.cursor() # check target info @@ -99,7 +99,7 @@ class CascadedConsumer(BaseConsumer): def get_provider_db(self, state): provider_loc = state['provider_location'] - return self.get_database(PDB, connstr = provider_loc) + return self.get_database(PDB, connstr = provider_loc, profile = 'remote') def unregister_consumer(self): dst_db = self.get_database(self.target_db) diff --git a/python/pgq/cascade/worker.py b/python/pgq/cascade/worker.py index dbe6ba3c..d6b1fa96 100644 --- a/python/pgq/cascade/worker.py +++ b/python/pgq/cascade/worker.py @@ -263,7 +263,7 @@ class CascadedWorker(CascadedConsumer): if n['dead']: # ignore dead nodes continue - wmdb = self.get_database('wmdb', connstr = n['node_location'], autocommit = 1) + wmdb = self.get_database('wmdb', connstr = n['node_location'], autocommit = 1, profile = 'remote') wmcurs = wmdb.cursor() q = 'select local_watermark from pgq_node.get_node_info(%s)' wmcurs.execute(q, [self.queue_name]) |
