diff options
author | Marko Kreen | 2009-06-01 13:32:08 +0000 |
---|---|---|
committer | Marko Kreen | 2009-06-01 13:39:04 +0000 |
commit | be82460b0778e282336e0a73eec5b069cd59bb53 (patch) | |
tree | 59f36e8063218cebff6be4be8ab00b805d1e2c74 /python/pgq/consumer.py | |
parent | 52fa34d45d19fe4843b77e7ff21a4f9e93832800 (diff) |
python/pgq: relaxed event handling
.tag_done() call is no more required. Events are by default in
'done' state.
In 2.x events were in 'retry' state by default, which was very bad
idea in retrospect. Changing them to 'untagged' and still requiring
tag_done() does not seem too good either. Original reasoning was to
detect and survive errors in scripts, but the result was only
confusion to everybody.
So instead of assuming that script may be buggy, now we assume
that script knows what it does. And only by explicit action
can they be tagged as retry.
Diffstat (limited to 'python/pgq/consumer.py')
-rw-r--r-- | python/pgq/consumer.py | 9 |
1 files changed, 5 insertions, 4 deletions
diff --git a/python/pgq/consumer.py b/python/pgq/consumer.py index 01f16abd..a23b882e 100644 --- a/python/pgq/consumer.py +++ b/python/pgq/consumer.py @@ -68,7 +68,6 @@ class _BatchWalker(object): self.length += len(rows) for row in rows: ev = _WalkerEvent(self, self.queue_name, row) - self.status_map[ev.id] = (EV_UNTAGGED, None) yield ev self.curs.execute("close %s" % self.sql_cursor) @@ -77,17 +76,19 @@ class _BatchWalker(object): def __len__(self): if self.fetch_status != 2: - raise Exception("BatchWalker: len() for incomplete result. (%d)" % self.fetch_status) + return -1 + #raise Exception("BatchWalker: len() for incomplete result. (%d)" % self.fetch_status) return self.length def tag_event_done(self, event): - del self.status_map[event.id] + if event.id in self.status_map: + del self.status_map[event.id] def tag_event_retry(self, event, retry_time): self.status_map[event.id] = (EV_RETRY, retry_time) def get_status(self, event): - return self.status_map[event.id][0] + return self.status_map.get(event.id, (EV_DONE, 0))[0] def iter_status(self): for res in self.status_map.iteritems(): |