diff options
Diffstat (limited to 'python/pgq/baseconsumer.py')
| -rw-r--r-- | python/pgq/baseconsumer.py | 45 |
1 files changed, 9 insertions, 36 deletions
diff --git a/python/pgq/baseconsumer.py b/python/pgq/baseconsumer.py index 3ae0937d..7648a160 100644 --- a/python/pgq/baseconsumer.py +++ b/python/pgq/baseconsumer.py @@ -11,24 +11,10 @@ import sys, time, skytools from pgq.event import * -__all__ = ['BaseConsumer', 'WalkerEvent', 'BatchWalker'] +__all__ = ['BaseConsumer', 'BaseBatchWalker'] -class WalkerEvent(Event): - """Redirects status flags to BatchWalker. - That way event data can be gc'd immediately and - tag_done() events don't need to be remembered. - """ - def __init__(self, walker, queue, row): - Event.__init__(self, queue, row) - self._walker = walker - - def tag_done(self): - self._walker.tag_event_done(self) - def get_status(self): - self._walker.get_status(self) - -class BatchWalker(object): +class BaseBatchWalker(object): """Lazy iterator over batch events. Events are loaded using cursor. It will be given @@ -37,17 +23,18 @@ class BatchWalker(object): - one for loop over events - len() after that """ - def __init__(self, curs, batch_id, queue_name, fetch_size = 300, consumer_filter = None, event_class = WalkerEvent): + + _event_class = Event + + def __init__(self, curs, batch_id, queue_name, fetch_size = 300, consumer_filter = None): self.queue_name = queue_name self.fetch_size = fetch_size self.sql_cursor = "batch_walker" self.curs = curs self.length = 0 - self.status_map = {} self.batch_id = batch_id self.fetch_status = 0 # 0-not started, 1-in-progress, 2-done self.consumer_filter = consumer_filter - self.event_class = event_class def __iter__(self): if self.fetch_status: @@ -66,7 +53,7 @@ class BatchWalker(object): self.length += len(rows) for row in rows: - ev = self.event_class(self, self.queue_name, row) + ev = self._event_class(self, self.queue_name, row) yield ev # if less rows than requested, it was final block @@ -83,19 +70,6 @@ class BatchWalker(object): def __len__(self): return self.length - def tag_event_done(self, event): - if event.id in self.status_map: - del self.status_map[event.id] - - def tag_event_retry(self, event, retry_time): - self.status_map[event.id] = (EV_RETRY, retry_time) - - def get_status(self, event): - return self.status_map.get(event.id, (EV_DONE, 0))[0] - - def iter_status(self): - for res in self.status_map.iteritems(): - yield res class BaseConsumer(skytools.DBScript): """Consumer base class. @@ -165,8 +139,7 @@ class BaseConsumer(skytools.DBScript): # statistics: time spent waiting for events idle_start = None - # internal, event class for BatchWalker - _event_class = WalkerEvent + _batch_walker_class = BaseBatchWalker def __init__(self, service_name, db_name, args): """Initialize new consumer. @@ -333,7 +306,7 @@ class BaseConsumer(skytools.DBScript): """Fetch all events for this batch.""" if self.pgq_lazy_fetch: - return BatchWalker(curs, batch_id, self.queue_name, self.pgq_lazy_fetch, self.consumer_filter, self._event_class) + return self._batch_walker_class(curs, batch_id, self.queue_name, self.pgq_lazy_fetch, self.consumer_filter) else: return self._load_batch_events_old(curs, batch_id) |
