diff options
Diffstat (limited to 'python')
-rw-r--r-- | python/londiste/handlers/dispatch.py | 19 | ||||
-rw-r--r-- | python/londiste/playback.py | 2 | ||||
-rw-r--r-- | python/londiste/setup.py | 2 | ||||
-rw-r--r-- | python/pgq/cascade/admin.py | 14 | ||||
-rw-r--r-- | python/skytools/scripting.py | 4 | ||||
-rwxr-xr-x | python/walmgr.py | 2 |
6 files changed, 33 insertions, 10 deletions
diff --git a/python/londiste/handlers/dispatch.py b/python/londiste/handlers/dispatch.py index 90d01bbb..68e17083 100644 --- a/python/londiste/handlers/dispatch.py +++ b/python/londiste/handlers/dispatch.py @@ -849,8 +849,13 @@ class Dispatcher (ShardHandler): else if part function present in db, call it else clone master table""" curs = self.dst_curs + if (self.conf.ignore_old_events and self.conf.retention_period and + self.is_obsolete_partition (dst, self.conf.retention_period, self.conf.period)): + self.ignored_tables.add(dst) + return if skytools.exists_table(curs, dst): return + dst = quote_fqident(dst) vals = {'dest': dst, 'part': dst, @@ -926,6 +931,20 @@ class Dispatcher (ShardHandler): self.log.info("Dropped tables: %s", ", ".join(res)) return res + def is_obsolete_partition (self, partition_table, retention_period, partition_period): + """ Test partition name of partition-by-date parent table. + """ + curs = self.dst_curs + func = "londiste.is_obsolete_partition" + args = [partition_table, retention_period, partition_period] + sql = "select " + func + " (%s, %s, %s)" + self.log.debug("func: %s, args: %s", func, args) + curs.execute(sql, args) + res = curs.fetchone()[0] + if res: + self.log.info("Ignored table: %s", partition_table) + return res + def get_copy_condition(self, src_curs, dst_curs): """ Prepare where condition for copy and replay filtering. """ diff --git a/python/londiste/playback.py b/python/londiste/playback.py index f21d7e58..c15a3f5b 100644 --- a/python/londiste/playback.py +++ b/python/londiste/playback.py @@ -726,7 +726,7 @@ class Replicator(CascadedWorker): res = self.exec_cmd(dst_curs, q, [self.queue_name, fname, sql, s_attrs], commit = False) ret = res[0]['ret_code'] if ret > 200: - self.log.info("Skipping execution of '%s'", fname) + self.log.warning("Skipping execution of '%s'", fname) if pgver >= 80300: dst_curs.execute("set local session_replication_role = 'replica'") return diff --git a/python/londiste/setup.py b/python/londiste/setup.py index 055be922..d374d92e 100644 --- a/python/londiste/setup.py +++ b/python/londiste/setup.py @@ -572,7 +572,7 @@ class LondisteSetup(CascadeAdmin): q = "select * from londiste.execute_start(%s, %s, %s, true, %s)" res = self.exec_cmd(db, q, [self.queue_name, fname, sql, attrs.to_urlenc()], commit = False) ret = res[0]['ret_code'] - if ret >= 300: + if ret > 200: self.log.warning("Skipping execution of '%s'", fname) continue if attrs.need_execute(curs, local_tables, local_seqs): diff --git a/python/pgq/cascade/admin.py b/python/pgq/cascade/admin.py index 39ffd96c..a9d599ea 100644 --- a/python/pgq/cascade/admin.py +++ b/python/pgq/cascade/admin.py @@ -656,9 +656,12 @@ class CascadeAdmin(skytools.AdminScript): try: # unregister node location from root node (event will be added to queue) - root_db = self.find_root_db() - q = "select * from pgq_node.unregister_location(%s, %s)" - self.exec_cmd(root_db, q, [self.queue_name, node_name]) + if node and node.type == 'root': + pass + else: + root_db = self.find_root_db() + q = "select * from pgq_node.unregister_location(%s, %s)" + self.exec_cmd(root_db, q, [self.queue_name, node_name]) except skytools.DBError, d: self.log.warning("Unregister from root failed: %s", str(d)) @@ -841,7 +844,8 @@ class CascadeAdmin(skytools.AdminScript): # switch subscribers around if self.options.all or failover: for n in self.find_subscribers_for(old_node_name): - self.node_change_provider(n, new_node_name) + if n != new_node_name: + self.node_change_provider(n, new_node_name) def find_provider(self, node_name): if self.node_alive(node_name): @@ -858,7 +862,7 @@ class CascadeAdmin(skytools.AdminScript): return self.find_root_node() def find_subscribers_for(self, parent_node_name): - """Find subscribers for particular node node.""" + """Find subscribers for particular node.""" # use dict to eliminate duplicates res = {} diff --git a/python/skytools/scripting.py b/python/skytools/scripting.py index 77c1bec5..24dd684b 100644 --- a/python/skytools/scripting.py +++ b/python/skytools/scripting.py @@ -671,7 +671,7 @@ class BaseScript(object): In case of daemon, if will be called in same process as work(), unlike __init__(). """ - pass + self.log.info("Script finished, exiting") # define some aliases (short-cuts / backward compatibility cruft) stat_add = stat_put # Old, deprecated function. @@ -957,7 +957,7 @@ class DBScript(BaseScript): sql_retry_formula_a = self.cf.getint("sql_retry_formula_a", 1) sql_retry_formula_b = self.cf.getint("sql_retry_formula_b", 5) sql_retry_formula_cap = self.cf.getint("sql_retry_formula_cap", 60) - elist = exceptions or tuple([]) + elist = exceptions or tuple() stime = time.time() tried = 0 dbc = None diff --git a/python/walmgr.py b/python/walmgr.py index 2918b067..8de02bd4 100755 --- a/python/walmgr.py +++ b/python/walmgr.py @@ -1821,7 +1821,7 @@ STOP TIME: %(stop_time)s self.log.info("%s: not found (ignored)", srcname) # remove PG_RECEIVEXLOG file if it's present - if os.path.isfile(prxlogfile): + if os.path.isfile(prxlogfile) and not srcname.endswith('.history'): os.remove(prxlogfile) sys.exit(1) |