summaryrefslogtreecommitdiff
path: root/scripts/simple_consumer.py
diff options
context:
space:
mode:
authorMarko Kreen2012-07-17 12:35:58 +0000
committerMarko Kreen2012-07-18 09:35:56 +0000
commit3f427bd2e767f1ee760f5309aa08a149e11efbf4 (patch)
tree47ce1a4bee95d5d95b3edc97257a5908ca64e94e /scripts/simple_consumer.py
parentc686e22f6bf0b2d032cd25db7bbfeeafc359bd7c (diff)
simple_consumer: sync with simple_local_consumer changes
Diffstat (limited to 'scripts/simple_consumer.py')
-rwxr-xr-xscripts/simple_consumer.py36
1 files changed, 26 insertions, 10 deletions
diff --git a/scripts/simple_consumer.py b/scripts/simple_consumer.py
index 109f27a5..df0db11c 100755
--- a/scripts/simple_consumer.py
+++ b/scripts/simple_consumer.py
@@ -3,9 +3,16 @@
"""Consumer that simply calls SQL query for each event.
Config::
+ # source database
+ src_db =
+
+ # destination database
+ dst_db =
+
# query to call
dst_query = select * from somefunc(%%(pgq.ev_data)s);
+ ## Deprecated, use table_filter ##
# filter for events (SQL fragment)
consumer_filter = ev_extra1 = 'public.mytable1'
"""
@@ -22,10 +29,11 @@ import skytools
class SimpleConsumer(pgq.Consumer):
__doc__ = __doc__
- def __init__(self, args):
- pgq.Consumer.__init__(self,"simple_consumer3", "src_db", args)
+ def reload(self):
+ super(SimpleConsumer, self).reload()
self.dst_query = self.cf.get("dst_query")
- self.consumer_filter = self.cf.get("consumer_filter", "")
+ if self.cf.get("consumer_filter", ""):
+ self.consumer_filter = self.cf.get("consumer_filter", "")
def process_event(self, db, ev):
curs = self.get_database('dst_db', autocommit = 1).cursor()
@@ -37,17 +45,25 @@ class SimpleConsumer(pgq.Consumer):
payload = {}
else:
payload = skytools.db_urldecode(ev.ev_data)
- payload['pgq.ev_data'] = ev.ev_data
+ payload['pgq.tick_id'] = self.batch_info['cur_tick_id']
+ payload['pgq.ev_id'] = ev.ev_id
+ payload['pgq.ev_time'] = ev.ev_time
payload['pgq.ev_type'] = ev.ev_type
+ payload['pgq.ev_data'] = ev.ev_data
payload['pgq.ev_extra1'] = ev.ev_extra1
- payload['pgq.ev_time'] = ev.ev_time
-
- self.log.debug(self.dst_query % payload)
+ payload['pgq.ev_extra2'] = ev.ev_extra2
+ payload['pgq.ev_extra3'] = ev.ev_extra3
+ payload['pgq.ev_extra4'] = ev.ev_extra4
+
+ self.log.debug(self.dst_query, payload)
curs.execute(self.dst_query, payload)
- res = curs.fetchall()
- self.log.debug(res)
+ if curs.statusmessage[:6] == 'SELECT':
+ res = curs.fetchall()
+ self.log.debug(res)
+ else:
+ self.log.debug(curs.statusmessage)
if __name__ == '__main__':
- script = SimpleConsumer(sys.argv[1:])
+ script = SimpleConsumer("simple_consumer3", "src_db", sys.argv[1:])
script.start()