summaryrefslogtreecommitdiff
path: root/python/pgq
diff options
context:
space:
mode:
authorMarko Kreen2009-06-01 13:32:08 +0000
committerMarko Kreen2009-06-01 13:39:04 +0000
commitbe82460b0778e282336e0a73eec5b069cd59bb53 (patch)
tree59f36e8063218cebff6be4be8ab00b805d1e2c74 /python/pgq
parent52fa34d45d19fe4843b77e7ff21a4f9e93832800 (diff)
python/pgq: relaxed event handling
.tag_done() call is no more required. Events are by default in 'done' state. In 2.x events were in 'retry' state by default, which was very bad idea in retrospect. Changing them to 'untagged' and still requiring tag_done() does not seem too good either. Original reasoning was to detect and survive errors in scripts, but the result was only confusion to everybody. So instead of assuming that script may be buggy, now we assume that script knows what it does. And only by explicit action can they be tagged as retry.
Diffstat (limited to 'python/pgq')
-rw-r--r--python/pgq/cascade/consumer.py4
-rw-r--r--python/pgq/cascade/worker.py8
-rw-r--r--python/pgq/consumer.py9
-rw-r--r--python/pgq/event.py4
-rw-r--r--python/pgq/remoteconsumer.py4
5 files changed, 13 insertions, 16 deletions
diff --git a/python/pgq/cascade/consumer.py b/python/pgq/cascade/consumer.py
index a3b0fc7c..8676e5bd 100644
--- a/python/pgq/cascade/consumer.py
+++ b/python/pgq/cascade/consumer.py
@@ -102,8 +102,6 @@ class CascadedConsumer(Consumer):
state = self._consumer_state
if self.is_batch_done(state, self._batch_info):
- for ev in event_list:
- ev.tag_done()
return
dst_db = self.get_database(self.target_db)
@@ -201,7 +199,7 @@ class CascadedConsumer(Consumer):
"""
if ev.ev_type[:4] == "pgq.":
# ignore cascading events
- ev.tag_done()
+ pass
else:
raise Exception('Unhandled event type in queue: %s' % ev.ev_type)
diff --git a/python/pgq/cascade/worker.py b/python/pgq/cascade/worker.py
index 5e17e358..c697156a 100644
--- a/python/pgq/cascade/worker.py
+++ b/python/pgq/cascade/worker.py
@@ -124,8 +124,6 @@ class CascadedWorker(CascadedConsumer):
else:
if st.process_events:
self.process_remote_event(src_curs, dst_curs, ev)
- else:
- ev.tag_done()
if ev.ev_id > max_id:
max_id = ev.ev_id
if st.local_wm_publish:
@@ -163,6 +161,10 @@ class CascadedWorker(CascadedConsumer):
def process_remote_event(self, src_curs, dst_curs, ev):
"""Handle cascading events.
"""
+
+ if ev.retry:
+ raise Exception('CascadedWorker must not get retry events')
+
# non cascade events send to CascadedConsumer to error out
if ev.ev_type[:4] != 'pgq.':
CascadedConsumer.process_remote_event(self, src_curs, dst_curs, ev)
@@ -170,7 +172,6 @@ class CascadedWorker(CascadedConsumer):
# ignore cascade events if not main worker
if not self.main_worker:
- ev.tag_done()
return
# check if for right queue
@@ -199,7 +200,6 @@ class CascadedWorker(CascadedConsumer):
dst_curs.execute(q, [self.pgq_queue_name, ev.ev_extra1, tick_id])
else:
raise Exception("unknown cascade event: %s" % t)
- ev.tag_done()
def finish_remote_batch(self, src_db, dst_db, tick_id):
"""Worker-specific cleanup on target node.
diff --git a/python/pgq/consumer.py b/python/pgq/consumer.py
index 01f16abd..a23b882e 100644
--- a/python/pgq/consumer.py
+++ b/python/pgq/consumer.py
@@ -68,7 +68,6 @@ class _BatchWalker(object):
self.length += len(rows)
for row in rows:
ev = _WalkerEvent(self, self.queue_name, row)
- self.status_map[ev.id] = (EV_UNTAGGED, None)
yield ev
self.curs.execute("close %s" % self.sql_cursor)
@@ -77,17 +76,19 @@ class _BatchWalker(object):
def __len__(self):
if self.fetch_status != 2:
- raise Exception("BatchWalker: len() for incomplete result. (%d)" % self.fetch_status)
+ return -1
+ #raise Exception("BatchWalker: len() for incomplete result. (%d)" % self.fetch_status)
return self.length
def tag_event_done(self, event):
- del self.status_map[event.id]
+ if event.id in self.status_map:
+ del self.status_map[event.id]
def tag_event_retry(self, event, retry_time):
self.status_map[event.id] = (EV_RETRY, retry_time)
def get_status(self, event):
- return self.status_map[event.id][0]
+ return self.status_map.get(event.id, (EV_DONE, 0))[0]
def iter_status(self):
for res in self.status_map.iteritems():
diff --git a/python/pgq/event.py b/python/pgq/event.py
index 93745035..39735507 100644
--- a/python/pgq/event.py
+++ b/python/pgq/event.py
@@ -19,6 +19,7 @@ _fldmap = {
'ev_extra2': 'ev_extra2',
'ev_extra3': 'ev_extra3',
'ev_extra4': 'ev_extra4',
+ 'ev_retry': 'ev_retry',
'id': 'ev_id',
'txid': 'ev_txid',
@@ -29,6 +30,7 @@ _fldmap = {
'extra2': 'ev_extra2',
'extra3': 'ev_extra3',
'extra4': 'ev_extra4',
+ 'retry': 'ev_retry',
}
class Event(object):
@@ -42,7 +44,7 @@ class Event(object):
def __init__(self, queue_name, row):
self._event_row = row
- self._status = EV_UNTAGGED
+ self._status = EV_DONE
self.retry_time = 60
self.queue_name = queue_name
diff --git a/python/pgq/remoteconsumer.py b/python/pgq/remoteconsumer.py
index f5c2ced5..cc8b73d9 100644
--- a/python/pgq/remoteconsumer.py
+++ b/python/pgq/remoteconsumer.py
@@ -29,8 +29,6 @@ class RemoteConsumer(Consumer):
curs = dst_db.cursor()
if self.is_last_batch(curs, batch_id):
- for ev in event_list:
- ev.tag_done()
return
self.process_remote_batch(db, batch_id, event_list, dst_db)
@@ -103,8 +101,6 @@ class SerialConsumer(Consumer):
# check if done
if self.is_batch_done(curs):
- for ev in event_list:
- ev.tag_done()
return
# actual work