summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMarko Kreen2012-05-10 15:55:50 +0000
committerMarko Kreen2012-05-10 18:19:29 +0000
commit525e8fda8d2c89166e034569b77d3aeb78eaea4c (patch)
tree06b0190d28e0a98e41d1b7f89a3c2466f1b03c74
parent491f2aef75f9d044c5f5274a46a733a0ab24fbe9 (diff)
Include simple_consumer script
Quick event processing by simply launching SQL query for each event. Forward-ported-by: Juta Vaks
-rwxr-xr-xscripts/simple_consumer.py53
-rwxr-xr-xsetup_skytools.py1
2 files changed, 54 insertions, 0 deletions
diff --git a/scripts/simple_consumer.py b/scripts/simple_consumer.py
new file mode 100755
index 00000000..109f27a5
--- /dev/null
+++ b/scripts/simple_consumer.py
@@ -0,0 +1,53 @@
+#!/usr/bin/python
+
+"""Consumer that simply calls SQL query for each event.
+
+Config::
+ # query to call
+ dst_query = select * from somefunc(%%(pgq.ev_data)s);
+
+ # filter for events (SQL fragment)
+ consumer_filter = ev_extra1 = 'public.mytable1'
+"""
+
+
+import sys
+
+import pkgloader
+pkgloader.require('skytools', '3.0')
+
+import pgq
+import skytools
+
+class SimpleConsumer(pgq.Consumer):
+ __doc__ = __doc__
+
+ def __init__(self, args):
+ pgq.Consumer.__init__(self,"simple_consumer3", "src_db", args)
+ self.dst_query = self.cf.get("dst_query")
+ self.consumer_filter = self.cf.get("consumer_filter", "")
+
+ def process_event(self, db, ev):
+ curs = self.get_database('dst_db', autocommit = 1).cursor()
+
+ if ev.ev_type[:2] not in ('I:', 'U:', 'D:'):
+ return
+
+ if ev.ev_data is None:
+ payload = {}
+ else:
+ payload = skytools.db_urldecode(ev.ev_data)
+ payload['pgq.ev_data'] = ev.ev_data
+ payload['pgq.ev_type'] = ev.ev_type
+ payload['pgq.ev_extra1'] = ev.ev_extra1
+ payload['pgq.ev_time'] = ev.ev_time
+
+ self.log.debug(self.dst_query % payload)
+ curs.execute(self.dst_query, payload)
+ res = curs.fetchall()
+ self.log.debug(res)
+
+if __name__ == '__main__':
+ script = SimpleConsumer(sys.argv[1:])
+ script.start()
+
diff --git a/setup_skytools.py b/setup_skytools.py
index 648b9aac..0e4a65c8 100755
--- a/setup_skytools.py
+++ b/setup_skytools.py
@@ -37,6 +37,7 @@ sfx_scripts = [
'scripts/scriptmgr.py',
'scripts/queue_splitter.py',
'scripts/queue_mover.py',
+ 'scripts/simple_consumer.py',
'scripts/skytools_upgrade.py',
]
# those do not need suffix (no conflict with 2.1)