diff options
Diffstat (limited to 'python/londiste/playback.py')
| -rw-r--r-- | python/londiste/playback.py | 15 |
1 files changed, 10 insertions, 5 deletions
diff --git a/python/londiste/playback.py b/python/londiste/playback.py index 2bcb1bc7..bba5604a 100644 --- a/python/londiste/playback.py +++ b/python/londiste/playback.py @@ -57,6 +57,7 @@ class TableState(object): self.log = log self.forget() self.changed = 0 + self.skip_truncate = False def forget(self): self.state = TABLE_MISSING @@ -65,6 +66,7 @@ class TableState(object): self.sync_tick_id = None self.ok_batch_count = 0 self.last_tick = 0 + self.skip_truncate = False self.changed = 1 def change_snapshot(self, str_snapshot, tag_changed = 1): @@ -135,12 +137,13 @@ class TableState(object): return state - def loaded_state(self, merge_state, str_snapshot): + def loaded_state(self, merge_state, str_snapshot, skip_truncate): self.log.debug("loaded_state: %s: %s / %s" % ( self.name, merge_state, str_snapshot)) self.change_snapshot(str_snapshot, 0) self.state = self.parse_state(merge_state) self.changed = 0 + self.skip_truncate = skip_truncate if merge_state == "?": self.changed = 1 @@ -408,6 +411,7 @@ class Replicator(pgq.SerialConsumer): self.fill_mirror_queue(mirror_list, dst_curs) def handle_data_event(self, ev, dst_curs): + # buffer SQL statements, then send them together fmt = self.sql_command[ev.type] sql = fmt % (ev.extra1, ev.data) self.sql_list.append(sql) @@ -416,6 +420,8 @@ class Replicator(pgq.SerialConsumer): ev.tag_done() def flush_sql(self, dst_curs): + # send all buffered statements at once + if len(self.sql_list) == 0: return @@ -472,9 +478,8 @@ class Replicator(pgq.SerialConsumer): to load state on every batch. """ - q = """select table_name, snapshot, merge_state - from londiste.subscriber_get_table_list(%s) - """ + q = "select table_name, snapshot, merge_state, skip_truncate"\ + " from londiste.subscriber_get_table_list(%s)" curs.execute(q, [self.pgq_queue_name]) new_list = [] @@ -483,7 +488,7 @@ class Replicator(pgq.SerialConsumer): t = self.get_table_by_name(row['table_name']) if not t: t = TableState(row['table_name'], self.log) - t.loaded_state(row['merge_state'], row['snapshot']) + t.loaded_state(row['merge_state'], row['snapshot'], row['skip_truncate']) new_list.append(t) new_map[t.name] = t |
