diff options
author | Marko Kreen | 2011-10-27 20:39:15 +0000 |
---|---|---|
committer | Marko Kreen | 2011-10-27 20:39:15 +0000 |
commit | d64fffbb3a0acc314ff208bbc75ea17ba7f7a2f2 (patch) | |
tree | 0c3e32c6741f3a0469da1ccae0e2490bcd4db379 /python/pgq/consumer.py | |
parent | 63919e092fad60ad776918388ba42794c339d268 (diff) |
pgq.Consumer: support .consumer_filter with lazy_fetch
Diffstat (limited to 'python/pgq/consumer.py')
-rw-r--r-- | python/pgq/consumer.py | 9 |
1 files changed, 5 insertions, 4 deletions
diff --git a/python/pgq/consumer.py b/python/pgq/consumer.py index a21f14d1..76470b72 100644 --- a/python/pgq/consumer.py +++ b/python/pgq/consumer.py @@ -39,7 +39,7 @@ class _BatchWalker(object): - one for loop over events - len() after that """ - def __init__(self, curs, batch_id, queue_name, fetch_size = 300): + def __init__(self, curs, batch_id, queue_name, fetch_size = 300, consumer_filter = None): self.queue_name = queue_name self.fetch_size = fetch_size self.sql_cursor = "batch_walker" @@ -48,14 +48,15 @@ class _BatchWalker(object): self.status_map = {} self.batch_id = batch_id self.fetch_status = 0 # 0-not started, 1-in-progress, 2-done + self.consumer_filter = consumer_filter def __iter__(self): if self.fetch_status: raise Exception("BatchWalker: double fetch? (%d)" % self.fetch_status) self.fetch_status = 1 - q = "select * from pgq.get_batch_cursor(%s, %s, %s)" - self.curs.execute(q, [self.batch_id, self.sql_cursor, self.fetch_size]) + q = "select * from pgq.get_batch_cursor(%s, %s, %s, %s)" + self.curs.execute(q, [self.batch_id, self.sql_cursor, self.fetch_size, self.consumer_filter]) # this will return first batch of rows q = "fetch %d from %s" % (self.fetch_size, self.sql_cursor) @@ -306,7 +307,7 @@ class Consumer(skytools.DBScript): """Fetch all events for this batch.""" if self.pgq_lazy_fetch: - return _BatchWalker(curs, batch_id, self.queue_name, self.pgq_lazy_fetch) + return _BatchWalker(curs, batch_id, self.queue_name, self.pgq_lazy_fetch, self.consumer_filter) else: return self._load_batch_events_old(curs, batch_id) |