diff options
author | Marko Kreen | 2012-05-10 15:55:50 +0000 |
---|---|---|
committer | Marko Kreen | 2012-05-10 18:19:29 +0000 |
commit | 525e8fda8d2c89166e034569b77d3aeb78eaea4c (patch) | |
tree | 06b0190d28e0a98e41d1b7f89a3c2466f1b03c74 | |
parent | 491f2aef75f9d044c5f5274a46a733a0ab24fbe9 (diff) |
Include simple_consumer script
Quick event processing by simply launching SQL
query for each event.
Forward-ported-by: Juta Vaks
-rwxr-xr-x | scripts/simple_consumer.py | 53 | ||||
-rwxr-xr-x | setup_skytools.py | 1 |
2 files changed, 54 insertions, 0 deletions
diff --git a/scripts/simple_consumer.py b/scripts/simple_consumer.py new file mode 100755 index 00000000..109f27a5 --- /dev/null +++ b/scripts/simple_consumer.py @@ -0,0 +1,53 @@ +#!/usr/bin/python + +"""Consumer that simply calls SQL query for each event. + +Config:: + # query to call + dst_query = select * from somefunc(%%(pgq.ev_data)s); + + # filter for events (SQL fragment) + consumer_filter = ev_extra1 = 'public.mytable1' +""" + + +import sys + +import pkgloader +pkgloader.require('skytools', '3.0') + +import pgq +import skytools + +class SimpleConsumer(pgq.Consumer): + __doc__ = __doc__ + + def __init__(self, args): + pgq.Consumer.__init__(self,"simple_consumer3", "src_db", args) + self.dst_query = self.cf.get("dst_query") + self.consumer_filter = self.cf.get("consumer_filter", "") + + def process_event(self, db, ev): + curs = self.get_database('dst_db', autocommit = 1).cursor() + + if ev.ev_type[:2] not in ('I:', 'U:', 'D:'): + return + + if ev.ev_data is None: + payload = {} + else: + payload = skytools.db_urldecode(ev.ev_data) + payload['pgq.ev_data'] = ev.ev_data + payload['pgq.ev_type'] = ev.ev_type + payload['pgq.ev_extra1'] = ev.ev_extra1 + payload['pgq.ev_time'] = ev.ev_time + + self.log.debug(self.dst_query % payload) + curs.execute(self.dst_query, payload) + res = curs.fetchall() + self.log.debug(res) + +if __name__ == '__main__': + script = SimpleConsumer(sys.argv[1:]) + script.start() + diff --git a/setup_skytools.py b/setup_skytools.py index 648b9aac..0e4a65c8 100755 --- a/setup_skytools.py +++ b/setup_skytools.py @@ -37,6 +37,7 @@ sfx_scripts = [ 'scripts/scriptmgr.py', 'scripts/queue_splitter.py', 'scripts/queue_mover.py', + 'scripts/simple_consumer.py', 'scripts/skytools_upgrade.py', ] # those do not need suffix (no conflict with 2.1) |