diff options
author | Marko Kreen | 2012-07-17 12:35:58 +0000 |
---|---|---|
committer | Marko Kreen | 2012-07-18 09:35:56 +0000 |
commit | 3f427bd2e767f1ee760f5309aa08a149e11efbf4 (patch) | |
tree | 47ce1a4bee95d5d95b3edc97257a5908ca64e94e /scripts/simple_consumer.py | |
parent | c686e22f6bf0b2d032cd25db7bbfeeafc359bd7c (diff) |
simple_consumer: sync with simple_local_consumer changes
Diffstat (limited to 'scripts/simple_consumer.py')
-rwxr-xr-x | scripts/simple_consumer.py | 36 |
1 files changed, 26 insertions, 10 deletions
diff --git a/scripts/simple_consumer.py b/scripts/simple_consumer.py index 109f27a5..df0db11c 100755 --- a/scripts/simple_consumer.py +++ b/scripts/simple_consumer.py @@ -3,9 +3,16 @@ """Consumer that simply calls SQL query for each event. Config:: + # source database + src_db = + + # destination database + dst_db = + # query to call dst_query = select * from somefunc(%%(pgq.ev_data)s); + ## Deprecated, use table_filter ## # filter for events (SQL fragment) consumer_filter = ev_extra1 = 'public.mytable1' """ @@ -22,10 +29,11 @@ import skytools class SimpleConsumer(pgq.Consumer): __doc__ = __doc__ - def __init__(self, args): - pgq.Consumer.__init__(self,"simple_consumer3", "src_db", args) + def reload(self): + super(SimpleConsumer, self).reload() self.dst_query = self.cf.get("dst_query") - self.consumer_filter = self.cf.get("consumer_filter", "") + if self.cf.get("consumer_filter", ""): + self.consumer_filter = self.cf.get("consumer_filter", "") def process_event(self, db, ev): curs = self.get_database('dst_db', autocommit = 1).cursor() @@ -37,17 +45,25 @@ class SimpleConsumer(pgq.Consumer): payload = {} else: payload = skytools.db_urldecode(ev.ev_data) - payload['pgq.ev_data'] = ev.ev_data + payload['pgq.tick_id'] = self.batch_info['cur_tick_id'] + payload['pgq.ev_id'] = ev.ev_id + payload['pgq.ev_time'] = ev.ev_time payload['pgq.ev_type'] = ev.ev_type + payload['pgq.ev_data'] = ev.ev_data payload['pgq.ev_extra1'] = ev.ev_extra1 - payload['pgq.ev_time'] = ev.ev_time - - self.log.debug(self.dst_query % payload) + payload['pgq.ev_extra2'] = ev.ev_extra2 + payload['pgq.ev_extra3'] = ev.ev_extra3 + payload['pgq.ev_extra4'] = ev.ev_extra4 + + self.log.debug(self.dst_query, payload) curs.execute(self.dst_query, payload) - res = curs.fetchall() - self.log.debug(res) + if curs.statusmessage[:6] == 'SELECT': + res = curs.fetchall() + self.log.debug(res) + else: + self.log.debug(curs.statusmessage) if __name__ == '__main__': - script = SimpleConsumer(sys.argv[1:]) + script = SimpleConsumer("simple_consumer3", "src_db", sys.argv[1:]) script.start() |