diff options
author | Marko Kreen | 2007-09-17 09:02:51 +0000 |
---|---|---|
committer | Marko Kreen | 2007-09-17 09:02:51 +0000 |
commit | 2ae52135e25bac13c56ac77914ce254c59a30db6 (patch) | |
tree | 4987f329b253ad4a36ce147520406e8f9236d1e7 /python/pgq/consumer.py | |
parent | f008fe1ca8293fb36a3f620fce7c46a43ae79422 (diff) |
experimental lazy event fetching
Diffstat (limited to 'python/pgq/consumer.py')
-rw-r--r-- | python/pgq/consumer.py | 127 |
1 files changed, 109 insertions, 18 deletions
diff --git a/python/pgq/consumer.py b/python/pgq/consumer.py index e661dac4..c44ba236 100644 --- a/python/pgq/consumer.py +++ b/python/pgq/consumer.py @@ -13,6 +13,86 @@ from pgq.event import * __all__ = ['Consumer', 'RemoteConsumer', 'SerialConsumer'] +class _WalkerEvent(Event): + """Redirects status flags to BatchWalker. + + That way event data can gc-d immidiately and + tag_done() events dont need to be remembered. + """ + def __init__(self, walker, queue, row): + Event.__init__(self, queue, row) + self._walker = walker + + def tag_done(self): + self._walker.tag_event_done(self) + def tag_retry(self, retry_time = 60): + self._walker.tag_event_retry(self, retry_time) + def tag_failed(self, reason): + self._walker.tag_failed(self, reason) + +class _BatchWalker(object): + """Lazy iterator over batch events. + + Events are loaded using cursor. It will be given + as ev_list to process_batch(). It allows: + + - one for loop over events + - len() after that + """ + def __init__(self, curs, batch_id, queue_name, fetch_size = 300): + self.queue_name = queue_name + self.fetch_size = fetch_size + self.sql_cursor = "batch_walker" + self.curs = curs + self.length = 0 + self.status_map = {} + curs.execute("select pgq.batch_event_sql(%s)", [batch_id]) + self.batch_sql = curs.fetchone()[0] + self.fetch_status = 0 # 0-not started, 1-in-progress, 2-done + + def __iter__(self): + if self.fetch_status: + raise Exception("BatchWalker: double fetch? (%d)" % self.fetch_status) + self.fetch_status = 1 + + q = "declare %s no scroll cursor for %s" % (self.sql_cursor, self.batch_sql) + self.curs.execute(q) + + q = "fetch %d from batch_walker" % self.fetch_size + while 1: + self.curs.execute(q) + rows = self.curs.dictfetchall() + if not len(rows): + break + + self.length += len(rows) + for row in rows: + ev = _WalkerEvent(self, self.queue_name, row) + ev.tag_retry() + yield ev + + self.curs.execute("close %s" % self.sql_cursor) + + self.fetch_status = 2 + + def __len__(self): + if self.fetch_status != 2: + raise Exception("BatchWalker: len() for incomplete result. (%d)" % self.fetch_status) + return self.length + + def tag_event_done(self, event): + del self.status_map[event.id] + + def tag_event_retry(self, event, retry_time): + self.status_map[event.id] = (EV_RETRY, retry_time) + + def tag_event_failed(self, event, reason): + self.status_map[event.id] = (EV_FAILED, reason) + + def iter_status(self): + for res in self.status_map.iteritems(): + yield res + class Consumer(skytools.DBScript): """Consumer base class. """ @@ -31,6 +111,7 @@ class Consumer(skytools.DBScript): self.reg_list = [] self.consumer_id = self.cf.get("pgq_consumer_id", self.job_name) self.pgq_queue_name = self.cf.get("pgq_queue_name") + self.pgq_lazy_fetch = self.cf.getint("pgq_lazy_fetch", 0) def attach(self): """Attach consumer to interesting queues.""" @@ -125,7 +206,7 @@ class Consumer(skytools.DBScript): def _launch_process_batch(self, db, batch_id, list): self.process_batch(db, batch_id, list) - def _load_batch_events(self, curs, batch_id, queue_name): + def _load_batch_events_old(self, curs, batch_id, queue_name): """Fetch all events for this batch.""" # load events @@ -141,6 +222,14 @@ class Consumer(skytools.DBScript): return list + def _load_batch_events(self, curs, batch_id, queue_name): + """Fetch all events for this batch.""" + + if self.pgq_lazy_fetch: + return _BatchWalker(curs, batch_id, queue_name, self.pgq_lazy_fetch) + else: + return self._load_batch_events_old(curs, batch_id, queue_name) + def _load_next_batch(self, curs, queue_name): """Allocate next batch. (internal)""" @@ -151,25 +240,32 @@ class Consumer(skytools.DBScript): def _finish_batch(self, curs, batch_id, list): """Tag events and notify that the batch is done.""" - retry = failed = 0 - for ev in list: - if ev.status == EV_FAILED: - self._tag_failed(curs, batch_id, ev) - failed += 1 - elif ev.status == EV_RETRY: - self._tag_retry(curs, batch_id, ev) - retry += 1 + if self.pgq_lazy_fetch: + for ev_id, stat in list.iter_status(): + if stat[0] == EV_RETRY: + self._tag_retry(curs, batch_id, ev_id, stat[1]) + elif stat[0] == EV_FAILED: + self._tag_failed(curs, batch_id, ev_id, stat[1]) + else: + for ev in list: + if ev.status == EV_FAILED: + self._tag_failed(curs, batch_id, ev.id, ev.fail_reason) + failed += 1 + elif ev.status == EV_RETRY: + self._tag_retry(curs, batch_id, ev.id, ev.retry_time) + retry += 1 + curs.execute("select pgq.finish_batch(%s)", [batch_id]) - def _tag_failed(self, curs, batch_id, ev): + def _tag_failed(self, curs, batch_id, ev_id, fail_reason): """Tag event as failed. (internal)""" curs.execute("select pgq.event_failed(%s, %s, %s)", - [batch_id, ev.id, ev.fail_reason]) + [batch_id, ev_id, fail_reason]) - def _tag_retry(self, cx, batch_id, ev): + def _tag_retry(self, cx, batch_id, ev_id, retry_time): """Tag event for retry. (internal)""" cx.execute("select pgq.event_retry(%s, %s, %s)", - [batch_id, ev.id, ev.retry_time]) + [batch_id, ev_id, retry_time]) def get_batch_info(self, batch_id): """Get info about batch. @@ -303,11 +399,6 @@ class SerialConsumer(Consumer): # actual work self.process_remote_batch(db, batch_id, event_list, dst_db) - # make sure no retry events - for ev in event_list: - if ev.status == EV_RETRY: - raise Exception("SerialConsumer must not use retry queue") - # finish work self.set_batch_done(curs) dst_db.commit() |