summaryrefslogtreecommitdiff
path: root/python/pgq/consumer.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/pgq/consumer.py')
-rw-r--r--python/pgq/consumer.py15
1 files changed, 12 insertions, 3 deletions
diff --git a/python/pgq/consumer.py b/python/pgq/consumer.py
index cf47875c..3b823031 100644
--- a/python/pgq/consumer.py
+++ b/python/pgq/consumer.py
@@ -29,6 +29,8 @@ class _WalkerEvent(Event):
self._walker.tag_event_retry(self, retry_time)
def tag_failed(self, reason):
self._walker.tag_failed(self, reason)
+ def get_status(self):
+ self._walker.get_status(self)
class _BatchWalker(object):
"""Lazy iterator over batch events.
@@ -36,8 +38,8 @@ class _BatchWalker(object):
Events are loaded using cursor. It will be given
as ev_list to process_batch(). It allows:
- - one for loop over events
- - len() after that
+ - one for loop over events
+ - len() after that
"""
def __init__(self, curs, batch_id, queue_name, fetch_size = 300):
self.queue_name = queue_name
@@ -68,7 +70,7 @@ class _BatchWalker(object):
self.length += len(rows)
for row in rows:
ev = _WalkerEvent(self, self.queue_name, row)
- ev.tag_retry()
+ self.status_map[ev.id] = (EV_UNTAGGED, None)
yield ev
self.curs.execute("close %s" % self.sql_cursor)
@@ -89,6 +91,9 @@ class _BatchWalker(object):
def tag_event_failed(self, event, reason):
self.status_map[event.id] = (EV_FAILED, reason)
+ def get_status(self, event):
+ return self.status_map[event.id][0]
+
def iter_status(self):
for res in self.status_map.iteritems():
yield res
@@ -249,6 +254,8 @@ class Consumer(skytools.DBScript):
elif stat[0] == EV_FAILED:
self._tag_failed(curs, batch_id, ev_id, stat[1])
failed += 1
+ elif stat[0] != EV_DONE:
+ raise Exception("Untagged event: %d" % ev_id)
else:
for ev in list:
if ev.status == EV_FAILED:
@@ -257,6 +264,8 @@ class Consumer(skytools.DBScript):
elif ev.status == EV_RETRY:
self._tag_retry(curs, batch_id, ev.id, ev.retry_time)
retry += 1
+ elif stat[0] != EV_DONE:
+ raise Exception("Untagged event: %d" % ev_id)
# report weird events
if retry: