summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
authorMarko Kreen2009-11-03 12:26:19 +0000
committerMarko Kreen2009-11-03 12:43:44 +0000
commitc9de6870b604e6aa99c42ef1ba12bc4b06c64a31 (patch)
tree2172028d318fa73f5d1647365153bb661b225318 /python
parent5c4ab947babf423aa83150f7eaaf5aec53a11a0b (diff)
fix pgq brokenness related to recent commits
- get_batch_cursor() - loop_delay
Diffstat (limited to 'python')
-rw-r--r--python/pgq/cascade/admin.py9
-rw-r--r--python/pgq/consumer.py5
-rwxr-xr-xpython/pgqadm.py4
3 files changed, 12 insertions, 6 deletions
diff --git a/python/pgq/cascade/admin.py b/python/pgq/cascade/admin.py
index 48a582fe..5f42a324 100644
--- a/python/pgq/cascade/admin.py
+++ b/python/pgq/cascade/admin.py
@@ -237,7 +237,7 @@ class CascadeAdmin(skytools.AdminScript):
else:
loc = self.cf.get(self.initial_db_name)
- while self.looping:
+ while 1:
db = self.get_database('root_db', connstr = loc)
@@ -389,6 +389,7 @@ class CascadeAdmin(skytools.AdminScript):
node_db = self.get_node_database(node)
qinfo = self.load_queue_info(node_db)
ninfo = qinfo.local_node
+ node_location = qinfo.get_member(node).location
# reload consumer info
cmap = self.get_node_consumer_map(node)
@@ -398,8 +399,8 @@ class CascadeAdmin(skytools.AdminScript):
is_worker = (ninfo.worker_name == consumer)
# fixme: expect the node to be described already
- #q = "select * from pgq_node.add_member(%s, %s, %s, false)"
- #self.node_cmd(new_provider, q, [self.queue_name, node_name, node_location])
+ q = "select * from pgq_node.register_location(%s, %s, %s, false)"
+ self.node_cmd(new_provider, q, [self.queue_name, node, node_location])
# subscribe on new provider
if is_worker:
@@ -729,7 +730,7 @@ class CascadeAdmin(skytools.AdminScript):
self.node_cmd(node, q, [self.queue_name, consumer, pause_flag])
self.log.info('Waiting for worker to accept')
- while self.looping:
+ while 1:
q = "select * from pgq_node.get_consumer_state(%s, %s)"
stat = self.node_cmd(node, q, [self.queue_name, consumer], quiet = 1)[0]
if stat['paused'] != pause_flag:
diff --git a/python/pgq/consumer.py b/python/pgq/consumer.py
index ef6283d6..04915614 100644
--- a/python/pgq/consumer.py
+++ b/python/pgq/consumer.py
@@ -46,6 +46,7 @@ class _BatchWalker(object):
self.curs = curs
self.length = 0
self.status_map = {}
+ self.batch_id = batch_id
self.fetch_status = 0 # 0-not started, 1-in-progress, 2-done
def __iter__(self):
@@ -54,7 +55,7 @@ class _BatchWalker(object):
self.fetch_status = 1
q = "select * from pgq.get_batch_cursor(%s, %s, %s)"
- self.curs.execute(q, [self.queue_name, self.sql_cursor, self.fetch_size])
+ self.curs.execute(q, [self.batch_id, self.sql_cursor, self.fetch_size])
# this will return first batch of rows
q = "fetch %d from batch_walker" % self.fetch_size
@@ -167,7 +168,7 @@ class Consumer(skytools.DBScript):
self.consumer_id = self.consumer_name
def reload(self):
- DBScript.reload(self)
+ skytools.DBScript.reload(self)
self.pgq_lazy_fetch = self.cf.getint("pgq_lazy_fetch", self.default_lazy_fetch)
# set following ones to None if not set
diff --git a/python/pgqadm.py b/python/pgqadm.py
index bd1ff8a6..7fdfdc3b 100755
--- a/python/pgqadm.py
+++ b/python/pgqadm.py
@@ -118,6 +118,10 @@ class PGQAdmin(skytools.DBScript):
else:
skytools.DBScript.start(self)
+ def reload(self):
+ skytools.DBScript.reload(self)
+ self.set_single_loop(1)
+
def init_optparse(self, parser=None):
p = skytools.DBScript.init_optparse(self, parser)
p.set_usage(command_usage.strip())