diff options
| author | Marko Kreen | 2009-06-01 13:32:08 +0000 |
|---|---|---|
| committer | Marko Kreen | 2009-06-01 13:39:04 +0000 |
| commit | be82460b0778e282336e0a73eec5b069cd59bb53 (patch) | |
| tree | 59f36e8063218cebff6be4be8ab00b805d1e2c74 /python/pgq | |
| parent | 52fa34d45d19fe4843b77e7ff21a4f9e93832800 (diff) | |
python/pgq: relaxed event handling
The .tag_done() call is no longer required. Events are in the
'done' state by default.
In 2.x events were in the 'retry' state by default, which was a very bad
idea in retrospect. Changing them to 'untagged' and still requiring
tag_done() does not seem much better either. The original reasoning was to
detect and survive errors in scripts, but the result was only
confusion for everybody.
So instead of assuming that the script may be buggy, we now assume
that the script knows what it is doing. Only by explicit action
can events be tagged for retry.
Diffstat (limited to 'python/pgq')
| -rw-r--r-- | python/pgq/cascade/consumer.py | 4 | ||||
| -rw-r--r-- | python/pgq/cascade/worker.py | 8 | ||||
| -rw-r--r-- | python/pgq/consumer.py | 9 | ||||
| -rw-r--r-- | python/pgq/event.py | 4 | ||||
| -rw-r--r-- | python/pgq/remoteconsumer.py | 4 |
5 files changed, 13 insertions, 16 deletions
diff --git a/python/pgq/cascade/consumer.py b/python/pgq/cascade/consumer.py index a3b0fc7c..8676e5bd 100644 --- a/python/pgq/cascade/consumer.py +++ b/python/pgq/cascade/consumer.py @@ -102,8 +102,6 @@ class CascadedConsumer(Consumer): state = self._consumer_state if self.is_batch_done(state, self._batch_info): - for ev in event_list: - ev.tag_done() return dst_db = self.get_database(self.target_db) @@ -201,7 +199,7 @@ class CascadedConsumer(Consumer): """ if ev.ev_type[:4] == "pgq.": # ignore cascading events - ev.tag_done() + pass else: raise Exception('Unhandled event type in queue: %s' % ev.ev_type) diff --git a/python/pgq/cascade/worker.py b/python/pgq/cascade/worker.py index 5e17e358..c697156a 100644 --- a/python/pgq/cascade/worker.py +++ b/python/pgq/cascade/worker.py @@ -124,8 +124,6 @@ class CascadedWorker(CascadedConsumer): else: if st.process_events: self.process_remote_event(src_curs, dst_curs, ev) - else: - ev.tag_done() if ev.ev_id > max_id: max_id = ev.ev_id if st.local_wm_publish: @@ -163,6 +161,10 @@ class CascadedWorker(CascadedConsumer): def process_remote_event(self, src_curs, dst_curs, ev): """Handle cascading events. """ + + if ev.retry: + raise Exception('CascadedWorker must not get retry events') + # non cascade events send to CascadedConsumer to error out if ev.ev_type[:4] != 'pgq.': CascadedConsumer.process_remote_event(self, src_curs, dst_curs, ev) @@ -170,7 +172,6 @@ class CascadedWorker(CascadedConsumer): # ignore cascade events if not main worker if not self.main_worker: - ev.tag_done() return # check if for right queue @@ -199,7 +200,6 @@ class CascadedWorker(CascadedConsumer): dst_curs.execute(q, [self.pgq_queue_name, ev.ev_extra1, tick_id]) else: raise Exception("unknown cascade event: %s" % t) - ev.tag_done() def finish_remote_batch(self, src_db, dst_db, tick_id): """Worker-specific cleanup on target node. 
diff --git a/python/pgq/consumer.py b/python/pgq/consumer.py index 01f16abd..a23b882e 100644 --- a/python/pgq/consumer.py +++ b/python/pgq/consumer.py @@ -68,7 +68,6 @@ class _BatchWalker(object): self.length += len(rows) for row in rows: ev = _WalkerEvent(self, self.queue_name, row) - self.status_map[ev.id] = (EV_UNTAGGED, None) yield ev self.curs.execute("close %s" % self.sql_cursor) @@ -77,17 +76,19 @@ class _BatchWalker(object): def __len__(self): if self.fetch_status != 2: - raise Exception("BatchWalker: len() for incomplete result. (%d)" % self.fetch_status) + return -1 + #raise Exception("BatchWalker: len() for incomplete result. (%d)" % self.fetch_status) return self.length def tag_event_done(self, event): - del self.status_map[event.id] + if event.id in self.status_map: + del self.status_map[event.id] def tag_event_retry(self, event, retry_time): self.status_map[event.id] = (EV_RETRY, retry_time) def get_status(self, event): - return self.status_map[event.id][0] + return self.status_map.get(event.id, (EV_DONE, 0))[0] def iter_status(self): for res in self.status_map.iteritems(): diff --git a/python/pgq/event.py b/python/pgq/event.py index 93745035..39735507 100644 --- a/python/pgq/event.py +++ b/python/pgq/event.py @@ -19,6 +19,7 @@ _fldmap = { 'ev_extra2': 'ev_extra2', 'ev_extra3': 'ev_extra3', 'ev_extra4': 'ev_extra4', + 'ev_retry': 'ev_retry', 'id': 'ev_id', 'txid': 'ev_txid', @@ -29,6 +30,7 @@ _fldmap = { 'extra2': 'ev_extra2', 'extra3': 'ev_extra3', 'extra4': 'ev_extra4', + 'retry': 'ev_retry', } class Event(object): @@ -42,7 +44,7 @@ class Event(object): def __init__(self, queue_name, row): self._event_row = row - self._status = EV_UNTAGGED + self._status = EV_DONE self.retry_time = 60 self.queue_name = queue_name diff --git a/python/pgq/remoteconsumer.py b/python/pgq/remoteconsumer.py index f5c2ced5..cc8b73d9 100644 --- a/python/pgq/remoteconsumer.py +++ b/python/pgq/remoteconsumer.py @@ -29,8 +29,6 @@ class RemoteConsumer(Consumer): curs = 
dst_db.cursor() if self.is_last_batch(curs, batch_id): - for ev in event_list: - ev.tag_done() return self.process_remote_batch(db, batch_id, event_list, dst_db) @@ -103,8 +101,6 @@ class SerialConsumer(Consumer): # check if done if self.is_batch_done(curs): - for ev in event_list: - ev.tag_done() return # actual work |
