diff options
author | martinko | 2013-08-09 14:03:34 +0000 |
---|---|---|
committer | martinko | 2013-08-09 14:03:34 +0000 |
commit | 49ac8090682a5d76b577eeade3e29c9e4d83650c (patch) | |
tree | 0bd7141f24ccaa9cdaf2120006263bfe47d4f36a /python/pgq/consumer.py | |
parent | aae5c5b1dde3e456189cdff0fa1cd7b5cb369aa8 (diff) | |
parent | 344d063d4e61bdd382d9e1977964fa1fe6363991 (diff) |
Merge branch 'release/skytools_3_1_5'skytools_3_1_5
Diffstat (limited to 'python/pgq/consumer.py')
-rw-r--r-- | python/pgq/consumer.py | 35 |
1 files changed, 34 insertions, 1 deletions
diff --git a/python/pgq/consumer.py b/python/pgq/consumer.py index 294519d8..d109f749 100644 --- a/python/pgq/consumer.py +++ b/python/pgq/consumer.py @@ -4,17 +4,47 @@ """ from pgq.baseconsumer import BaseConsumer, BaseBatchWalker -from pgq.event import * +from pgq.event import Event __all__ = ['Consumer'] +# Event status codes +EV_UNTAGGED = -1 +EV_RETRY = 0 +EV_DONE = 1 + + +class RetriableEvent(Event): + """Event which can be retryed + + Consumer is supposed to tag them after processing. + """ + + __slots__ = ('_status', ) + + def __init__(self, queue_name, row): + super(RetriableEvent, self).__init__(self, queue_name, row) + self._status = EV_DONE + + def tag_done(self): + self._status = EV_DONE + + def get_status(self): + return self._status + + def tag_retry(self, retry_time = 60): + self._status = EV_RETRY + self.retry_time = retry_time + + class RetriableWalkerEvent(RetriableEvent): """Redirects status flags to RetriableBatchWalker. That way event data can be gc'd immediately and tag_done() events don't need to be remembered. """ + __slots__ = ('_walker', ) def __init__(self, walker, queue, row): Event.__init__(self, queue, row) self._walker = walker @@ -60,6 +90,9 @@ class Consumer(BaseConsumer): _batch_walker_class = RetriableBatchWalker + def _make_event(self, queue_name, row): + return RetriableEvent(queue_name, row) + def _flush_retry(self, curs, batch_id, list): """Tag retry events.""" |