Diffstat (limited to 'python')
-rw-r--r--  python/pgq/baseconsumer.py      346
-rw-r--r--  python/pgq/cascade/consumer.py   24
-rw-r--r--  python/pgq/consumer.py          339
-rw-r--r--  python/pgq/event.py              42
-rw-r--r--  python/pgq/localconsumer.py       4
5 files changed, 407 insertions(+), 348 deletions(-)
diff --git a/python/pgq/baseconsumer.py b/python/pgq/baseconsumer.py
new file mode 100644
index 00000000..7648a160
--- /dev/null
+++ b/python/pgq/baseconsumer.py
@@ -0,0 +1,346 @@
+"""PgQ consumer framework for Python.
+
+todo:
+    - pgq.next_batch_details()
+    - tag_done() by default
+
+"""
+
+import sys, time, skytools
+
+from pgq.event import *
+
+__all__ = ['BaseConsumer', 'BaseBatchWalker']
+
+
+class BaseBatchWalker(object):
+    """Lazy iterator over batch events.
+
+    Events are loaded lazily using a server-side cursor.  The walker
+    itself is given as ev_list to process_batch().  It allows:
+
+    - one for-loop over events
+    - len() after that
+    """
+
+    def __init__(self, curs, batch_id, queue_name, fetch_size = 300, consumer_filter = None):
+        self.queue_name = queue_name
+        self.fetch_size = fetch_size
+        self.sql_cursor = "batch_walker"
+        self.curs = curs
+        self.length = 0
+        self.batch_id = batch_id
+        self.fetch_status = 0 # 0-not started, 1-in-progress, 2-done
+        self.consumer_filter = consumer_filter
+
+    def _make_event(self, queue_name, row):
+        return Event(queue_name, row)
+
+    def __iter__(self):
+        if self.fetch_status:
+            raise Exception("BatchWalker: double fetch? (%d)" % self.fetch_status)
+        self.fetch_status = 1
+
+        q = "select * from pgq.get_batch_cursor(%s, %s, %s, %s)"
+        self.curs.execute(q, [self.batch_id, self.sql_cursor, self.fetch_size, self.consumer_filter])
+        # this will return the first block of rows
+
+        q = "fetch %d from %s" % (self.fetch_size, self.sql_cursor)
+        while 1:
+            rows = self.curs.dictfetchall()
+            if not len(rows):
+                break
+
+            self.length += len(rows)
+            for row in rows:
+                ev = self._make_event(self.queue_name, row)
+                yield ev
+
+            # if fewer rows than requested, it was the final block
+            if len(rows) < self.fetch_size:
+                break
+
+            # request next block of rows
+            self.curs.execute(q)
+
+        self.curs.execute("close %s" % self.sql_cursor)
+
+        self.fetch_status = 2
+
+    def __len__(self):
+        return self.length
+
+
+class BaseConsumer(skytools.DBScript):
+    """Consumer base class.
+    Do not subclass directly; use pgq.Consumer or pgq.LocalConsumer instead.
+
+    Config template::
+
+        ## Parameters for pgq.Consumer ##
+
+        # queue name to read from
+        queue_name =
+
+        # override consumer name
+        #consumer_name = %(job_name)s
+
+        # filter out only events for specific tables
+        #table_filter = table1, table2
+
+        # whether to use cursor to fetch events (0 disables)
+        #pgq_lazy_fetch = 300
+
+        # whether to read from source side in autocommit mode
+        # not compatible with pgq_lazy_fetch
+        # the actual user script on top of pgq.Consumer must also support it
+        #pgq_autocommit = 0
+
+        # whether to wait for specified number of events,
+        # before assigning a batch (0 disables)
+        #pgq_batch_collect_events = 0
+
+        # whether to wait specified amount of time,
+        # before assigning a batch (postgres interval)
+        #pgq_batch_collect_interval =
+
+        # whether to stay behind queue top (postgres interval)
+        #pgq_keep_lag =
+
+        # how often (in seconds) to write keepalive stats for idle consumers
+        # these stats are used to detect that the consumer is still running
+        #keepalive_stats = 300
+    """
+
+    # by default, use cursor-based fetch
+    default_lazy_fetch = 300
+
+    # should reader connection be used in autocommit mode
+    pgq_autocommit = 0
+
+    # proper variables
+    consumer_name = None
+    queue_name = None
+
+    # compat variables
+    pgq_queue_name = None
+    pgq_consumer_id = None
+
+    pgq_lazy_fetch = None
+    pgq_min_count = None
+    pgq_min_interval = None
+    pgq_min_lag = None
+
+    batch_info = None
+
+    consumer_filter = None
+
+    keepalive_stats = None
+    # statistics: time spent waiting for events
+    idle_start = None
+
+    _batch_walker_class = BaseBatchWalker
+
+    def __init__(self, service_name, db_name, args):
+        """Initialize new consumer.
+
+        @param service_name: service_name for DBScript
+        @param db_name: name of database for get_database()
+        @param args: cmdline args for DBScript
+        """
+
+        skytools.DBScript.__init__(self, service_name, args)
+
+        self.db_name = db_name
+
+        # compat params
+        self.consumer_name = self.cf.get("pgq_consumer_id", '')
+        self.queue_name = self.cf.get("pgq_queue_name", '')
+
+        # proper params
+        if not self.consumer_name:
+            self.consumer_name = self.cf.get("consumer_name", self.job_name)
+        if not self.queue_name:
+            self.queue_name = self.cf.get("queue_name")
+
+        self.stat_batch_start = 0
+
+        # compat vars
+        self.pgq_queue_name = self.queue_name
+        self.consumer_id = self.consumer_name
+
+        # set default just once
+        self.pgq_autocommit = self.cf.getint("pgq_autocommit", self.pgq_autocommit)
+        if self.pgq_autocommit and self.pgq_lazy_fetch:
+            raise skytools.UsageError("pgq_autocommit is not compatible with pgq_lazy_fetch")
+        self.set_database_defaults(self.db_name, autocommit = self.pgq_autocommit)
+
+        self.idle_start = time.time()
+
+    def reload(self):
+        skytools.DBScript.reload(self)
+
+        self.pgq_lazy_fetch = self.cf.getint("pgq_lazy_fetch", self.default_lazy_fetch)
+
+        # set the following ones to None if not set
+        self.pgq_min_count = self.cf.getint("pgq_batch_collect_events", 0) or None
+        self.pgq_min_interval = self.cf.get("pgq_batch_collect_interval", '') or None
+        self.pgq_min_lag = self.cf.get("pgq_keep_lag", '') or None
+
+        # filter out specific tables only
+        tfilt = []
+        for t in self.cf.getlist('table_filter', ''):
+            tfilt.append(skytools.quote_literal(skytools.fq_name(t)))
+        if len(tfilt) > 0:
+            expr = "ev_extra1 in (%s)" % ','.join(tfilt)
+            self.consumer_filter = expr
+
+        self.keepalive_stats = self.cf.getint("keepalive_stats", 300)
+
+    def startup(self):
+        """Handle commands here.  __init__ does not have error logging."""
+        if self.options.register:
+            self.register_consumer()
+            sys.exit(0)
+        if self.options.unregister:
+            self.unregister_consumer()
+            sys.exit(0)
+        return skytools.DBScript.startup(self)
+
+    def init_optparse(self, parser = None):
+        p = skytools.DBScript.init_optparse(self, parser)
+        p.add_option('--register', action='store_true',
+                help = 'register consumer on queue')
+        p.add_option('--unregister', action='store_true',
+                help = 'unregister consumer from queue')
+        return p
+
+    def process_event(self, db, event):
+        """Process one event.
+
+        Should be overridden by user code.
+        """
+        raise Exception("needs to be implemented")
+
+    def process_batch(self, db, batch_id, event_list):
+        """Process all events in batch.
+
+        By default calls process_event for each.
+        Can be overridden by user code.
+        """
+        for ev in event_list:
+            self.process_event(db, ev)
+
+    def work(self):
+        """Do the work loop, once (internal).
+        Returns: true if it wants to be called again,
+        false if the script can sleep.
+        """
+
+        db = self.get_database(self.db_name)
+        curs = db.cursor()
+
+        self.stat_start()
+
+        # acquire batch
+        batch_id = self._load_next_batch(curs)
+        db.commit()
+        if batch_id is None:
+            return 0
+
+        # load events
+        ev_list = self._load_batch_events(curs, batch_id)
+        db.commit()
+
+        # process events
+        self._launch_process_batch(db, batch_id, ev_list)
+
+        # done
+        self._finish_batch(curs, batch_id, ev_list)
+        db.commit()
+        self.stat_end(len(ev_list))
+
+        return 1
+
+    def register_consumer(self):
+        self.log.info("Registering consumer on source queue")
+        db = self.get_database(self.db_name)
+        cx = db.cursor()
+        cx.execute("select pgq.register_consumer(%s, %s)",
+                [self.queue_name, self.consumer_name])
+        res = cx.fetchone()[0]
+        db.commit()
+
+        return res
+
+    def unregister_consumer(self):
+        self.log.info("Unregistering consumer from source queue")
+        db = self.get_database(self.db_name)
+        cx = db.cursor()
+        cx.execute("select pgq.unregister_consumer(%s, %s)",
+                [self.queue_name, self.consumer_name])
+        db.commit()
+
+    def _launch_process_batch(self, db, batch_id, list):
+        self.process_batch(db, batch_id, list)
+
+    def _load_batch_events_old(self, curs, batch_id):
+        """Fetch all events for this batch."""
+
+        # load events
+        sql = "select * from pgq.get_batch_events(%d)" % batch_id
+        if self.consumer_filter is not None:
+            sql += " where %s" % self.consumer_filter
+        curs.execute(sql)
+        rows = curs.dictfetchall()
+
+        # map them to python objects
+        ev_list = []
+        for r in rows:
+            ev = Event(self.queue_name, r)
+            ev_list.append(ev)
+
+        return ev_list
+
+    def _load_batch_events(self, curs, batch_id):
+        """Fetch all events for this batch."""
+
+        if self.pgq_lazy_fetch:
+            return self._batch_walker_class(curs, batch_id, self.queue_name, self.pgq_lazy_fetch, self.consumer_filter)
+        else:
+            return self._load_batch_events_old(curs, batch_id)
+
+    def _load_next_batch(self, curs):
+        """Allocate next batch. (internal)"""
+
+        q = """select * from pgq.next_batch_custom(%s, %s, %s, %s, %s)"""
+        curs.execute(q, [self.queue_name, self.consumer_name,
+                self.pgq_min_lag, self.pgq_min_count, self.pgq_min_interval])
+        inf = curs.fetchone().copy()
+        inf['tick_id'] = inf['cur_tick_id']
+        inf['batch_end'] = inf['cur_tick_time']
+        inf['batch_start'] = inf['prev_tick_time']
+        inf['seq_start'] = inf['prev_tick_event_seq']
+        inf['seq_end'] = inf['cur_tick_event_seq']
+        self.batch_info = inf
+        return self.batch_info['batch_id']
+
+    def _finish_batch(self, curs, batch_id, list):
+        """Tag events and notify that the batch is done."""
+
+        curs.execute("select pgq.finish_batch(%s)", [batch_id])
+
+    def stat_start(self):
+        t = time.time()
+        self.stat_batch_start = t
+        if self.stat_batch_start - self.idle_start > self.keepalive_stats:
+            self.stat_put('idle', round(self.stat_batch_start - self.idle_start,4))
+            self.idle_start = t
+
+    def stat_end(self, count):
+        t = time.time()
+        self.stat_put('count', count)
+        self.stat_put('duration', round(t - self.stat_batch_start,4))
+        if count > 0: # reset timer if we got some events
+            self.stat_put('idle', round(self.stat_batch_start - self.idle_start,4))
+            self.idle_start = t
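
The override points in the new base class are process_event() and process_batch(); batch acquisition, commits and stats are handled by work(). A minimal sketch of a consumer script built on top of this framework (the job name, config file and handler below are illustrative, not part of this commit):

    import sys

    from pgq.consumer import Consumer

    class EchoConsumer(Consumer):
        """Illustrative handler: log each event.

        With the new event semantics, events left untagged are
        implicitly finished when the batch is closed.
        """
        def process_event(self, db, ev):
            self.log.info("event %s: type=%s data=%s", ev.id, ev.type, ev.data)

    if __name__ == '__main__':
        # usage: python echo_consumer.py echo_consumer.ini [--register]
        script = EchoConsumer('echo_consumer', 'db', sys.argv[1:])
        script.start()
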
diff --git a/python/pgq/cascade/consumer.py b/python/pgq/cascade/consumer.py
index 3d20ba78..200de338 100644
--- a/python/pgq/cascade/consumer.py
+++ b/python/pgq/cascade/consumer.py
@@ -6,13 +6,13 @@ Does not maintain node, but is able to pause, resume and switch provider.

import sys, time

-from pgq.consumer import Consumer
+from pgq.baseconsumer import BaseConsumer

PDB = '_provider_db'

__all__ = ['CascadedConsumer']

-class CascadedConsumer(Consumer):
+class CascadedConsumer(BaseConsumer):
    """CascadedConsumer base class.

    Loads provider from target node, accepts pause/resume commands.
@@ -22,13 +22,13 @@ class CascadedConsumer(Consumer):

    def __init__(self, service_name, db_name, args):
        """Initialize new consumer.
-
+
        @param service_name: service_name for DBScript
        @param db_name: target database name for get_database()
        @param args: cmdline args for DBScript
        """

-        Consumer.__init__(self, service_name, PDB, args)
+        BaseConsumer.__init__(self, service_name, PDB, args)

        self.log.debug("__init__")
@@ -36,7 +36,7 @@ class CascadedConsumer(Consumer):
        self.provider_connstr = None

    def init_optparse(self, parser = None):
-        p = Consumer.init_optparse(self, parser)
+        p = BaseConsumer.init_optparse(self, parser)
        p.add_option("--provider", help = "provider location for --register")
        p.add_option("--rewind", action = "store_true",
                help = "change queue position according to destination")
@@ -51,7 +51,7 @@ class CascadedConsumer(Consumer):
        if self.options.reset:
            self.dst_reset()
            sys.exit(0)
-        return Consumer.startup(self)
+        return BaseConsumer.startup(self)

    def register_consumer(self, provider_loc = None):
        """Register consumer on source node first, then target node."""
@@ -75,7 +75,7 @@ class CascadedConsumer(Consumer):
            raise Exception('parent node not initialized?')

        # source queue
-        Consumer.register_consumer(self)
+        BaseConsumer.register_consumer(self)

        # fetch pos
        q = "select last_tick from pgq.get_consumer_info(%s, %s)"
@@ -107,7 +107,7 @@ class CascadedConsumer(Consumer):
        src_db = self.get_provider_db(state)

        # unregister on provider
-        Consumer.unregister_consumer(self)
+        BaseConsumer.unregister_consumer(self)

        # unregister on subscriber
        q = "select * from pgq_node.unregister_consumer(%s, %s)"
@@ -196,7 +196,7 @@ class CascadedConsumer(Consumer):
            raise Exception('provider_connstr not set')
        src_db = self.get_provider_db(self._consumer_state)

-        return Consumer.work(self)
+        return BaseConsumer.work(self)

    def refresh_state(self, dst_db, full_logic = True):
        """Fetch consumer state from target node.
@@ -255,7 +255,7 @@ class CascadedConsumer(Consumer):

    def process_remote_batch(self, src_db, tick_id, event_list, dst_db):
        """Per-batch callback.
-
+
        By default just calls process_remote_event() in loop."""
        src_curs = src_db.cursor()
        dst_curs = dst_db.cursor()
@@ -264,7 +264,7 @@ class CascadedConsumer(Consumer):

    def process_remote_event(self, src_curs, dst_curs, ev):
        """Per-event callback.
-
+
        By default ignores cascading events and gives error on others.
        Can be called from user handler to finish unprocessed events.
        """
@@ -290,5 +290,5 @@ class CascadedConsumer(Consumer):
        except:
            self.log.warning("Failure to call pgq_node.set_consumer_error()")
            self.reset()
-        Consumer.exception_hook(self, det, emsg)
+        BaseConsumer.exception_hook(self, det, emsg)
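
The same pattern holds for the cascaded variant, where process_remote_event() is the per-event hook and pgq-internal cascade events should be handed back to the base class. A hedged sketch (the audit_log table and class name are made up for illustration):

    from pgq.cascade.consumer import CascadedConsumer

    class AuditConsumer(CascadedConsumer):
        def process_remote_event(self, src_curs, dst_curs, ev):
            if ev.ev_type[:4] == 'pgq.':
                # let the base class handle cascading events
                CascadedConsumer.process_remote_event(self, src_curs, dst_curs, ev)
                return
            # apply ordinary events to the target database
            dst_curs.execute("insert into audit_log (ev_type, ev_data) values (%s, %s)",
                    [ev.ev_type, ev.ev_data])
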
diff --git a/python/pgq/consumer.py b/python/pgq/consumer.py
index aa60ffa8..19fbf5ef 100644
--- a/python/pgq/consumer.py
+++ b/python/pgq/consumer.py
@@ -1,20 +1,16 @@
"""PgQ consumer framework for Python.

-todo:
-    - pgq.next_batch_details()
-    - tag_done() by default
-
"""

-import sys, time, skytools
-
+from pgq.baseconsumer import BaseConsumer, BaseBatchWalker
from pgq.event import *

__all__ = ['Consumer']

-class _WalkerEvent(Event):
-    """Redirects status flags to BatchWalker.
+
+class RetriableWalkerEvent(RetriableEvent):
+    """Redirects status flags to RetriableBatchWalker.

    That way event data can be gc'd immediately and
    tag_done() events don't need to be remembered.
@@ -25,64 +21,22 @@ class _WalkerEvent(Event):
    def tag_done(self):
        self._walker.tag_event_done(self)

-    def tag_retry(self, retry_time = 60):
-        self._walker.tag_event_retry(self, retry_time)
    def get_status(self):
        return self._walker.get_status(self)

+    def tag_retry(self, retry_time = 60):
+        self._walker.tag_event_retry(self, retry_time)

-class _BatchWalker(object):
-    """Lazy iterator over batch events.
-
-    Events are loaded using cursor. It will be given
-    as ev_list to process_batch(). It allows:
-    - one for loop over events
-    - len() after that
+class RetriableBatchWalker(BaseBatchWalker):
+    """BatchWalker that returns RetriableEvents
    """

-    def __init__(self, curs, batch_id, queue_name, fetch_size = 300, consumer_filter = None):
-        self.queue_name = queue_name
-        self.fetch_size = fetch_size
-        self.sql_cursor = "batch_walker"
-        self.curs = curs
-        self.length = 0
-        self.status_map = {}
-        self.batch_id = batch_id
-        self.fetch_status = 0 # 0-not started, 1-in-progress, 2-done
-        self.consumer_filter = consumer_filter
-
-    def __iter__(self):
-        if self.fetch_status:
-            raise Exception("BatchWalker: double fetch? (%d)" % self.fetch_status)
-        self.fetch_status = 1
-
-        q = "select * from pgq.get_batch_cursor(%s, %s, %s, %s)"
-        self.curs.execute(q, [self.batch_id, self.sql_cursor, self.fetch_size, self.consumer_filter])
-        # this will return first batch of rows
-
-        q = "fetch %d from %s" % (self.fetch_size, self.sql_cursor)
-        while 1:
-            rows = self.curs.dictfetchall()
-            if not len(rows):
-                break
-
-            self.length += len(rows)
-            for row in rows:
-                ev = _WalkerEvent(self, self.queue_name, row)
-                yield ev
-
-            # if less rows than requested, it was final block
-            if len(rows) < self.fetch_size:
-                break
-
-            # request next block of rows
-            self.curs.execute(q)
-
-        self.curs.execute("close %s" % self.sql_cursor)
-
-        self.fetch_status = 2
-
-    def __len__(self):
-        return self.length
+    def __init__(self, curs, batch_id, queue_name, fetch_size = 300, consumer_filter = None):
+        super(RetriableBatchWalker, self).__init__(curs, batch_id, queue_name, fetch_size, consumer_filter)
+        self.status_map = {}
+
+    def _make_event(self, queue_name, row):
+        return RetriableWalkerEvent(self, queue_name, row)

    def tag_event_done(self, event):
        if event.id in self.status_map:
@@ -98,256 +52,13 @@ class _BatchWalker(object):
        for res in self.status_map.iteritems():
            yield res

-class Consumer(skytools.DBScript):
-    """Consumer base class.
-
-    Config template::
-
-        ## Parameters for pgq.Consumer ##
-
-        # queue name to read from
-        queue_name =
-
-        # override consumer name
-        #consumer_name = %(job_name)s
-
-        # filter out only events for specific tables
-        #table_filter = table1, table2
-
-        # whether to use cursor to fetch events (0 disables)
-        #pgq_lazy_fetch = 300
-
-        # whether to read from source size in autocommmit mode
-        # not compatible with pgq_lazy_fetch
-        # the actual user script on top of pgq.Consumer must also support it
-        #pgq_autocommit = 0
-
-        # whether to wait for specified number of events,
-        # before assigning a batch (0 disables)
-        #pgq_batch_collect_events = 0
-
-        # whether to wait specified amount of time,
-        # before assigning a batch (postgres interval)
-        #pgq_batch_collect_interval =
-
-        # whether to stay behind queue top (postgres interval)
-        #pgq_keep_lag =
-
-        # in how many seconds to write keepalive stats for idle consumers
-        # this stats is used for detecting that consumer is still running
-        #keepalive_stats = 300
+class Consumer(BaseConsumer):
+    """Normal consumer base class.
+    Can retry events.
    """

-    # by default, use cursor-based fetch
-    default_lazy_fetch = 300
-
-    # should reader connection be used in autocommit mode
-    pgq_autocommit = 0
-
-    # proper variables
-    consumer_name = None
-    queue_name = None
-
-    # compat variables
-    pgq_queue_name = None
-    pgq_consumer_id = None
-
-    pgq_lazy_fetch = None
-    pgq_min_count = None
-    pgq_min_interval = None
-    pgq_min_lag = None
-
-    batch_info = None
-
-    consumer_filter = None
-
-    keepalive_stats = None
-    # statistics: time spent waiting for events
-    idle_start = None
-
-    def __init__(self, service_name, db_name, args):
-        """Initialize new consumer.
-
-        @param service_name: service_name for DBScript
-        @param db_name: name of database for get_database()
-        @param args: cmdline args for DBScript
-        """
-
-        skytools.DBScript.__init__(self, service_name, args)
-
-        self.db_name = db_name
-
-        # compat params
-        self.consumer_name = self.cf.get("pgq_consumer_id", '')
-        self.queue_name = self.cf.get("pgq_queue_name", '')
-
-        # proper params
-        if not self.consumer_name:
-            self.consumer_name = self.cf.get("consumer_name", self.job_name)
-        if not self.queue_name:
-            self.queue_name = self.cf.get("queue_name")
-
-        self.stat_batch_start = 0
-
-        # compat vars
-        self.pgq_queue_name = self.queue_name
-        self.consumer_id = self.consumer_name
-
-        # set default just once
-        self.pgq_autocommit = self.cf.getint("pgq_autocommit", self.pgq_autocommit)
-        if self.pgq_autocommit and self.pgq_lazy_fetch:
-            raise skytools.UsageError("pgq_autocommit is not compatible with pgq_lazy_fetch")
-        self.set_database_defaults(self.db_name, autocommit = self.pgq_autocommit)
-
-        self.idle_start = time.time()
-
-    def reload(self):
-        skytools.DBScript.reload(self)
-
-        self.pgq_lazy_fetch = self.cf.getint("pgq_lazy_fetch", self.default_lazy_fetch)
-
-        # set following ones to None if not set
-        self.pgq_min_count = self.cf.getint("pgq_batch_collect_events", 0) or None
-        self.pgq_min_interval = self.cf.get("pgq_batch_collect_interval", '') or None
-        self.pgq_min_lag = self.cf.get("pgq_keep_lag", '') or None
-
-        # filter out specific tables only
-        tfilt = []
-        for t in self.cf.getlist('table_filter', ''):
-            tfilt.append(skytools.quote_literal(skytools.fq_name(t)))
-        if len(tfilt) > 0:
-            expr = "ev_extra1 in (%s)" % ','.join(tfilt)
-            self.consumer_filter = expr
-
-        self.keepalive_stats = self.cf.getint("keepalive_stats", 300)
-
-    def startup(self):
-        """Handle commands here. __init__ does not have error logging."""
-        if self.options.register:
-            self.register_consumer()
-            sys.exit(0)
-        if self.options.unregister:
-            self.unregister_consumer()
-            sys.exit(0)
-        return skytools.DBScript.startup(self)
-
-    def init_optparse(self, parser = None):
-        p = skytools.DBScript.init_optparse(self, parser)
-        p.add_option('--register', action='store_true',
-                help = 'register consumer on queue')
-        p.add_option('--unregister', action='store_true',
-                help = 'unregister consumer from queue')
-        return p
-
-    def process_event(self, db, event):
-        """Process one event.
-
-        Should be overridden by user code.
-        """
-        raise Exception("needs to be implemented")
-
-    def process_batch(self, db, batch_id, event_list):
-        """Process all events in batch.
-
-        By default calls process_event for each.
-        Can be overridden by user code.
-        """
-        for ev in event_list:
-            self.process_event(db, ev)
-
-    def work(self):
-        """Do the work loop, once (internal).
-        Returns: true if wants to be called again,
-        false if script can sleep.
-        """
-
-        db = self.get_database(self.db_name)
-        curs = db.cursor()
-
-        self.stat_start()
-
-        # acquire batch
-        batch_id = self._load_next_batch(curs)
-        db.commit()
-        if batch_id == None:
-            return 0
-
-        # load events
-        ev_list = self._load_batch_events(curs, batch_id)
-        db.commit()
-
-        # process events
-        self._launch_process_batch(db, batch_id, ev_list)
-
-        # done
-        self._finish_batch(curs, batch_id, ev_list)
-        db.commit()
-        self.stat_end(len(ev_list))
-
-        return 1
-
-    def register_consumer(self):
-        self.log.info("Registering consumer on source queue")
-        db = self.get_database(self.db_name)
-        cx = db.cursor()
-        cx.execute("select pgq.register_consumer(%s, %s)",
-                [self.queue_name, self.consumer_name])
-        res = cx.fetchone()[0]
-        db.commit()
-
-        return res
-
-    def unregister_consumer(self):
-        self.log.info("Unregistering consumer from source queue")
-        db = self.get_database(self.db_name)
-        cx = db.cursor()
-        cx.execute("select pgq.unregister_consumer(%s, %s)",
-                [self.queue_name, self.consumer_name])
-        db.commit()
-
-    def _launch_process_batch(self, db, batch_id, list):
-        self.process_batch(db, batch_id, list)
-
-    def _load_batch_events_old(self, curs, batch_id):
-        """Fetch all events for this batch."""
-
-        # load events
-        sql = "select * from pgq.get_batch_events(%d)" % batch_id
-        if self.consumer_filter is not None:
-            sql += " where %s" % self.consumer_filter
-        curs.execute(sql)
-        rows = curs.dictfetchall()
-
-        # map them to python objects
-        ev_list = []
-        for r in rows:
-            ev = Event(self.queue_name, r)
-            ev_list.append(ev)
-
-        return ev_list
-
-    def _load_batch_events(self, curs, batch_id):
-        """Fetch all events for this batch."""
-
-        if self.pgq_lazy_fetch:
-            return _BatchWalker(curs, batch_id, self.queue_name, self.pgq_lazy_fetch, self.consumer_filter)
-        else:
-            return self._load_batch_events_old(curs, batch_id)
-
-    def _load_next_batch(self, curs):
-        """Allocate next batch. (internal)"""
-
-        q = """select * from pgq.next_batch_custom(%s, %s, %s, %s, %s)"""
-        curs.execute(q, [self.queue_name, self.consumer_name,
-                self.pgq_min_lag, self.pgq_min_count, self.pgq_min_interval])
-        inf = curs.fetchone().copy()
-        inf['tick_id'] = inf['cur_tick_id']
-        inf['batch_end'] = inf['cur_tick_time']
-        inf['batch_start'] = inf['prev_tick_time']
-        inf['seq_start'] = inf['prev_tick_event_seq']
-        inf['seq_end'] = inf['cur_tick_event_seq']
-        self.batch_info = inf
-        return self.batch_info['batch_id']
+    _batch_walker_class = RetriableBatchWalker

    def _flush_retry(self, curs, batch_id, list):
        """Tag retry events."""
@@ -378,24 +89,10 @@ class Consumer(skytools.DBScript):
        self._flush_retry(curs, batch_id, list)

-        curs.execute("select pgq.finish_batch(%s)", [batch_id])
+        super(Consumer, self)._finish_batch(curs, batch_id, list)

    def _tag_retry(self, cx, batch_id, ev_id, retry_time):
        """Tag event for retry. (internal)"""
        cx.execute("select pgq.event_retry(%s, %s, %s)",
                [batch_id, ev_id, retry_time])

-    def stat_start(self):
-        t = time.time()
-        self.stat_batch_start = t
-        if self.stat_batch_start - self.idle_start > self.keepalive_stats:
-            self.stat_put('idle', round(self.stat_batch_start - self.idle_start,4))
-            self.idle_start = t
-
-    def stat_end(self, count):
-        t = time.time()
-        self.stat_put('count', count)
-        self.stat_put('duration', round(t - self.stat_batch_start,4))
-        if count > 0: # reset timer if we got some events
-            self.stat_put('idle', round(self.stat_batch_start - self.idle_start,4))
-            self.idle_start = t
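
After the split, pgq.Consumer is the variant that keeps retry support: RetriableBatchWalker records per-event status in status_map, and _finish_batch() flushes the retry tags before calling pgq.finish_batch(). In sketch form, assuming an apply step of your own (apply_event below is a hypothetical user method):

    from pgq.consumer import Consumer

    class RetryingConsumer(Consumer):
        def process_event(self, db, ev):
            try:
                self.apply_event(db, ev)    # hypothetical user method
            except IOError:
                # push the event back; it reappears in a later batch
                # after ~120 seconds instead of being finished as done
                ev.tag_retry(retry_time = 120)
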
diff --git a/python/pgq/event.py b/python/pgq/event.py
index 39735507..ddd550e4 100644
--- a/python/pgq/event.py
+++ b/python/pgq/event.py
@@ -2,7 +2,7 @@
"""PgQ event container.
"""

-__all__ = ['EV_UNTAGGED', 'EV_RETRY', 'EV_DONE', 'Event']
+__all__ = ['EV_UNTAGGED', 'EV_RETRY', 'EV_DONE', 'Event', 'RetriableEvent']

# Event status codes
EV_UNTAGGED = -1
@@ -35,32 +35,26 @@ _fldmap = {

class Event(object):
    """Event data for consumers.
-
-    Consumer is supposed to tag them after processing.
-    If not, events will stay in retry queue.
+
+    Will be removed from the queue by default.
    """

-    __slots__ = ('_event_row', '_status', 'retry_time',
+    __slots__ = ('_event_row', 'retry_time',
            'queue_name')

    def __init__(self, queue_name, row):
        self._event_row = row
-        self._status = EV_DONE
        self.retry_time = 60
        self.queue_name = queue_name

    def __getattr__(self, key):
        return self._event_row[_fldmap[key]]

+    # this would be better in RetriableEvent only, but unfortunately
+    # it has to stay defined here for compatibility
    def tag_done(self):
-        self._status = EV_DONE
+        pass

-    def tag_retry(self, retry_time = 60):
-        self._status = EV_RETRY
-        self.retry_time = retry_time
-
-    def get_status(self):
-        return self._status
-
    # be also dict-like
    def __getitem__(self, k): return self._event_row.__getitem__(k)
    def __contains__(self, k): return self._event_row.__contains__(k)
@@ -74,3 +68,25 @@ class Event(object):
    def __str__(self):
        return "<id=%d type=%s data=%s e1=%s e2=%s e3=%s e4=%s>" % (
            self.id, self.type, self.data, self.extra1, self.extra2, self.extra3, self.extra4)
+
+class RetriableEvent(Event):
+    """Event which can be retried.
+
+    The consumer is supposed to tag it after processing.
+    """
+
+    __slots__ = Event.__slots__ + ('_status', )
+
+    def __init__(self, queue_name, row):
+        super(RetriableEvent, self).__init__(queue_name, row)
+        self._status = EV_DONE
+
+    def tag_done(self):
+        self._status = EV_DONE
+
+    def get_status(self):
+        return self._status
+
+    def tag_retry(self, retry_time = 60):
+        self._status = EV_RETRY
+        self.retry_time = retry_time
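
The net behavior change in event.py, sketched (the queue name and row values are illustrative):

    from pgq.event import Event, RetriableEvent, EV_DONE, EV_RETRY

    row = {'ev_id': 1, 'ev_time': None, 'ev_txid': None, 'ev_retry': None,
           'ev_type': 'insert', 'ev_data': 'x', 'ev_extra1': None,
           'ev_extra2': None, 'ev_extra3': None, 'ev_extra4': None}

    plain = Event('myqueue', row)
    plain.tag_done()                         # no-op now, kept for compatibility

    retri = RetriableEvent('myqueue', row)
    assert retri.get_status() == EV_DONE     # done unless tagged otherwise
    retri.tag_retry(30)
    assert retri.get_status() == EV_RETRY    # re-inserted after ~30 seconds
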
diff --git a/python/pgq/localconsumer.py b/python/pgq/localconsumer.py
index 5ba74453..ed791ece 100644
--- a/python/pgq/localconsumer.py
+++ b/python/pgq/localconsumer.py
@@ -13,11 +13,11 @@
import sys
import os
import errno
import skytools

-import pgq
+from pgq.baseconsumer import BaseConsumer

__all__ = ['LocalConsumer']

-class LocalConsumer(pgq.Consumer):
+class LocalConsumer(BaseConsumer):
    """Consumer that applies batches sequentially in second database.

    Requirements: