diff options
Diffstat (limited to 'python/pgq/consumer.py')
-rw-r--r-- | python/pgq/consumer.py | 35 |
1 files changed, 34 insertions, 1 deletions
diff --git a/python/pgq/consumer.py b/python/pgq/consumer.py index 294519d8..d109f749 100644 --- a/python/pgq/consumer.py +++ b/python/pgq/consumer.py @@ -4,17 +4,47 @@ """ from pgq.baseconsumer import BaseConsumer, BaseBatchWalker -from pgq.event import * +from pgq.event import Event __all__ = ['Consumer'] +# Event status codes +EV_UNTAGGED = -1 +EV_RETRY = 0 +EV_DONE = 1 + + +class RetriableEvent(Event): + """Event which can be retryed + + Consumer is supposed to tag them after processing. + """ + + __slots__ = ('_status', ) + + def __init__(self, queue_name, row): + super(RetriableEvent, self).__init__(self, queue_name, row) + self._status = EV_DONE + + def tag_done(self): + self._status = EV_DONE + + def get_status(self): + return self._status + + def tag_retry(self, retry_time = 60): + self._status = EV_RETRY + self.retry_time = retry_time + + class RetriableWalkerEvent(RetriableEvent): """Redirects status flags to RetriableBatchWalker. That way event data can be gc'd immediately and tag_done() events don't need to be remembered. """ + __slots__ = ('_walker', ) def __init__(self, walker, queue, row): Event.__init__(self, queue, row) self._walker = walker @@ -60,6 +90,9 @@ class Consumer(BaseConsumer): _batch_walker_class = RetriableBatchWalker + def _make_event(self, queue_name, row): + return RetriableEvent(queue_name, row) + def _flush_retry(self, curs, batch_id, list): """Tag retry events.""" |