summaryrefslogtreecommitdiff
path: root/python/pgq
diff options
context:
space:
mode:
Diffstat (limited to 'python/pgq')
-rw-r--r--python/pgq/consumer.py15
-rw-r--r--python/pgq/event.py14
2 files changed, 21 insertions, 8 deletions
diff --git a/python/pgq/consumer.py b/python/pgq/consumer.py
index cf47875c..3b823031 100644
--- a/python/pgq/consumer.py
+++ b/python/pgq/consumer.py
@@ -29,6 +29,8 @@ class _WalkerEvent(Event):
self._walker.tag_event_retry(self, retry_time)
def tag_failed(self, reason):
self._walker.tag_failed(self, reason)
+ def get_status(self):
+ self._walker.get_status(self)
class _BatchWalker(object):
"""Lazy iterator over batch events.
@@ -36,8 +38,8 @@ class _BatchWalker(object):
Events are loaded using cursor. It will be given
as ev_list to process_batch(). It allows:
- - one for loop over events
- - len() after that
+ - one for loop over events
+ - len() after that
"""
def __init__(self, curs, batch_id, queue_name, fetch_size = 300):
self.queue_name = queue_name
@@ -68,7 +70,7 @@ class _BatchWalker(object):
self.length += len(rows)
for row in rows:
ev = _WalkerEvent(self, self.queue_name, row)
- ev.tag_retry()
+ self.status_map[ev.id] = (EV_UNTAGGED, None)
yield ev
self.curs.execute("close %s" % self.sql_cursor)
@@ -89,6 +91,9 @@ class _BatchWalker(object):
def tag_event_failed(self, event, reason):
self.status_map[event.id] = (EV_FAILED, reason)
+ def get_status(self, event):
+ return self.status_map[event.id][0]
+
def iter_status(self):
for res in self.status_map.iteritems():
yield res
@@ -249,6 +254,8 @@ class Consumer(skytools.DBScript):
elif stat[0] == EV_FAILED:
self._tag_failed(curs, batch_id, ev_id, stat[1])
failed += 1
+ elif stat[0] != EV_DONE:
+ raise Exception("Untagged event: %d" % ev_id)
else:
for ev in list:
if ev.status == EV_FAILED:
@@ -257,6 +264,8 @@ class Consumer(skytools.DBScript):
elif ev.status == EV_RETRY:
self._tag_retry(curs, batch_id, ev.id, ev.retry_time)
retry += 1
+ elif stat[0] != EV_DONE:
+ raise Exception("Untagged event: %d" % ev_id)
# report weird events
if retry:
diff --git a/python/pgq/event.py b/python/pgq/event.py
index d7b2d7ee..0f14298c 100644
--- a/python/pgq/event.py
+++ b/python/pgq/event.py
@@ -2,9 +2,10 @@
"""PgQ event container.
"""
-__all__ = ('EV_RETRY', 'EV_DONE', 'EV_FAILED', 'Event')
+__all__ = ['EV_RETRY', 'EV_DONE', 'EV_FAILED', 'Event']
# Event status codes
+EV_UNTAGGED = -1
EV_RETRY = 0
EV_DONE = 1
EV_FAILED = 2
@@ -39,7 +40,7 @@ class Event(object):
"""
def __init__(self, queue_name, row):
self._event_row = row
- self.status = EV_RETRY
+ self._status = EV_UNTAGGED
self.retry_time = 60
self.fail_reason = "Buggy consumer"
self.queue_name = queue_name
@@ -48,13 +49,16 @@ class Event(object):
return self._event_row[_fldmap[key]]
def tag_done(self):
- self.status = EV_DONE
+ self._status = EV_DONE
def tag_retry(self, retry_time = 60):
- self.status = EV_RETRY
+ self._status = EV_RETRY
self.retry_time = retry_time
def tag_failed(self, reason):
- self.status = EV_FAILED
+ self._status = EV_FAILED
self.fail_reason = reason
+ def get_status(self):
+ return self._status
+