From e24329d95a5bbf5021c7033eada7c8f86d151db9 Mon Sep 17 00:00:00 2001 From: Marko Kreen Date: Fri, 6 May 2011 16:44:14 +0300 Subject: applyfunc handler --- python/londiste/handlers/__init__.py | 1 + python/londiste/handlers/applyfn.py | 36 ++++++++++++++++++++++++++++++++++++ 2 files changed, 37 insertions(+) create mode 100644 python/londiste/handlers/applyfn.py (limited to 'python') diff --git a/python/londiste/handlers/__init__.py b/python/londiste/handlers/__init__.py index 567c551f..b94935ed 100644 --- a/python/londiste/handlers/__init__.py +++ b/python/londiste/handlers/__init__.py @@ -4,5 +4,6 @@ DEFAULT_HANDLERS = [ 'londiste.handlers.bulk', 'londiste.handlers.qtable', 'londiste.handlers.dispatch', + 'londiste.handlers.applyfn', ] diff --git a/python/londiste/handlers/applyfn.py b/python/londiste/handlers/applyfn.py new file mode 100644 index 00000000..48414e6d --- /dev/null +++ b/python/londiste/handlers/applyfn.py @@ -0,0 +1,36 @@ +""" +Send all events to a db function. +""" + +import skytools +from londiste.handler import BaseHandler + +__all__ = ['ApplyFuncHandler'] + +class ApplyFuncHandler(BaseHandler): + """Call db function to apply event""" + handler_name = 'applyfn' + + def prepare_batch(self, batch_info, dst_curs): + self.cur_tick = batch_info['tick_id'] + + def process_event(self, ev, sql_queue_func, qfunc_arg): + """Ignore events for this table""" + fn = self.args.get('func_name') + fnconf = self.args.get('func_conf', '') + + args = [fnconf, self.cur_tick, + ev.ev_id, ev.ev_time, + ev.ev_txid, ev.ev_retry, + ev.ev_type, ev.ev_data, + ev.ev_extra1, ev.ev_extra2, + ev.ev_extra3, ev.ev_extra4] + + qfn = skytools.quote_fqident(fn) + qargs = [skytools.quote_literal(a) for a in args] + sql = "select %s(%s);" % (qfn, ', '.join(qargs)) + + sql_queue_func(sql, qfunc_arg) + +__londiste_handlers__ = [ApplyFuncHandler] + -- cgit v1.2.3