summaryrefslogtreecommitdiff
path: root/python/pgq/baseconsumer.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/pgq/baseconsumer.py')
-rw-r--r--python/pgq/baseconsumer.py45
1 files changed, 9 insertions, 36 deletions
diff --git a/python/pgq/baseconsumer.py b/python/pgq/baseconsumer.py
index 3ae0937d..7648a160 100644
--- a/python/pgq/baseconsumer.py
+++ b/python/pgq/baseconsumer.py
@@ -11,24 +11,10 @@ import sys, time, skytools
from pgq.event import *
-__all__ = ['BaseConsumer', 'WalkerEvent', 'BatchWalker']
+__all__ = ['BaseConsumer', 'BaseBatchWalker']
-class WalkerEvent(Event):
- """Redirects status flags to BatchWalker.
- That way event data can be gc'd immediately and
- tag_done() events don't need to be remembered.
- """
- def __init__(self, walker, queue, row):
- Event.__init__(self, queue, row)
- self._walker = walker
-
- def tag_done(self):
- self._walker.tag_event_done(self)
- def get_status(self):
- self._walker.get_status(self)
-
-class BatchWalker(object):
+class BaseBatchWalker(object):
"""Lazy iterator over batch events.
Events are loaded using cursor. It will be given
@@ -37,17 +23,18 @@ class BatchWalker(object):
- one for loop over events
- len() after that
"""
- def __init__(self, curs, batch_id, queue_name, fetch_size = 300, consumer_filter = None, event_class = WalkerEvent):
+
+ _event_class = Event
+
+ def __init__(self, curs, batch_id, queue_name, fetch_size = 300, consumer_filter = None):
self.queue_name = queue_name
self.fetch_size = fetch_size
self.sql_cursor = "batch_walker"
self.curs = curs
self.length = 0
- self.status_map = {}
self.batch_id = batch_id
self.fetch_status = 0 # 0-not started, 1-in-progress, 2-done
self.consumer_filter = consumer_filter
- self.event_class = event_class
def __iter__(self):
if self.fetch_status:
@@ -66,7 +53,7 @@ class BatchWalker(object):
self.length += len(rows)
for row in rows:
- ev = self.event_class(self, self.queue_name, row)
+ ev = self._event_class(self, self.queue_name, row)
yield ev
# if less rows than requested, it was final block
@@ -83,19 +70,6 @@ class BatchWalker(object):
def __len__(self):
return self.length
- def tag_event_done(self, event):
- if event.id in self.status_map:
- del self.status_map[event.id]
-
- def tag_event_retry(self, event, retry_time):
- self.status_map[event.id] = (EV_RETRY, retry_time)
-
- def get_status(self, event):
- return self.status_map.get(event.id, (EV_DONE, 0))[0]
-
- def iter_status(self):
- for res in self.status_map.iteritems():
- yield res
class BaseConsumer(skytools.DBScript):
"""Consumer base class.
@@ -165,8 +139,7 @@ class BaseConsumer(skytools.DBScript):
# statistics: time spent waiting for events
idle_start = None
- # internal, event class for BatchWalker
- _event_class = WalkerEvent
+ _batch_walker_class = BaseBatchWalker
def __init__(self, service_name, db_name, args):
"""Initialize new consumer.
@@ -333,7 +306,7 @@ class BaseConsumer(skytools.DBScript):
"""Fetch all events for this batch."""
if self.pgq_lazy_fetch:
- return BatchWalker(curs, batch_id, self.queue_name, self.pgq_lazy_fetch, self.consumer_filter, self._event_class)
+ return self._batch_walker_class(curs, batch_id, self.queue_name, self.pgq_lazy_fetch, self.consumer_filter)
else:
return self._load_batch_events_old(curs, batch_id)