diff options
Diffstat (limited to 'python/londiste')
| -rw-r--r-- | python/londiste/handlers/dispatch.py | 19 | ||||
| -rw-r--r-- | python/londiste/playback.py | 2 | ||||
| -rw-r--r-- | python/londiste/setup.py | 2 |
3 files changed, 21 insertions, 2 deletions
diff --git a/python/londiste/handlers/dispatch.py b/python/londiste/handlers/dispatch.py index 90d01bbb..68e17083 100644 --- a/python/londiste/handlers/dispatch.py +++ b/python/londiste/handlers/dispatch.py @@ -849,8 +849,13 @@ class Dispatcher (ShardHandler): else if part function present in db, call it else clone master table""" curs = self.dst_curs + if (self.conf.ignore_old_events and self.conf.retention_period and + self.is_obsolete_partition (dst, self.conf.retention_period, self.conf.period)): + self.ignored_tables.add(dst) + return if skytools.exists_table(curs, dst): return + dst = quote_fqident(dst) vals = {'dest': dst, 'part': dst, @@ -926,6 +931,20 @@ class Dispatcher (ShardHandler): self.log.info("Dropped tables: %s", ", ".join(res)) return res + def is_obsolete_partition (self, partition_table, retention_period, partition_period): + """ Test partition name of partition-by-date parent table. + """ + curs = self.dst_curs + func = "londiste.is_obsolete_partition" + args = [partition_table, retention_period, partition_period] + sql = "select " + func + " (%s, %s, %s)" + self.log.debug("func: %s, args: %s", func, args) + curs.execute(sql, args) + res = curs.fetchone()[0] + if res: + self.log.info("Ignored table: %s", partition_table) + return res + def get_copy_condition(self, src_curs, dst_curs): """ Prepare where condition for copy and replay filtering. """ diff --git a/python/londiste/playback.py b/python/londiste/playback.py index f21d7e58..c15a3f5b 100644 --- a/python/londiste/playback.py +++ b/python/londiste/playback.py @@ -726,7 +726,7 @@ class Replicator(CascadedWorker): res = self.exec_cmd(dst_curs, q, [self.queue_name, fname, sql, s_attrs], commit = False) ret = res[0]['ret_code'] if ret > 200: - self.log.info("Skipping execution of '%s'", fname) + self.log.warning("Skipping execution of '%s'", fname) if pgver >= 80300: dst_curs.execute("set local session_replication_role = 'replica'") return diff --git a/python/londiste/setup.py b/python/londiste/setup.py index 055be922..d374d92e 100644 --- a/python/londiste/setup.py +++ b/python/londiste/setup.py @@ -572,7 +572,7 @@ class LondisteSetup(CascadeAdmin): q = "select * from londiste.execute_start(%s, %s, %s, true, %s)" res = self.exec_cmd(db, q, [self.queue_name, fname, sql, attrs.to_urlenc()], commit = False) ret = res[0]['ret_code'] - if ret >= 300: + if ret > 200: self.log.warning("Skipping execution of '%s'", fname) continue if attrs.need_execute(curs, local_tables, local_seqs): |
