diff options
-rw-r--r-- | python/londiste/playback.py | 32 |
1 files changed, 28 insertions, 4 deletions
diff --git a/python/londiste/playback.py b/python/londiste/playback.py index 117a6640..d3fe584b 100644 --- a/python/londiste/playback.py +++ b/python/londiste/playback.py @@ -378,6 +378,7 @@ class Replicator(CascadedWorker): self.load_table_state(dst_db.cursor()) dst_db.commit() + dsync_backup = None def sync_from_main_thread(self, cnt, src_db, dst_db): "Main thread sync logic." @@ -388,13 +389,36 @@ class Replicator(CascadedWorker): if cnt.do_sync: # wait for copy thread to catch up ret = SYNC_LOOP - + + # we need to do wanna-sync->do_sync with small batches + need_dsync = False + dsync_ok = True + if self.pgq_min_interval or self.pgq_min_count: + dsync_ok = False + elif self.dsync_backup and self.dsync_backup[0] >= self.cur_tick: + dsync_ok = False + + # now check if do-sync is needed for t in self.get_tables_in_state(TABLE_WANNA_SYNC): # copy thread wants sync, if not behind, do it if self.cur_tick >= t.sync_tick_id: - self.change_table_state(dst_db, t, TABLE_DO_SYNC, self.cur_tick) - ret = SYNC_LOOP - + if dsync_ok: + self.change_table_state(dst_db, t, TABLE_DO_SYNC, self.cur_tick) + ret = SYNC_LOOP + else: + need_dsync = True + + # tune batch size if needed + if need_dsync: + if self.pgq_min_count or self.pgq_min_interval: + bak = (self.cur_tick, self.pgq_min_count, self.pgq_min_interval) + self.dsync_backup = bak + elif self.dsync_backup: + self.pgq_min_count = self.dsync_backup[1] + self.pgq_min_interval = self.dsync_backup[2] + self.dsync_backup = None + + # now handle new copies npossible = self.parallel_copies - cnt.get_copy_count() if cnt.missing and npossible > 0: pmap = self.get_state_map(src_db.cursor()) |