summaryrefslogtreecommitdiff
path: root/python/londiste
diff options
context:
space:
mode:
Diffstat (limited to 'python/londiste')
-rw-r--r--python/londiste/handlers/dispatch.py19
-rw-r--r--python/londiste/playback.py2
-rw-r--r--python/londiste/setup.py2
3 files changed, 21 insertions, 2 deletions
diff --git a/python/londiste/handlers/dispatch.py b/python/londiste/handlers/dispatch.py
index 90d01bbb..68e17083 100644
--- a/python/londiste/handlers/dispatch.py
+++ b/python/londiste/handlers/dispatch.py
@@ -849,8 +849,13 @@ class Dispatcher (ShardHandler):
else if part function present in db, call it
else clone master table"""
curs = self.dst_curs
+ if (self.conf.ignore_old_events and self.conf.retention_period and
+ self.is_obsolete_partition (dst, self.conf.retention_period, self.conf.period)):
+ self.ignored_tables.add(dst)
+ return
if skytools.exists_table(curs, dst):
return
+
dst = quote_fqident(dst)
vals = {'dest': dst,
'part': dst,
@@ -926,6 +931,20 @@ class Dispatcher (ShardHandler):
self.log.info("Dropped tables: %s", ", ".join(res))
return res
+ def is_obsolete_partition (self, partition_table, retention_period, partition_period):
+ """ Test partition name of partition-by-date parent table.
+ """
+ curs = self.dst_curs
+ func = "londiste.is_obsolete_partition"
+ args = [partition_table, retention_period, partition_period]
+ sql = "select " + func + " (%s, %s, %s)"
+ self.log.debug("func: %s, args: %s", func, args)
+ curs.execute(sql, args)
+ res = curs.fetchone()[0]
+ if res:
+ self.log.info("Ignored table: %s", partition_table)
+ return res
+
def get_copy_condition(self, src_curs, dst_curs):
""" Prepare where condition for copy and replay filtering.
"""
diff --git a/python/londiste/playback.py b/python/londiste/playback.py
index f21d7e58..c15a3f5b 100644
--- a/python/londiste/playback.py
+++ b/python/londiste/playback.py
@@ -726,7 +726,7 @@ class Replicator(CascadedWorker):
res = self.exec_cmd(dst_curs, q, [self.queue_name, fname, sql, s_attrs], commit = False)
ret = res[0]['ret_code']
if ret > 200:
- self.log.info("Skipping execution of '%s'", fname)
+ self.log.warning("Skipping execution of '%s'", fname)
if pgver >= 80300:
dst_curs.execute("set local session_replication_role = 'replica'")
return
diff --git a/python/londiste/setup.py b/python/londiste/setup.py
index 055be922..d374d92e 100644
--- a/python/londiste/setup.py
+++ b/python/londiste/setup.py
@@ -572,7 +572,7 @@ class LondisteSetup(CascadeAdmin):
q = "select * from londiste.execute_start(%s, %s, %s, true, %s)"
res = self.exec_cmd(db, q, [self.queue_name, fname, sql, attrs.to_urlenc()], commit = False)
ret = res[0]['ret_code']
- if ret >= 300:
+ if ret > 200:
self.log.warning("Skipping execution of '%s'", fname)
continue
if attrs.need_execute(curs, local_tables, local_seqs):