summaryrefslogtreecommitdiff
path: root/python/skytools/scripting.py
diff options
context:
space:
mode:
authormartinko2014-04-02 14:55:14 +0000
committermartinko2014-04-02 14:55:14 +0000
commitfc1e1f4b7a9b50418c41045c92fbe31947b83796 (patch)
tree2d2f66bde621f7ad0dd012911ad19fbdcfa7ffcc /python/skytools/scripting.py
parent3cfc1ad65876845641fbf2868994e6c6b53dd9ff (diff)
parent62abe7b14dedaba7a6dd5678c8e0b4e3335ed81c (diff)
Merge branch 'release/3.2'skytools_3_2
Diffstat (limited to 'python/skytools/scripting.py')
-rw-r--r--python/skytools/scripting.py90
1 files changed, 79 insertions, 11 deletions
diff --git a/python/skytools/scripting.py b/python/skytools/scripting.py
index 840f3cf4..68ddc4d3 100644
--- a/python/skytools/scripting.py
+++ b/python/skytools/scripting.py
@@ -3,9 +3,18 @@
"""
-import sys, os, signal, optparse, time, errno, select
-import logging, logging.handlers, logging.config
-
+import errno
+import logging
+import logging.config
+import logging.handlers
+import optparse
+import os
+import select
+import signal
+import sys
+import time
+
+import psycopg2
import skytools
import skytools.skylog
@@ -468,6 +477,9 @@ class BaseScript(object):
self.pidfile = self.cf.getfile("pidfile", '')
self.loop_delay = self.cf.getfloat("loop_delay", self.loop_delay)
self.exception_sleep = self.cf.getfloat("exception_sleep", 20)
+ self.exception_quiet = self.cf.getlist("exception_quiet", [])
+ self.exception_grace = self.cf.getfloat("exception_grace", 5*60)
+ self.exception_reset = self.cf.getfloat("exception_reset", 15*60)
def hook_sighup(self, sig, frame):
"Internal SIGHUP handler. Minimal code here."
@@ -560,12 +572,14 @@ class BaseScript(object):
return state
+ last_func_fail = None
def run_func_safely(self, func, prefer_looping = False):
"Run users work function, safely."
- cname = None
- emsg = None
try:
- return func()
+ r = func()
+ if self.last_func_fail and time.time() > self.last_func_fail + self.exception_reset:
+ self.last_func_fail = None
+ return r
except UsageError, d:
self.log.error(str(d))
sys.exit(1)
@@ -592,6 +606,8 @@ class BaseScript(object):
self.send_stats()
except:
pass
+ if self.last_func_fail is None:
+ self.last_func_fail = time.time()
emsg = str(d).rstrip()
self.reset()
self.exception_hook(d, emsg)
@@ -610,6 +626,10 @@ class BaseScript(object):
if ex.errno != errno.EINTR:
raise
+ def _is_quiet_exception(self, ex):
+ return ((self.exception_quiet == ["ALL"] or ex.__class__.__name__ in self.exception_quiet)
+ and self.last_func_fail and time.time() < self.last_func_fail + self.exception_grace)
+
def exception_hook(self, det, emsg):
"""Called on after exception processing.
@@ -618,8 +638,11 @@ class BaseScript(object):
@param det: exception details
@param emsg: exception msg
"""
- self.log.exception("Job %s crashed: %s" % (
- self.job_name, emsg))
+ lm = "Job %s crashed: %s" % (self.job_name, emsg)
+ if self._is_quiet_exception(det):
+ self.log.warning(lm)
+ else:
+ self.log.exception(lm)
def work(self):
"""Here should user's processing happen.
@@ -801,8 +824,12 @@ class DBScript(BaseScript):
sql = getattr(curs, 'query', None) or '?'
if len(sql) > 200: # avoid logging londiste huge batched queries
sql = sql[:60] + " ..."
- self.log.exception("Job %s got error on connection '%s': %s. Query: %s" % (
- self.job_name, cname, emsg, sql))
+ lm = "Job %s got error on connection '%s': %s. Query: %s" % (
+ self.job_name, cname, emsg, sql)
+ if self._is_quiet_exception(d):
+ self.log.warning(lm)
+ else:
+ self.log.exception(lm)
else:
BaseScript.exception_hook(self, d, emsg)
@@ -921,6 +948,48 @@ class DBScript(BaseScript):
# error is already logged
sys.exit(1)
+ def execute_with_retry (self, dbname, stmt, args, exceptions = None):
+ """ Execute SQL and retry if it fails.
+ Return number of retries and current valid cursor, or raise an exception.
+ """
+ sql_retry = self.cf.getbool("sql_retry", False)
+ sql_retry_max_count = self.cf.getint("sql_retry_max_count", 10)
+ sql_retry_max_time = self.cf.getint("sql_retry_max_time", 300)
+ sql_retry_formula_a = self.cf.getint("sql_retry_formula_a", 1)
+ sql_retry_formula_b = self.cf.getint("sql_retry_formula_b", 5)
+ sql_retry_formula_cap = self.cf.getint("sql_retry_formula_cap", 60)
+ elist = exceptions or (psycopg2.OperationalError,)
+ stime = time.time()
+ tried = 0
+ dbc = None
+ while True:
+ try:
+ if dbc is None:
+ if dbname not in self.db_cache:
+ self.get_database(dbname, autocommit=1)
+ dbc = self.db_cache[dbname]
+ if dbc.isolation_level != skytools.I_AUTOCOMMIT:
+ raise skytools.UsageError ("execute_with_retry: autocommit required")
+ else:
+ dbc.reset()
+ curs = dbc.get_connection(dbc.isolation_level).cursor()
+ curs.execute (stmt, args)
+ break
+ except elist, e:
+ if not sql_retry or tried >= sql_retry_max_count or time.time() - stime >= sql_retry_max_time:
+ raise
+ self.log.info("Job %s got error on connection %s: %s" % (self.job_name, dbname, e))
+ except:
+ raise
+ # y = a + bx , apply cap
+ y = sql_retry_formula_a + sql_retry_formula_b * tried
+ if sql_retry_formula_cap is not None and y > sql_retry_formula_cap:
+ y = sql_retry_formula_cap
+ tried += 1
+ self.log.info("Retry #%i in %i seconds ...", tried, y)
+ self.sleep(y)
+ return tried, curs
+
def listen(self, dbname, channel):
"""Make connection listen for specific event channel.
@@ -960,7 +1029,6 @@ class DBCachedConn(object):
self.conn = None
self.conn_time = 0
self.max_age = max_age
- self.autocommit = -1
self.isolation_level = -1
self.verbose = verbose
self.setup_func = setup_func