# query to call
dst_query = select * from somefunc(%%(pgq.ev_data)s);
+
+ ## Use table_filter where possible instead of this ##
+ # filter for events (SQL fragment)
+ consumer_filter = ev_extra1 = 'public.mytable1'
"""
def reload(self):
super(SimpleLocalConsumer, self).reload()
self.dst_query = self.cf.get("dst_query")
+ if self.cf.get("consumer_filter", ""):
+ self.consumer_filter = self.cf.get("consumer_filter", "")
def process_local_event(self, db, batch_id, ev):
curs = self.get_database('dst_db', autocommit = 1).cursor()