summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--python/londiste/playback.py32
1 files changed, 28 insertions, 4 deletions
diff --git a/python/londiste/playback.py b/python/londiste/playback.py
index 117a6640..d3fe584b 100644
--- a/python/londiste/playback.py
+++ b/python/londiste/playback.py
@@ -378,6 +378,7 @@ class Replicator(CascadedWorker):
self.load_table_state(dst_db.cursor())
dst_db.commit()
+ dsync_backup = None
def sync_from_main_thread(self, cnt, src_db, dst_db):
"Main thread sync logic."
@@ -388,13 +389,36 @@ class Replicator(CascadedWorker):
if cnt.do_sync:
# wait for copy thread to catch up
ret = SYNC_LOOP
-
+
+ # we need to do wanna-sync->do_sync with small batches
+ need_dsync = False
+ dsync_ok = True
+ if self.pgq_min_interval or self.pgq_min_count:
+ dsync_ok = False
+ elif self.dsync_backup and self.dsync_backup[0] >= self.cur_tick:
+ dsync_ok = False
+
+ # now check if do-sync is needed
for t in self.get_tables_in_state(TABLE_WANNA_SYNC):
# copy thread wants sync, if not behind, do it
if self.cur_tick >= t.sync_tick_id:
- self.change_table_state(dst_db, t, TABLE_DO_SYNC, self.cur_tick)
- ret = SYNC_LOOP
-
+ if dsync_ok:
+ self.change_table_state(dst_db, t, TABLE_DO_SYNC, self.cur_tick)
+ ret = SYNC_LOOP
+ else:
+ need_dsync = True
+
+ # tune batch size if needed
+ if need_dsync:
+ if self.pgq_min_count or self.pgq_min_interval:
+ bak = (self.cur_tick, self.pgq_min_count, self.pgq_min_interval)
+ self.dsync_backup = bak
+ elif self.dsync_backup:
+ self.pgq_min_count = self.dsync_backup[1]
+ self.pgq_min_interval = self.dsync_backup[2]
+ self.dsync_backup = None
+
+ # now handle new copies
npossible = self.parallel_copies - cnt.get_copy_count()
if cnt.missing and npossible > 0:
pmap = self.get_state_map(src_db.cursor())