Diffstat (limited to 'python')
-rw-r--r--  python/pgq/baseconsumer.py      | 346
-rw-r--r--  python/pgq/cascade/consumer.py  |  24
-rw-r--r--  python/pgq/consumer.py          | 338
-rw-r--r--  python/pgq/event.py             |  42
-rw-r--r--  python/pgq/localconsumer.py     |   4
5 files changed, 406 insertions(+), 348 deletions(-)
diff --git a/python/pgq/baseconsumer.py b/python/pgq/baseconsumer.py
new file mode 100644
index 00000000..7648a160
--- /dev/null
+++ b/python/pgq/baseconsumer.py
@@ -0,0 +1,346 @@
+
+"""PgQ consumer framework for Python.
+
+todo:
+    - pgq.next_batch_details()
+    - tag_done() by default
+
+"""
+
+import sys, time, skytools
+
+from pgq.event import *
+
+__all__ = ['BaseConsumer', 'BaseBatchWalker']
+
+
+class BaseBatchWalker(object):
+    """Lazy iterator over batch events.
+
+    Events are loaded using a cursor.  It will be given
+    as ev_list to process_batch().  It allows:
+
+     - one for loop over events
+     - len() after that
+    """
+
+    def __init__(self, curs, batch_id, queue_name, fetch_size = 300, consumer_filter = None):
+        self.queue_name = queue_name
+        self.fetch_size = fetch_size
+        self.sql_cursor = "batch_walker"
+        self.curs = curs
+        self.length = 0
+        self.batch_id = batch_id
+        self.fetch_status = 0  # 0-not started, 1-in-progress, 2-done
+        self.consumer_filter = consumer_filter
+
+    def _make_event(self, queue_name, row):
+        return Event(queue_name, row)
+
+    def __iter__(self):
+        if self.fetch_status:
+            raise Exception("BatchWalker: double fetch? (%d)" % self.fetch_status)
+        self.fetch_status = 1
+
+        q = "select * from pgq.get_batch_cursor(%s, %s, %s, %s)"
+        self.curs.execute(q, [self.batch_id, self.sql_cursor, self.fetch_size, self.consumer_filter])
+        # this will return the first block of rows
+
+        q = "fetch %d from %s" % (self.fetch_size, self.sql_cursor)
+        while 1:
+            rows = self.curs.dictfetchall()
+            if not len(rows):
+                break
+
+            self.length += len(rows)
+            for row in rows:
+                ev = self._make_event(self.queue_name, row)
+                yield ev
+
+            # if fewer rows than requested, it was the final block
+            if len(rows) < self.fetch_size:
+                break
+
+            # request next block of rows
+            self.curs.execute(q)
+
+        self.curs.execute("close %s" % self.sql_cursor)
+
+        self.fetch_status = 2
+
+    def __len__(self):
+        return self.length
+
+
+class BaseConsumer(skytools.DBScript):
+    """Consumer base class.
+
+    Do not subclass directly (use pgq.Consumer or pgq.LocalConsumer instead).
+
+    Config template::
+
+        ## Parameters for pgq.Consumer ##
+
+        # queue name to read from
+        queue_name =
+
+        # override consumer name
+        #consumer_name = %(job_name)s
+
+        # filter out only events for specific tables
+        #table_filter = table1, table2
+
+        # whether to use cursor to fetch events (0 disables)
+        #pgq_lazy_fetch = 300
+
+        # whether to read from source side in autocommit mode;
+        # not compatible with pgq_lazy_fetch;
+        # the actual user script on top of pgq.Consumer must also support it
+        #pgq_autocommit = 0
+
+        # whether to wait for specified number of events
+        # before assigning a batch (0 disables)
+        #pgq_batch_collect_events = 0
+
+        # whether to wait for specified amount of time
+        # before assigning a batch (postgres interval)
+        #pgq_batch_collect_interval =
+
+        # how far to stay behind queue top (postgres interval)
+        #pgq_keep_lag =
+
+        # how often (in seconds) to write keepalive stats for idle consumers;
+        # these stats are used to detect that the consumer is still running
+        #keepalive_stats = 300
+    """
+
+    # by default, use cursor-based fetch
+    default_lazy_fetch = 300
+
+    # should reader connection be used in autocommit mode
+    pgq_autocommit = 0
+
+    # proper variables
+    consumer_name = None
+    queue_name = None
+
+    # compat variables
+    pgq_queue_name = None
+    pgq_consumer_id = None
+
+    pgq_lazy_fetch = None
+    pgq_min_count = None
+    pgq_min_interval = None
+    pgq_min_lag = None
+
+    batch_info = None
+
+    consumer_filter = None
+
+    keepalive_stats = None
+    # statistics: time spent waiting for events
+    idle_start = None
+
+    _batch_walker_class = BaseBatchWalker
+
+    def __init__(self, service_name, db_name, args):
+        """Initialize new consumer.
+
+        @param service_name: service_name for DBScript
+        @param db_name: name of database for get_database()
+        @param args: cmdline args for DBScript
+        """
+
+        skytools.DBScript.__init__(self, service_name, args)
+
+        self.db_name = db_name
+
+        # compat params
+        self.consumer_name = self.cf.get("pgq_consumer_id", '')
+        self.queue_name = self.cf.get("pgq_queue_name", '')
+
+        # proper params
+        if not self.consumer_name:
+            self.consumer_name = self.cf.get("consumer_name", self.job_name)
+        if not self.queue_name:
+            self.queue_name = self.cf.get("queue_name")
+
+        self.stat_batch_start = 0
+
+        # compat vars
+        self.pgq_queue_name = self.queue_name
+        self.consumer_id = self.consumer_name
+
+        # set default just once
+        self.pgq_autocommit = self.cf.getint("pgq_autocommit", self.pgq_autocommit)
+        if self.pgq_autocommit and self.pgq_lazy_fetch:
+            raise skytools.UsageError("pgq_autocommit is not compatible with pgq_lazy_fetch")
+        self.set_database_defaults(self.db_name, autocommit = self.pgq_autocommit)
+
+        self.idle_start = time.time()
+
+    def reload(self):
+        skytools.DBScript.reload(self)
+
+        self.pgq_lazy_fetch = self.cf.getint("pgq_lazy_fetch", self.default_lazy_fetch)
+
+        # set the following ones to None if not set
+        self.pgq_min_count = self.cf.getint("pgq_batch_collect_events", 0) or None
+        self.pgq_min_interval = self.cf.get("pgq_batch_collect_interval", '') or None
+        self.pgq_min_lag = self.cf.get("pgq_keep_lag", '') or None
+
+        # filter out specific tables only
+        tfilt = []
+        for t in self.cf.getlist('table_filter', ''):
+            tfilt.append(skytools.quote_literal(skytools.fq_name(t)))
+        if len(tfilt) > 0:
+            expr = "ev_extra1 in (%s)" % ','.join(tfilt)
+            self.consumer_filter = expr
+
+        self.keepalive_stats = self.cf.getint("keepalive_stats", 300)
+
+    def startup(self):
+        """Handle commands here.  __init__ does not have error logging."""
+        if self.options.register:
+            self.register_consumer()
+            sys.exit(0)
+        if self.options.unregister:
+            self.unregister_consumer()
+            sys.exit(0)
+        return skytools.DBScript.startup(self)
+
+    def init_optparse(self, parser = None):
+        p = skytools.DBScript.init_optparse(self, parser)
+        p.add_option('--register', action='store_true',
+                     help = 'register consumer on queue')
+        p.add_option('--unregister', action='store_true',
+                     help = 'unregister consumer from queue')
+        return p
+
+    def process_event(self, db, event):
+        """Process one event.
+
+        Should be overridden by user code.
+        """
+        raise Exception("needs to be implemented")
+
+    def process_batch(self, db, batch_id, event_list):
+        """Process all events in batch.
+
+        By default calls process_event for each.
+        Can be overridden by user code.
+        """
+        for ev in event_list:
+            self.process_event(db, ev)
+
+    def work(self):
+        """Do the work loop, once (internal).
+
+        Returns: true if it wants to be called again,
+        false if the script can sleep.
+        """
+
+        db = self.get_database(self.db_name)
+        curs = db.cursor()
+
+        self.stat_start()
+
+        # acquire batch
+        batch_id = self._load_next_batch(curs)
+        db.commit()
+        if batch_id is None:
+            return 0
+
+        # load events
+        ev_list = self._load_batch_events(curs, batch_id)
+        db.commit()
+
+        # process events
+        self._launch_process_batch(db, batch_id, ev_list)
+
+        # done
+        self._finish_batch(curs, batch_id, ev_list)
+        db.commit()
+        self.stat_end(len(ev_list))
+
+        return 1
+
+    def register_consumer(self):
+        self.log.info("Registering consumer on source queue")
+        db = self.get_database(self.db_name)
+        cx = db.cursor()
+        cx.execute("select pgq.register_consumer(%s, %s)",
+                   [self.queue_name, self.consumer_name])
+        res = cx.fetchone()[0]
+        db.commit()
+
+        return res
+
+    def unregister_consumer(self):
+        self.log.info("Unregistering consumer from source queue")
+        db = self.get_database(self.db_name)
+        cx = db.cursor()
+        cx.execute("select pgq.unregister_consumer(%s, %s)",
+                   [self.queue_name, self.consumer_name])
+        db.commit()
+
+    def _launch_process_batch(self, db, batch_id, ev_list):
+        self.process_batch(db, batch_id, ev_list)
+
+    def _load_batch_events_old(self, curs, batch_id):
+        """Fetch all events for this batch."""
+
+        # load events
+        sql = "select * from pgq.get_batch_events(%d)" % batch_id
+        if self.consumer_filter is not None:
+            sql += " where %s" % self.consumer_filter
+        curs.execute(sql)
+        rows = curs.dictfetchall()
+
+        # map them to python objects
+        ev_list = []
+        for r in rows:
+            ev = Event(self.queue_name, r)
+            ev_list.append(ev)
+
+        return ev_list
+
+    def _load_batch_events(self, curs, batch_id):
+        """Fetch all events for this batch."""
+
+        if self.pgq_lazy_fetch:
+            return self._batch_walker_class(curs, batch_id, self.queue_name, self.pgq_lazy_fetch, self.consumer_filter)
+        else:
+            return self._load_batch_events_old(curs, batch_id)
+
+    def _load_next_batch(self, curs):
(internal)""" + + q = """select * from pgq.next_batch_custom(%s, %s, %s, %s, %s)""" + curs.execute(q, [self.queue_name, self.consumer_name, + self.pgq_min_lag, self.pgq_min_count, self.pgq_min_interval]) + inf = curs.fetchone().copy() + inf['tick_id'] = inf['cur_tick_id'] + inf['batch_end'] = inf['cur_tick_time'] + inf['batch_start'] = inf['prev_tick_time'] + inf['seq_start'] = inf['prev_tick_event_seq'] + inf['seq_end'] = inf['cur_tick_event_seq'] + self.batch_info = inf + return self.batch_info['batch_id'] + + def _finish_batch(self, curs, batch_id, list): + """Tag events and notify that the batch is done.""" + + curs.execute("select pgq.finish_batch(%s)", [batch_id]) + + def stat_start(self): + t = time.time() + self.stat_batch_start = t + if self.stat_batch_start - self.idle_start > self.keepalive_stats: + self.stat_put('idle', round(self.stat_batch_start - self.idle_start,4)) + self.idle_start = t + + def stat_end(self, count): + t = time.time() + self.stat_put('count', count) + self.stat_put('duration', round(t - self.stat_batch_start,4)) + if count > 0: # reset timer if we got some events + self.stat_put('idle', round(self.stat_batch_start - self.idle_start,4)) + self.idle_start = t diff --git a/python/pgq/cascade/consumer.py b/python/pgq/cascade/consumer.py index 3d20ba78..200de338 100644 --- a/python/pgq/cascade/consumer.py +++ b/python/pgq/cascade/consumer.py @@ -6,13 +6,13 @@ Does not maintain node, but is able to pause, resume and switch provider. import sys, time -from pgq.consumer import Consumer +from pgq.baseconsumer import BaseConsumer PDB = '_provider_db' __all__ = ['CascadedConsumer'] -class CascadedConsumer(Consumer): +class CascadedConsumer(BaseConsumer): """CascadedConsumer base class. Loads provider from target node, accepts pause/resume commands. @@ -22,13 +22,13 @@ class CascadedConsumer(Consumer): def __init__(self, service_name, db_name, args): """Initialize new consumer. 
         """Initialize new consumer.
-
+
         @param service_name: service_name for DBScript
         @param db_name: target database name for get_database()
         @param args: cmdline args for DBScript
         """
-        Consumer.__init__(self, service_name, PDB, args)
+        BaseConsumer.__init__(self, service_name, PDB, args)
 
         self.log.debug("__init__")
 
@@ -36,7 +36,7 @@ class CascadedConsumer(Consumer):
         self.provider_connstr = None
 
     def init_optparse(self, parser = None):
-        p = Consumer.init_optparse(self, parser)
+        p = BaseConsumer.init_optparse(self, parser)
         p.add_option("--provider", help = "provider location for --register")
         p.add_option("--rewind", action = "store_true",
                      help = "change queue position according to destination")
@@ -51,7 +51,7 @@ class CascadedConsumer(Consumer):
         if self.options.reset:
             self.dst_reset()
             sys.exit(0)
-        return Consumer.startup(self)
+        return BaseConsumer.startup(self)
 
     def register_consumer(self, provider_loc = None):
         """Register consumer on source node first, then target node."""
@@ -75,7 +75,7 @@ class CascadedConsumer(Consumer):
             raise Exception('parent node not initialized?')
 
         # source queue
-        Consumer.register_consumer(self)
+        BaseConsumer.register_consumer(self)
 
         # fetch pos
         q = "select last_tick from pgq.get_consumer_info(%s, %s)"
@@ -107,7 +107,7 @@ class CascadedConsumer(Consumer):
         src_db = self.get_provider_db(state)
 
         # unregister on provider
-        Consumer.unregister_consumer(self)
+        BaseConsumer.unregister_consumer(self)
 
         # unregister on subscriber
         q = "select * from pgq_node.unregister_consumer(%s, %s)"
@@ -196,7 +196,7 @@ class CascadedConsumer(Consumer):
             raise Exception('provider_connstr not set')
 
         src_db = self.get_provider_db(self._consumer_state)
-        return Consumer.work(self)
+        return BaseConsumer.work(self)
 
     def refresh_state(self, dst_db, full_logic = True):
         """Fetch consumer state from target node.
@@ -255,7 +255,7 @@ class CascadedConsumer(Consumer):
 
     def process_remote_batch(self, src_db, tick_id, event_list, dst_db):
         """Per-batch callback.
-
+
         By default just calls process_remote_event() in loop."""
         src_curs = src_db.cursor()
         dst_curs = dst_db.cursor()
@@ -264,7 +264,7 @@ class CascadedConsumer(Consumer):
 
     def process_remote_event(self, src_curs, dst_curs, ev):
         """Per-event callback.
-
+
         By default ignores cascading events and gives error on others.
         Can be called from user handler to finish unprocessed events.
         """
@@ -290,5 +290,5 @@ class CascadedConsumer(Consumer):
             except:
                 self.log.warning("Failure to call pgq_node.set_consumer_error()")
             self.reset()
-        Consumer.exception_hook(self, det, emsg)
+        BaseConsumer.exception_hook(self, det, emsg)
 
diff --git a/python/pgq/consumer.py b/python/pgq/consumer.py
index aa60ffa8..19fbf5ef 100644
--- a/python/pgq/consumer.py
+++ b/python/pgq/consumer.py
@@ -1,20 +1,16 @@
 """PgQ consumer framework for Python.
 
-todo:
-    - pgq.next_batch_details()
-    - tag_done() by default
-
 """
 
-import sys, time, skytools
-
+from pgq.baseconsumer import BaseConsumer, BaseBatchWalker
 from pgq.event import *
 
 __all__ = ['Consumer']
 
-class _WalkerEvent(Event):
-    """Redirects status flags to BatchWalker.
+
+class RetriableWalkerEvent(RetriableEvent):
+    """Redirects status flags to RetriableBatchWalker.
 
     That way event data can be gc'd immediately and
     tag_done() events don't need to be remembered.
@@ -25,64 +21,21 @@ class _WalkerEvent(Event):
     def tag_done(self):
         self._walker.tag_event_done(self)
 
-    def tag_retry(self, retry_time = 60):
-        self._walker.tag_event_retry(self, retry_time)
 
     def get_status(self):
-        self._walker.get_status(self)
+        return self._walker.get_status(self)
 
+    def tag_retry(self, retry_time = 60):
+        self._walker.tag_event_retry(self, retry_time)
 
-class _BatchWalker(object):
-    """Lazy iterator over batch events.
-
-    Events are loaded using cursor. It will be given
-    as ev_list to process_batch(). It allows:
-     - one for loop over events
-     - len() after that
+
+class RetriableBatchWalker(BaseBatchWalker):
+    """BatchWalker that returns RetriableEvents
     """
 
-    def __init__(self, curs, batch_id, queue_name, fetch_size = 300, consumer_filter = None):
-        self.queue_name = queue_name
-        self.fetch_size = fetch_size
-        self.sql_cursor = "batch_walker"
-        self.curs = curs
-        self.length = 0
-        self.status_map = {}
-        self.batch_id = batch_id
-        self.fetch_status = 0 # 0-not started, 1-in-progress, 2-done
-        self.consumer_filter = consumer_filter
-
-    def __iter__(self):
-        if self.fetch_status:
-            raise Exception("BatchWalker: double fetch? (%d)" % self.fetch_status)
-        self.fetch_status = 1
-
-        q = "select * from pgq.get_batch_cursor(%s, %s, %s, %s)"
-        self.curs.execute(q, [self.batch_id, self.sql_cursor, self.fetch_size, self.consumer_filter])
-        # this will return first batch of rows
-
-        q = "fetch %d from %s" % (self.fetch_size, self.sql_cursor)
-        while 1:
-            rows = self.curs.dictfetchall()
-            if not len(rows):
-                break
-
-            self.length += len(rows)
-            for row in rows:
-                ev = _WalkerEvent(self, self.queue_name, row)
-                yield ev
-
-            # if less rows than requested, it was final block
-            if len(rows) < self.fetch_size:
-                break
-
-            # request next block of rows
-            self.curs.execute(q)
-
-        self.curs.execute("close %s" % self.sql_cursor)
-
-        self.fetch_status = 2
-
-    def __len__(self):
-        return self.length
+    def __init__(self, curs, batch_id, queue_name, fetch_size = 300, consumer_filter = None):
+        super(RetriableBatchWalker, self).__init__(curs, batch_id, queue_name, fetch_size, consumer_filter)
+        self.status_map = {}
+
+    def _make_event(self, queue_name, row):
+        return RetriableWalkerEvent(self, queue_name, row)
 
     def tag_event_done(self, event):
         if event.id in self.status_map:
@@ -98,256 +51,13 @@
         for res in self.status_map.iteritems():
             yield res
 
-class Consumer(skytools.DBScript):
-    """Consumer base class.
-
-    Config template::
-
-        ## Parameters for pgq.Consumer ##
-
-        # queue name to read from
-        queue_name =
-
-        # override consumer name
-        #consumer_name = %(job_name)s
-
-        # filter out only events for specific tables
-        #table_filter = table1, table2
-
-        # whether to use cursor to fetch events (0 disables)
-        #pgq_lazy_fetch = 300
-
-        # whether to read from source size in autocommmit mode
-        # not compatible with pgq_lazy_fetch
-        # the actual user script on top of pgq.Consumer must also support it
-        #pgq_autocommit = 0
-
-        # whether to wait for specified number of events,
-        # before assigning a batch (0 disables)
-        #pgq_batch_collect_events = 0
-
-        # whether to wait specified amount of time,
-        # before assigning a batch (postgres interval)
-        #pgq_batch_collect_interval =
-
-        # whether to stay behind queue top (postgres interval)
-        #pgq_keep_lag =
-
-        # in how many seconds to write keepalive stats for idle consumers
-        # this stats is used for detecting that consumer is still running
-        #keepalive_stats = 300
+
+class Consumer(BaseConsumer):
+    """Normal consumer base class.
+
+    Can retry events
     """
 
-    # by default, use cursor-based fetch
-    default_lazy_fetch = 300
-
-    # should reader connection be used in autocommit mode
-    pgq_autocommit = 0
-
-    # proper variables
-    consumer_name = None
-    queue_name = None
-
-    # compat variables
-    pgq_queue_name = None
-    pgq_consumer_id = None
-
-    pgq_lazy_fetch = None
-    pgq_min_count = None
-    pgq_min_interval = None
-    pgq_min_lag = None
-
-    batch_info = None
-
-    consumer_filter = None
-
-    keepalive_stats = None
-    # statistics: time spent waiting for events
-    idle_start = None
-
-    def __init__(self, service_name, db_name, args):
-        """Initialize new consumer.
-
-        @param service_name: service_name for DBScript
-        @param db_name: name of database for get_database()
-        @param args: cmdline args for DBScript
-        """
-
-        skytools.DBScript.__init__(self, service_name, args)
-
-        self.db_name = db_name
-
-        # compat params
-        self.consumer_name = self.cf.get("pgq_consumer_id", '')
-        self.queue_name = self.cf.get("pgq_queue_name", '')
-
-        # proper params
-        if not self.consumer_name:
-            self.consumer_name = self.cf.get("consumer_name", self.job_name)
-        if not self.queue_name:
-            self.queue_name = self.cf.get("queue_name")
-
-        self.stat_batch_start = 0
-
-        # compat vars
-        self.pgq_queue_name = self.queue_name
-        self.consumer_id = self.consumer_name
-
-        # set default just once
-        self.pgq_autocommit = self.cf.getint("pgq_autocommit", self.pgq_autocommit)
-        if self.pgq_autocommit and self.pgq_lazy_fetch:
-            raise skytools.UsageError("pgq_autocommit is not compatible with pgq_lazy_fetch")
-        self.set_database_defaults(self.db_name, autocommit = self.pgq_autocommit)
-
-        self.idle_start = time.time()
-
-    def reload(self):
-        skytools.DBScript.reload(self)
-
-        self.pgq_lazy_fetch = self.cf.getint("pgq_lazy_fetch", self.default_lazy_fetch)
-
-        # set following ones to None if not set
-        self.pgq_min_count = self.cf.getint("pgq_batch_collect_events", 0) or None
-        self.pgq_min_interval = self.cf.get("pgq_batch_collect_interval", '') or None
-        self.pgq_min_lag = self.cf.get("pgq_keep_lag", '') or None
-
-        # filter out specific tables only
-        tfilt = []
-        for t in self.cf.getlist('table_filter', ''):
-            tfilt.append(skytools.quote_literal(skytools.fq_name(t)))
-        if len(tfilt) > 0:
-            expr = "ev_extra1 in (%s)" % ','.join(tfilt)
-            self.consumer_filter = expr
-
-        self.keepalive_stats = self.cf.getint("keepalive_stats", 300)
-
-    def startup(self):
-        """Handle commands here. __init__ does not have error logging."""
-        if self.options.register:
-            self.register_consumer()
-            sys.exit(0)
-        if self.options.unregister:
-            self.unregister_consumer()
-            sys.exit(0)
-        return skytools.DBScript.startup(self)
-
-    def init_optparse(self, parser = None):
-        p = skytools.DBScript.init_optparse(self, parser)
-        p.add_option('--register', action='store_true',
-                     help = 'register consumer on queue')
-        p.add_option('--unregister', action='store_true',
-                     help = 'unregister consumer from queue')
-        return p
-
-    def process_event(self, db, event):
-        """Process one event.
-
-        Should be overridden by user code.
-        """
-        raise Exception("needs to be implemented")
-
-    def process_batch(self, db, batch_id, event_list):
-        """Process all events in batch.
-
-        By default calls process_event for each.
-        Can be overridden by user code.
-        """
-        for ev in event_list:
-            self.process_event(db, ev)
-
-    def work(self):
-        """Do the work loop, once (internal).
-        Returns: true if wants to be called again,
-        false if script can sleep.
- """ - - db = self.get_database(self.db_name) - curs = db.cursor() - - self.stat_start() - - # acquire batch - batch_id = self._load_next_batch(curs) - db.commit() - if batch_id == None: - return 0 - - # load events - ev_list = self._load_batch_events(curs, batch_id) - db.commit() - - # process events - self._launch_process_batch(db, batch_id, ev_list) - - # done - self._finish_batch(curs, batch_id, ev_list) - db.commit() - self.stat_end(len(ev_list)) - - return 1 - - def register_consumer(self): - self.log.info("Registering consumer on source queue") - db = self.get_database(self.db_name) - cx = db.cursor() - cx.execute("select pgq.register_consumer(%s, %s)", - [self.queue_name, self.consumer_name]) - res = cx.fetchone()[0] - db.commit() - - return res - - def unregister_consumer(self): - self.log.info("Unregistering consumer from source queue") - db = self.get_database(self.db_name) - cx = db.cursor() - cx.execute("select pgq.unregister_consumer(%s, %s)", - [self.queue_name, self.consumer_name]) - db.commit() - - def _launch_process_batch(self, db, batch_id, list): - self.process_batch(db, batch_id, list) - - def _load_batch_events_old(self, curs, batch_id): - """Fetch all events for this batch.""" - - # load events - sql = "select * from pgq.get_batch_events(%d)" % batch_id - if self.consumer_filter is not None: - sql += " where %s" % self.consumer_filter - curs.execute(sql) - rows = curs.dictfetchall() - - # map them to python objects - ev_list = [] - for r in rows: - ev = Event(self.queue_name, r) - ev_list.append(ev) - - return ev_list - - def _load_batch_events(self, curs, batch_id): - """Fetch all events for this batch.""" - - if self.pgq_lazy_fetch: - return _BatchWalker(curs, batch_id, self.queue_name, self.pgq_lazy_fetch, self.consumer_filter) - else: - return self._load_batch_events_old(curs, batch_id) - - def _load_next_batch(self, curs): - """Allocate next batch. (internal)""" - - q = """select * from pgq.next_batch_custom(%s, %s, %s, %s, %s)""" - curs.execute(q, [self.queue_name, self.consumer_name, - self.pgq_min_lag, self.pgq_min_count, self.pgq_min_interval]) - inf = curs.fetchone().copy() - inf['tick_id'] = inf['cur_tick_id'] - inf['batch_end'] = inf['cur_tick_time'] - inf['batch_start'] = inf['prev_tick_time'] - inf['seq_start'] = inf['prev_tick_event_seq'] - inf['seq_end'] = inf['cur_tick_event_seq'] - self.batch_info = inf - return self.batch_info['batch_id'] + _batch_walker_class = RetriableBatchWalker def _flush_retry(self, curs, batch_id, list): """Tag retry events.""" @@ -378,24 +88,10 @@ class Consumer(skytools.DBScript): self._flush_retry(curs, batch_id, list) - curs.execute("select pgq.finish_batch(%s)", [batch_id]) + super(Consumer, self)._finish_batch(self, curs, batch_id, list) def _tag_retry(self, cx, batch_id, ev_id, retry_time): """Tag event for retry. 
(internal)""" cx.execute("select pgq.event_retry(%s, %s, %s)", [batch_id, ev_id, retry_time]) - def stat_start(self): - t = time.time() - self.stat_batch_start = t - if self.stat_batch_start - self.idle_start > self.keepalive_stats: - self.stat_put('idle', round(self.stat_batch_start - self.idle_start,4)) - self.idle_start = t - - def stat_end(self, count): - t = time.time() - self.stat_put('count', count) - self.stat_put('duration', round(t - self.stat_batch_start,4)) - if count > 0: # reset timer if we got some events - self.stat_put('idle', round(self.stat_batch_start - self.idle_start,4)) - self.idle_start = t diff --git a/python/pgq/event.py b/python/pgq/event.py index 39735507..ddd550e4 100644 --- a/python/pgq/event.py +++ b/python/pgq/event.py @@ -2,7 +2,7 @@ """PgQ event container. """ -__all__ = ['EV_UNTAGGED', 'EV_RETRY', 'EV_DONE', 'Event'] +__all__ = ['EV_UNTAGGED', 'EV_RETRY', 'EV_DONE', 'Event', 'RetriableEvent'] # Event status codes EV_UNTAGGED = -1 @@ -35,32 +35,26 @@ _fldmap = { class Event(object): """Event data for consumers. - - Consumer is supposed to tag them after processing. - If not, events will stay in retry queue. + + Will be removed from the queue by default. """ - __slots__ = ('_event_row', '_status', 'retry_time', + __slots__ = ('_event_row', 'retry_time', 'queue_name') def __init__(self, queue_name, row): self._event_row = row - self._status = EV_DONE self.retry_time = 60 self.queue_name = queue_name def __getattr__(self, key): return self._event_row[_fldmap[key]] + # would be better in RetriableEvent only since we don't care but + # unfortunatelly it needs to be defined here due to compatibility concerns def tag_done(self): + pass self._status = EV_DONE - def tag_retry(self, retry_time = 60): - self._status = EV_RETRY - self.retry_time = retry_time - - def get_status(self): - return self._status - # be also dict-like def __getitem__(self, k): return self._event_row.__getitem__(k) def __contains__(self, k): return self._event_row.__contains__(k) @@ -74,3 +68,25 @@ class Event(object): def __str__(self): return "<id=%d type=%s data=%s e1=%s e2=%s e3=%s e4=%s>" % ( self.id, self.type, self.data, self.extra1, self.extra2, self.extra3, self.extra4) + +class RetriableEvent(Event): + """Event which can be retryed + + Consumer is supposed to tag them after processing. + """ + + __slots__ = Event.__slots__ + ('_status', ) + + def __init__(self, queue_name, row): + super(RetriableEvent, self).__init__(self, queue_name, row) + self._status = EV_DONE + + def tag_done(self): + self._status = EV_DONE + + def get_status(self): + return self._status + + def tag_retry(self, retry_time = 60): + self._status = EV_RETRY + self.retry_time = retry_time diff --git a/python/pgq/localconsumer.py b/python/pgq/localconsumer.py index 5ba74453..ed791ece 100644 --- a/python/pgq/localconsumer.py +++ b/python/pgq/localconsumer.py @@ -13,11 +13,11 @@ import sys import os import errno import skytools -import pgq +from pgq.baseconsumer import BaseConsumer __all__ = ['LocalConsumer'] -class LocalConsumer(pgq.Consumer): +class LocalConsumer(BaseConsumer): """Consumer that applies batches sequentially in second database. Requirements: |