author     Marko Kreen    2007-09-17 09:02:51 +0000
committer  Marko Kreen    2007-09-17 09:02:51 +0000
commit     2ae52135e25bac13c56ac77914ce254c59a30db6 (patch)
tree       4987f329b253ad4a36ce147520406e8f9236d1e7 /python/pgq/consumer.py
parent     f008fe1ca8293fb36a3f620fce7c46a43ae79422 (diff)
experimental lazy event fetching
Diffstat (limited to 'python/pgq/consumer.py')
-rw-r--r--  python/pgq/consumer.py  127
1 file changed, 109 insertions, 18 deletions
diff --git a/python/pgq/consumer.py b/python/pgq/consumer.py
index e661dac4..c44ba236 100644
--- a/python/pgq/consumer.py
+++ b/python/pgq/consumer.py
@@ -13,6 +13,86 @@ from pgq.event import *
__all__ = ['Consumer', 'RemoteConsumer', 'SerialConsumer']
+class _WalkerEvent(Event):
+ """Redirects status flags to BatchWalker.
+
+ That way event data can be gc'd immediately and
+ tag_done() events don't need to be remembered.
+ """
+ def __init__(self, walker, queue, row):
+ Event.__init__(self, queue, row)
+ self._walker = walker
+
+ def tag_done(self):
+ self._walker.tag_event_done(self)
+ def tag_retry(self, retry_time = 60):
+ self._walker.tag_event_retry(self, retry_time)
+ def tag_failed(self, reason):
+ self._walker.tag_event_failed(self, reason)
+
+class _BatchWalker(object):
+ """Lazy iterator over batch events.
+
+ Events are loaded from the database with a server-side cursor.
+ The walker itself is given as ev_list to process_batch(). It allows:
+
+ - one for-loop over the events
+ - len() after the loop has finished
+ """
+ def __init__(self, curs, batch_id, queue_name, fetch_size = 300):
+ self.queue_name = queue_name
+ self.fetch_size = fetch_size
+ self.sql_cursor = "batch_walker"
+ self.curs = curs
+ self.length = 0
+ self.status_map = {}
+ curs.execute("select pgq.batch_event_sql(%s)", [batch_id])
+ self.batch_sql = curs.fetchone()[0]
+ self.fetch_status = 0 # 0-not started, 1-in-progress, 2-done
+
+ def __iter__(self):
+ if self.fetch_status:
+ raise Exception("BatchWalker: double fetch? (%d)" % self.fetch_status)
+ self.fetch_status = 1
+
+ q = "declare %s no scroll cursor for %s" % (self.sql_cursor, self.batch_sql)
+ self.curs.execute(q)
+
+ q = "fetch %d from batch_walker" % self.fetch_size
+ while 1:
+ self.curs.execute(q)
+ rows = self.curs.dictfetchall()
+ if not len(rows):
+ break
+
+ self.length += len(rows)
+ for row in rows:
+ ev = _WalkerEvent(self, self.queue_name, row)
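+ # pre-tag as "retry": tag_done() later removes the entry, so events
+ # left untagged by the consumer stay safely in the retry queue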
+ ev.tag_retry()
+ yield ev
+
+ self.curs.execute("close %s" % self.sql_cursor)
+
+ self.fetch_status = 2
+
+ def __len__(self):
+ if self.fetch_status != 2:
+ raise Exception("BatchWalker: len() for incomplete result. (%d)" % self.fetch_status)
+ return self.length
+
+ def tag_event_done(self, event):
+ del self.status_map[event.id]
+
+ def tag_event_retry(self, event, retry_time):
+ self.status_map[event.id] = (EV_RETRY, retry_time)
+
+ def tag_event_failed(self, event, reason):
+ self.status_map[event.id] = (EV_FAILED, reason)
+
+ def iter_status(self):
+ for res in self.status_map.iteritems():
+ yield res
+
class Consumer(skytools.DBScript):
"""Consumer base class.
"""
@@ -31,6 +111,7 @@ class Consumer(skytools.DBScript):
self.reg_list = []
self.consumer_id = self.cf.get("pgq_consumer_id", self.job_name)
self.pgq_queue_name = self.cf.get("pgq_queue_name")
+ self.pgq_lazy_fetch = self.cf.getint("pgq_lazy_fetch", 0)
def attach(self):
"""Attach consumer to interesting queues."""
@@ -125,7 +206,7 @@ class Consumer(skytools.DBScript):
def _launch_process_batch(self, db, batch_id, list):
self.process_batch(db, batch_id, list)
- def _load_batch_events(self, curs, batch_id, queue_name):
+ def _load_batch_events_old(self, curs, batch_id, queue_name):
"""Fetch all events for this batch."""
# load events
@@ -141,6 +222,14 @@ class Consumer(skytools.DBScript):
return list
+ def _load_batch_events(self, curs, batch_id, queue_name):
+ """Fetch all events for this batch."""
+
+ if self.pgq_lazy_fetch:
+ return _BatchWalker(curs, batch_id, queue_name, self.pgq_lazy_fetch)
+ else:
+ return self._load_batch_events_old(curs, batch_id, queue_name)
+
def _load_next_batch(self, curs, queue_name):
"""Allocate next batch. (internal)"""
@@ -151,25 +240,32 @@ class Consumer(skytools.DBScript):
def _finish_batch(self, curs, batch_id, list):
"""Tag events and notify that the batch is done."""
- retry = failed = 0
- for ev in list:
- if ev.status == EV_FAILED:
- self._tag_failed(curs, batch_id, ev)
- failed += 1
- elif ev.status == EV_RETRY:
- self._tag_retry(curs, batch_id, ev)
- retry += 1
+ if self.pgq_lazy_fetch:
+ for ev_id, stat in list.iter_status():
+ if stat[0] == EV_RETRY:
+ self._tag_retry(curs, batch_id, ev_id, stat[1])
+ elif stat[0] == EV_FAILED:
+ self._tag_failed(curs, batch_id, ev_id, stat[1])
+ else:
+ for ev in list:
+ if ev.status == EV_FAILED:
+ self._tag_failed(curs, batch_id, ev.id, ev.fail_reason)
+ elif ev.status == EV_RETRY:
+ self._tag_retry(curs, batch_id, ev.id, ev.retry_time)
+
curs.execute("select pgq.finish_batch(%s)", [batch_id])
- def _tag_failed(self, curs, batch_id, ev):
+ def _tag_failed(self, curs, batch_id, ev_id, fail_reason):
"""Tag event as failed. (internal)"""
curs.execute("select pgq.event_failed(%s, %s, %s)",
- [batch_id, ev.id, ev.fail_reason])
+ [batch_id, ev_id, fail_reason])
- def _tag_retry(self, cx, batch_id, ev):
+ def _tag_retry(self, cx, batch_id, ev_id, retry_time):
"""Tag event for retry. (internal)"""
cx.execute("select pgq.event_retry(%s, %s, %s)",
- [batch_id, ev.id, ev.retry_time])
+ [batch_id, ev_id, retry_time])
def get_batch_info(self, batch_id):
"""Get info about batch.
@@ -303,11 +399,6 @@ class SerialConsumer(Consumer):
# actual work
self.process_remote_batch(db, batch_id, event_list, dst_db)
- # make sure no retry events
- for ev in event_list:
- if ev.status == EV_RETRY:
- raise Exception("SerialConsumer must not use retry queue")
-
# finish work
self.set_batch_done(curs)
dst_db.commit()
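For completeness, turning the feature on should need nothing beyond the new pgq_lazy_fetch setting in the consumer's config file; a sketch of such a setup (job, queue and database names are invented for the example):

# Hypothetical consumer using lazy fetching.  Expected ini config:
#
#   [my_consumer]
#   job_name       = my_consumer
#   db             = dbname=sourcedb
#   pgq_queue_name = my_queue
#   pgq_lazy_fetch = 300    ; cursor fetch size; 0 or unset = old behaviour
#
import sys
from pgq.consumer import Consumer

class MyConsumer(Consumer):
    def process_batch(self, db, batch_id, ev_list):
        for ev in ev_list:
            ev.tag_done()

if __name__ == '__main__':
    script = MyConsumer("my_consumer", "db", sys.argv[1:])
    script.start()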