summaryrefslogtreecommitdiff
path: root/python/pgq/event.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/pgq/event.py')
-rw-r--r--python/pgq/event.py42
1 files changed, 29 insertions, 13 deletions
diff --git a/python/pgq/event.py b/python/pgq/event.py
index 39735507..ddd550e4 100644
--- a/python/pgq/event.py
+++ b/python/pgq/event.py
@@ -2,7 +2,7 @@
"""PgQ event container.
"""
-__all__ = ['EV_UNTAGGED', 'EV_RETRY', 'EV_DONE', 'Event']
+__all__ = ['EV_UNTAGGED', 'EV_RETRY', 'EV_DONE', 'Event', 'RetriableEvent']
# Event status codes
EV_UNTAGGED = -1
@@ -35,32 +35,26 @@ _fldmap = {
class Event(object):
"""Event data for consumers.
-
- Consumer is supposed to tag them after processing.
- If not, events will stay in retry queue.
+
+ Will be removed from the queue by default.
"""
- __slots__ = ('_event_row', '_status', 'retry_time',
+ __slots__ = ('_event_row', 'retry_time',
'queue_name')
def __init__(self, queue_name, row):
self._event_row = row
- self._status = EV_DONE
self.retry_time = 60
self.queue_name = queue_name
def __getattr__(self, key):
return self._event_row[_fldmap[key]]
+ # would be better in RetriableEvent only since we don't care but
+ # unfortunatelly it needs to be defined here due to compatibility concerns
def tag_done(self):
+ pass
self._status = EV_DONE
- def tag_retry(self, retry_time = 60):
- self._status = EV_RETRY
- self.retry_time = retry_time
-
- def get_status(self):
- return self._status
-
# be also dict-like
def __getitem__(self, k): return self._event_row.__getitem__(k)
def __contains__(self, k): return self._event_row.__contains__(k)
@@ -74,3 +68,25 @@ class Event(object):
def __str__(self):
return "<id=%d type=%s data=%s e1=%s e2=%s e3=%s e4=%s>" % (
self.id, self.type, self.data, self.extra1, self.extra2, self.extra3, self.extra4)
+
+class RetriableEvent(Event):
+ """Event which can be retryed
+
+ Consumer is supposed to tag them after processing.
+ """
+
+ __slots__ = Event.__slots__ + ('_status', )
+
+ def __init__(self, queue_name, row):
+ super(RetriableEvent, self).__init__(self, queue_name, row)
+ self._status = EV_DONE
+
+ def tag_done(self):
+ self._status = EV_DONE
+
+ def get_status(self):
+ return self._status
+
+ def tag_retry(self, retry_time = 60):
+ self._status = EV_RETRY
+ self.retry_time = retry_time