diff options
Diffstat (limited to 'python/pgq/consumer.py')
-rw-r--r-- | python/pgq/consumer.py | 15 |
1 files changed, 12 insertions, 3 deletions
diff --git a/python/pgq/consumer.py b/python/pgq/consumer.py index cf47875c..3b823031 100644 --- a/python/pgq/consumer.py +++ b/python/pgq/consumer.py @@ -29,6 +29,8 @@ class _WalkerEvent(Event): self._walker.tag_event_retry(self, retry_time) def tag_failed(self, reason): self._walker.tag_failed(self, reason) + def get_status(self): + self._walker.get_status(self) class _BatchWalker(object): """Lazy iterator over batch events. @@ -36,8 +38,8 @@ class _BatchWalker(object): Events are loaded using cursor. It will be given as ev_list to process_batch(). It allows: - - one for loop over events - - len() after that + - one for loop over events + - len() after that """ def __init__(self, curs, batch_id, queue_name, fetch_size = 300): self.queue_name = queue_name @@ -68,7 +70,7 @@ class _BatchWalker(object): self.length += len(rows) for row in rows: ev = _WalkerEvent(self, self.queue_name, row) - ev.tag_retry() + self.status_map[ev.id] = (EV_UNTAGGED, None) yield ev self.curs.execute("close %s" % self.sql_cursor) @@ -89,6 +91,9 @@ class _BatchWalker(object): def tag_event_failed(self, event, reason): self.status_map[event.id] = (EV_FAILED, reason) + def get_status(self, event): + return self.status_map[event.id][0] + def iter_status(self): for res in self.status_map.iteritems(): yield res @@ -249,6 +254,8 @@ class Consumer(skytools.DBScript): elif stat[0] == EV_FAILED: self._tag_failed(curs, batch_id, ev_id, stat[1]) failed += 1 + elif stat[0] != EV_DONE: + raise Exception("Untagged event: %d" % ev_id) else: for ev in list: if ev.status == EV_FAILED: @@ -257,6 +264,8 @@ class Consumer(skytools.DBScript): elif ev.status == EV_RETRY: self._tag_retry(curs, batch_id, ev.id, ev.retry_time) retry += 1 + elif stat[0] != EV_DONE: + raise Exception("Untagged event: %d" % ev_id) # report weird events if retry: |