summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
Diffstat (limited to 'python')
-rw-r--r--python/londiste/handlers/dispatch.py19
-rw-r--r--python/londiste/playback.py2
-rw-r--r--python/londiste/setup.py2
-rw-r--r--python/pgq/cascade/admin.py14
-rw-r--r--python/skytools/scripting.py4
-rwxr-xr-xpython/walmgr.py2
6 files changed, 33 insertions, 10 deletions
diff --git a/python/londiste/handlers/dispatch.py b/python/londiste/handlers/dispatch.py
index 90d01bbb..68e17083 100644
--- a/python/londiste/handlers/dispatch.py
+++ b/python/londiste/handlers/dispatch.py
@@ -849,8 +849,13 @@ class Dispatcher (ShardHandler):
else if part function present in db, call it
else clone master table"""
curs = self.dst_curs
+ if (self.conf.ignore_old_events and self.conf.retention_period and
+ self.is_obsolete_partition (dst, self.conf.retention_period, self.conf.period)):
+ self.ignored_tables.add(dst)
+ return
if skytools.exists_table(curs, dst):
return
+
dst = quote_fqident(dst)
vals = {'dest': dst,
'part': dst,
@@ -926,6 +931,20 @@ class Dispatcher (ShardHandler):
self.log.info("Dropped tables: %s", ", ".join(res))
return res
+ def is_obsolete_partition (self, partition_table, retention_period, partition_period):
+ """ Test partition name of partition-by-date parent table.
+ """
+ curs = self.dst_curs
+ func = "londiste.is_obsolete_partition"
+ args = [partition_table, retention_period, partition_period]
+ sql = "select " + func + " (%s, %s, %s)"
+ self.log.debug("func: %s, args: %s", func, args)
+ curs.execute(sql, args)
+ res = curs.fetchone()[0]
+ if res:
+ self.log.info("Ignored table: %s", partition_table)
+ return res
+
def get_copy_condition(self, src_curs, dst_curs):
""" Prepare where condition for copy and replay filtering.
"""
diff --git a/python/londiste/playback.py b/python/londiste/playback.py
index f21d7e58..c15a3f5b 100644
--- a/python/londiste/playback.py
+++ b/python/londiste/playback.py
@@ -726,7 +726,7 @@ class Replicator(CascadedWorker):
res = self.exec_cmd(dst_curs, q, [self.queue_name, fname, sql, s_attrs], commit = False)
ret = res[0]['ret_code']
if ret > 200:
- self.log.info("Skipping execution of '%s'", fname)
+ self.log.warning("Skipping execution of '%s'", fname)
if pgver >= 80300:
dst_curs.execute("set local session_replication_role = 'replica'")
return
diff --git a/python/londiste/setup.py b/python/londiste/setup.py
index 055be922..d374d92e 100644
--- a/python/londiste/setup.py
+++ b/python/londiste/setup.py
@@ -572,7 +572,7 @@ class LondisteSetup(CascadeAdmin):
q = "select * from londiste.execute_start(%s, %s, %s, true, %s)"
res = self.exec_cmd(db, q, [self.queue_name, fname, sql, attrs.to_urlenc()], commit = False)
ret = res[0]['ret_code']
- if ret >= 300:
+ if ret > 200:
self.log.warning("Skipping execution of '%s'", fname)
continue
if attrs.need_execute(curs, local_tables, local_seqs):
diff --git a/python/pgq/cascade/admin.py b/python/pgq/cascade/admin.py
index 39ffd96c..a9d599ea 100644
--- a/python/pgq/cascade/admin.py
+++ b/python/pgq/cascade/admin.py
@@ -656,9 +656,12 @@ class CascadeAdmin(skytools.AdminScript):
try:
# unregister node location from root node (event will be added to queue)
- root_db = self.find_root_db()
- q = "select * from pgq_node.unregister_location(%s, %s)"
- self.exec_cmd(root_db, q, [self.queue_name, node_name])
+ if node and node.type == 'root':
+ pass
+ else:
+ root_db = self.find_root_db()
+ q = "select * from pgq_node.unregister_location(%s, %s)"
+ self.exec_cmd(root_db, q, [self.queue_name, node_name])
except skytools.DBError, d:
self.log.warning("Unregister from root failed: %s", str(d))
@@ -841,7 +844,8 @@ class CascadeAdmin(skytools.AdminScript):
# switch subscribers around
if self.options.all or failover:
for n in self.find_subscribers_for(old_node_name):
- self.node_change_provider(n, new_node_name)
+ if n != new_node_name:
+ self.node_change_provider(n, new_node_name)
def find_provider(self, node_name):
if self.node_alive(node_name):
@@ -858,7 +862,7 @@ class CascadeAdmin(skytools.AdminScript):
return self.find_root_node()
def find_subscribers_for(self, parent_node_name):
- """Find subscribers for particular node node."""
+ """Find subscribers for particular node."""
# use dict to eliminate duplicates
res = {}
diff --git a/python/skytools/scripting.py b/python/skytools/scripting.py
index 77c1bec5..24dd684b 100644
--- a/python/skytools/scripting.py
+++ b/python/skytools/scripting.py
@@ -671,7 +671,7 @@ class BaseScript(object):
In case of daemon, if will be called in same process as work(),
unlike __init__().
"""
- pass
+ self.log.info("Script finished, exiting")
# define some aliases (short-cuts / backward compatibility cruft)
stat_add = stat_put # Old, deprecated function.
@@ -957,7 +957,7 @@ class DBScript(BaseScript):
sql_retry_formula_a = self.cf.getint("sql_retry_formula_a", 1)
sql_retry_formula_b = self.cf.getint("sql_retry_formula_b", 5)
sql_retry_formula_cap = self.cf.getint("sql_retry_formula_cap", 60)
- elist = exceptions or tuple([])
+ elist = exceptions or tuple()
stime = time.time()
tried = 0
dbc = None
diff --git a/python/walmgr.py b/python/walmgr.py
index 2918b067..8de02bd4 100755
--- a/python/walmgr.py
+++ b/python/walmgr.py
@@ -1821,7 +1821,7 @@ STOP TIME: %(stop_time)s
self.log.info("%s: not found (ignored)", srcname)
# remove PG_RECEIVEXLOG file if it's present
- if os.path.isfile(prxlogfile):
+ if os.path.isfile(prxlogfile) and not srcname.endswith('.history'):
os.remove(prxlogfile)
sys.exit(1)