summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
Diffstat (limited to 'python')
-rw-r--r--python/londiste/playback.py6
-rw-r--r--python/pgq/consumer.py9
2 files changed, 13 insertions, 2 deletions
diff --git a/python/londiste/playback.py b/python/londiste/playback.py
index 30dcc261..f9ce107e 100644
--- a/python/londiste/playback.py
+++ b/python/londiste/playback.py
@@ -285,6 +285,9 @@ class Replicator(CascadedWorker):
if self.parallel_copies < 1:
raise Exception('Bad value for parallel_copies: %d' % self.parallel_copies)
+ self.local_only = self.cf.getboolean('local_only', False)
+ self.consumer_filter = None
+
load_handlers(self.cf)
def connection_hook(self, dbname, db):
@@ -326,7 +329,6 @@ class Replicator(CascadedWorker):
if not self.copy_thread:
self.restore_fkeys(dst_db)
-
for p in self.used_plugins.values():
p.reset()
self.used_plugins = {}
@@ -346,6 +348,8 @@ class Replicator(CascadedWorker):
# finalize table changes
self.save_table_state(dst_curs)
+ if self.local_only:
+ self.consumer_filter = self.table_map.keys()
def sync_tables(self, src_db, dst_db):
"""Table sync loop.
diff --git a/python/pgq/consumer.py b/python/pgq/consumer.py
index b5662807..7426a672 100644
--- a/python/pgq/consumer.py
+++ b/python/pgq/consumer.py
@@ -143,6 +143,8 @@ class Consumer(skytools.DBScript):
batch_info = None
+ consumer_filter = None
+
def __init__(self, service_name, db_name, args):
"""Initialize new consumer.
@@ -272,7 +274,12 @@ class Consumer(skytools.DBScript):
# load events
sql = "select * from pgq.get_batch_events(%d)" % batch_id
- curs.execute(sql)
+ if self.consumer_filter is not None:
+ sql += """
+where ((ev_type like 'pgq%%' or ev_type like 'londiste%%')
+or (ev_extra1 = ANY(%s)))
+"""
+ curs.execute(sql, [self.consumer_filter])
rows = curs.dictfetchall()
# map them to python objects