summaryrefslogtreecommitdiff
path: root/python/pgq/consumer.py
diff options
context:
space:
mode:
authorMarko Kreen2011-10-27 20:39:15 +0000
committerMarko Kreen2011-10-27 20:39:15 +0000
commitd64fffbb3a0acc314ff208bbc75ea17ba7f7a2f2 (patch)
tree0c3e32c6741f3a0469da1ccae0e2490bcd4db379 /python/pgq/consumer.py
parent63919e092fad60ad776918388ba42794c339d268 (diff)
pgq.Consumer: support .consumer_filter with lazy_fetch
Diffstat (limited to 'python/pgq/consumer.py')
-rw-r--r--python/pgq/consumer.py9
1 files changed, 5 insertions, 4 deletions
diff --git a/python/pgq/consumer.py b/python/pgq/consumer.py
index a21f14d1..76470b72 100644
--- a/python/pgq/consumer.py
+++ b/python/pgq/consumer.py
@@ -39,7 +39,7 @@ class _BatchWalker(object):
- one for loop over events
- len() after that
"""
- def __init__(self, curs, batch_id, queue_name, fetch_size = 300):
+ def __init__(self, curs, batch_id, queue_name, fetch_size = 300, consumer_filter = None):
self.queue_name = queue_name
self.fetch_size = fetch_size
self.sql_cursor = "batch_walker"
@@ -48,14 +48,15 @@ class _BatchWalker(object):
self.status_map = {}
self.batch_id = batch_id
self.fetch_status = 0 # 0-not started, 1-in-progress, 2-done
+ self.consumer_filter = consumer_filter
def __iter__(self):
if self.fetch_status:
raise Exception("BatchWalker: double fetch? (%d)" % self.fetch_status)
self.fetch_status = 1
- q = "select * from pgq.get_batch_cursor(%s, %s, %s)"
- self.curs.execute(q, [self.batch_id, self.sql_cursor, self.fetch_size])
+ q = "select * from pgq.get_batch_cursor(%s, %s, %s, %s)"
+ self.curs.execute(q, [self.batch_id, self.sql_cursor, self.fetch_size, self.consumer_filter])
# this will return first batch of rows
q = "fetch %d from %s" % (self.fetch_size, self.sql_cursor)
@@ -306,7 +307,7 @@ class Consumer(skytools.DBScript):
"""Fetch all events for this batch."""
if self.pgq_lazy_fetch:
- return _BatchWalker(curs, batch_id, self.queue_name, self.pgq_lazy_fetch)
+ return _BatchWalker(curs, batch_id, self.queue_name, self.pgq_lazy_fetch, self.consumer_filter)
else:
return self._load_batch_events_old(curs, batch_id)