summaryrefslogtreecommitdiff
path: root/python/pgq/consumer.py
diff options
context:
space:
mode:
authormartinko2013-08-09 14:03:34 +0000
committermartinko2013-08-09 14:03:34 +0000
commit49ac8090682a5d76b577eeade3e29c9e4d83650c (patch)
tree0bd7141f24ccaa9cdaf2120006263bfe47d4f36a /python/pgq/consumer.py
parentaae5c5b1dde3e456189cdff0fa1cd7b5cb369aa8 (diff)
parent344d063d4e61bdd382d9e1977964fa1fe6363991 (diff)
Merge branch 'release/skytools_3_1_5'skytools_3_1_5
Diffstat (limited to 'python/pgq/consumer.py')
-rw-r--r--python/pgq/consumer.py35
1 files changed, 34 insertions, 1 deletions
diff --git a/python/pgq/consumer.py b/python/pgq/consumer.py
index 294519d8..d109f749 100644
--- a/python/pgq/consumer.py
+++ b/python/pgq/consumer.py
@@ -4,17 +4,47 @@
"""
from pgq.baseconsumer import BaseConsumer, BaseBatchWalker
-from pgq.event import *
+from pgq.event import Event
__all__ = ['Consumer']
+# Event status codes
+EV_UNTAGGED = -1
+EV_RETRY = 0
+EV_DONE = 1
+
+
+class RetriableEvent(Event):
+ """Event which can be retryed
+
+ Consumer is supposed to tag them after processing.
+ """
+
+ __slots__ = ('_status', )
+
+ def __init__(self, queue_name, row):
+ super(RetriableEvent, self).__init__(self, queue_name, row)
+ self._status = EV_DONE
+
+ def tag_done(self):
+ self._status = EV_DONE
+
+ def get_status(self):
+ return self._status
+
+ def tag_retry(self, retry_time = 60):
+ self._status = EV_RETRY
+ self.retry_time = retry_time
+
+
class RetriableWalkerEvent(RetriableEvent):
"""Redirects status flags to RetriableBatchWalker.
That way event data can be gc'd immediately and
tag_done() events don't need to be remembered.
"""
+ __slots__ = ('_walker', )
def __init__(self, walker, queue, row):
Event.__init__(self, queue, row)
self._walker = walker
@@ -60,6 +90,9 @@ class Consumer(BaseConsumer):
_batch_walker_class = RetriableBatchWalker
+ def _make_event(self, queue_name, row):
+ return RetriableEvent(queue_name, row)
+
def _flush_retry(self, curs, batch_id, list):
"""Tag retry events."""