summaryrefslogtreecommitdiff
path: root/python/londiste/playback.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/londiste/playback.py')
-rw-r--r--python/londiste/playback.py15
1 files changed, 10 insertions, 5 deletions
diff --git a/python/londiste/playback.py b/python/londiste/playback.py
index 2bcb1bc7..bba5604a 100644
--- a/python/londiste/playback.py
+++ b/python/londiste/playback.py
@@ -57,6 +57,7 @@ class TableState(object):
self.log = log
self.forget()
self.changed = 0
+ self.skip_truncate = False
def forget(self):
self.state = TABLE_MISSING
@@ -65,6 +66,7 @@ class TableState(object):
self.sync_tick_id = None
self.ok_batch_count = 0
self.last_tick = 0
+ self.skip_truncate = False
self.changed = 1
def change_snapshot(self, str_snapshot, tag_changed = 1):
@@ -135,12 +137,13 @@ class TableState(object):
return state
- def loaded_state(self, merge_state, str_snapshot):
+ def loaded_state(self, merge_state, str_snapshot, skip_truncate):
self.log.debug("loaded_state: %s: %s / %s" % (
self.name, merge_state, str_snapshot))
self.change_snapshot(str_snapshot, 0)
self.state = self.parse_state(merge_state)
self.changed = 0
+ self.skip_truncate = skip_truncate
if merge_state == "?":
self.changed = 1
@@ -408,6 +411,7 @@ class Replicator(pgq.SerialConsumer):
self.fill_mirror_queue(mirror_list, dst_curs)
def handle_data_event(self, ev, dst_curs):
+ # buffer SQL statements, then send them together
fmt = self.sql_command[ev.type]
sql = fmt % (ev.extra1, ev.data)
self.sql_list.append(sql)
@@ -416,6 +420,8 @@ class Replicator(pgq.SerialConsumer):
ev.tag_done()
def flush_sql(self, dst_curs):
+ # send all buffered statements at once
+
if len(self.sql_list) == 0:
return
@@ -472,9 +478,8 @@ class Replicator(pgq.SerialConsumer):
to load state on every batch.
"""
- q = """select table_name, snapshot, merge_state
- from londiste.subscriber_get_table_list(%s)
- """
+ q = "select table_name, snapshot, merge_state, skip_truncate"\
+ " from londiste.subscriber_get_table_list(%s)"
curs.execute(q, [self.pgq_queue_name])
new_list = []
@@ -483,7 +488,7 @@ class Replicator(pgq.SerialConsumer):
t = self.get_table_by_name(row['table_name'])
if not t:
t = TableState(row['table_name'], self.log)
- t.loaded_state(row['merge_state'], row['snapshot'])
+ t.loaded_state(row['merge_state'], row['snapshot'], row['skip_truncate'])
new_list.append(t)
new_map[t.name] = t