diff options
Diffstat (limited to 'python/pgq/event.py')
-rw-r--r-- | python/pgq/event.py | 42 |
1 files changed, 29 insertions, 13 deletions
diff --git a/python/pgq/event.py b/python/pgq/event.py index 39735507..ddd550e4 100644 --- a/python/pgq/event.py +++ b/python/pgq/event.py @@ -2,7 +2,7 @@ """PgQ event container. """ -__all__ = ['EV_UNTAGGED', 'EV_RETRY', 'EV_DONE', 'Event'] +__all__ = ['EV_UNTAGGED', 'EV_RETRY', 'EV_DONE', 'Event', 'RetriableEvent'] # Event status codes EV_UNTAGGED = -1 @@ -35,32 +35,26 @@ _fldmap = { class Event(object): """Event data for consumers. - - Consumer is supposed to tag them after processing. - If not, events will stay in retry queue. + + Will be removed from the queue by default. """ - __slots__ = ('_event_row', '_status', 'retry_time', + __slots__ = ('_event_row', 'retry_time', 'queue_name') def __init__(self, queue_name, row): self._event_row = row - self._status = EV_DONE self.retry_time = 60 self.queue_name = queue_name def __getattr__(self, key): return self._event_row[_fldmap[key]] + # would be better in RetriableEvent only since we don't care but + # unfortunatelly it needs to be defined here due to compatibility concerns def tag_done(self): + pass self._status = EV_DONE - def tag_retry(self, retry_time = 60): - self._status = EV_RETRY - self.retry_time = retry_time - - def get_status(self): - return self._status - # be also dict-like def __getitem__(self, k): return self._event_row.__getitem__(k) def __contains__(self, k): return self._event_row.__contains__(k) @@ -74,3 +68,25 @@ class Event(object): def __str__(self): return "<id=%d type=%s data=%s e1=%s e2=%s e3=%s e4=%s>" % ( self.id, self.type, self.data, self.extra1, self.extra2, self.extra3, self.extra4) + +class RetriableEvent(Event): + """Event which can be retryed + + Consumer is supposed to tag them after processing. + """ + + __slots__ = Event.__slots__ + ('_status', ) + + def __init__(self, queue_name, row): + super(RetriableEvent, self).__init__(self, queue_name, row) + self._status = EV_DONE + + def tag_done(self): + self._status = EV_DONE + + def get_status(self): + return self._status + + def tag_retry(self, retry_time = 60): + self._status = EV_RETRY + self.retry_time = retry_time |