diff options
Diffstat (limited to 'python')
-rw-r--r-- | python/londiste/playback.py | 6 | ||||
-rw-r--r-- | python/pgq/consumer.py | 9 |
2 files changed, 13 insertions, 2 deletions
diff --git a/python/londiste/playback.py b/python/londiste/playback.py index 30dcc261..f9ce107e 100644 --- a/python/londiste/playback.py +++ b/python/londiste/playback.py @@ -285,6 +285,9 @@ class Replicator(CascadedWorker): if self.parallel_copies < 1: raise Exception('Bad value for parallel_copies: %d' % self.parallel_copies) + self.local_only = self.cf.getboolean('local_only', False) + self.consumer_filter = None + load_handlers(self.cf) def connection_hook(self, dbname, db): @@ -326,7 +329,6 @@ class Replicator(CascadedWorker): if not self.copy_thread: self.restore_fkeys(dst_db) - for p in self.used_plugins.values(): p.reset() self.used_plugins = {} @@ -346,6 +348,8 @@ class Replicator(CascadedWorker): # finalize table changes self.save_table_state(dst_curs) + if self.local_only: + self.consumer_filter = self.table_map.keys() def sync_tables(self, src_db, dst_db): """Table sync loop. diff --git a/python/pgq/consumer.py b/python/pgq/consumer.py index b5662807..7426a672 100644 --- a/python/pgq/consumer.py +++ b/python/pgq/consumer.py @@ -143,6 +143,8 @@ class Consumer(skytools.DBScript): batch_info = None + consumer_filter = None + def __init__(self, service_name, db_name, args): """Initialize new consumer. @@ -272,7 +274,12 @@ class Consumer(skytools.DBScript): # load events sql = "select * from pgq.get_batch_events(%d)" % batch_id - curs.execute(sql) + if self.consumer_filter is not None: + sql += """ +where ((ev_type like 'pgq%%' or ev_type like 'londiste%%') +or (ev_extra1 = ANY(%s))) +""" + curs.execute(sql, [self.consumer_filter]) rows = curs.dictfetchall() # map them to python objects |