diff options
author | martinko | 2014-04-02 14:55:14 +0000 |
---|---|---|
committer | martinko | 2014-04-02 14:55:14 +0000 |
commit | fc1e1f4b7a9b50418c41045c92fbe31947b83796 (patch) | |
tree | 2d2f66bde621f7ad0dd012911ad19fbdcfa7ffcc /python/skytools/scripting.py | |
parent | 3cfc1ad65876845641fbf2868994e6c6b53dd9ff (diff) | |
parent | 62abe7b14dedaba7a6dd5678c8e0b4e3335ed81c (diff) |
Merge branch 'release/3.2' skytools_3_2
Diffstat (limited to 'python/skytools/scripting.py')
-rw-r--r-- | python/skytools/scripting.py | 90 |
1 files changed, 79 insertions, 11 deletions
diff --git a/python/skytools/scripting.py b/python/skytools/scripting.py index 840f3cf4..68ddc4d3 100644 --- a/python/skytools/scripting.py +++ b/python/skytools/scripting.py @@ -3,9 +3,18 @@ """ -import sys, os, signal, optparse, time, errno, select -import logging, logging.handlers, logging.config - +import errno +import logging +import logging.config +import logging.handlers +import optparse +import os +import select +import signal +import sys +import time + +import psycopg2 import skytools import skytools.skylog @@ -468,6 +477,9 @@ class BaseScript(object): self.pidfile = self.cf.getfile("pidfile", '') self.loop_delay = self.cf.getfloat("loop_delay", self.loop_delay) self.exception_sleep = self.cf.getfloat("exception_sleep", 20) + self.exception_quiet = self.cf.getlist("exception_quiet", []) + self.exception_grace = self.cf.getfloat("exception_grace", 5*60) + self.exception_reset = self.cf.getfloat("exception_reset", 15*60) def hook_sighup(self, sig, frame): "Internal SIGHUP handler. Minimal code here." @@ -560,12 +572,14 @@ class BaseScript(object): return state + last_func_fail = None def run_func_safely(self, func, prefer_looping = False): "Run users work function, safely." 
- cname = None - emsg = None try: - return func() + r = func() + if self.last_func_fail and time.time() > self.last_func_fail + self.exception_reset: + self.last_func_fail = None + return r except UsageError, d: self.log.error(str(d)) sys.exit(1) @@ -592,6 +606,8 @@ class BaseScript(object): self.send_stats() except: pass + if self.last_func_fail is None: + self.last_func_fail = time.time() emsg = str(d).rstrip() self.reset() self.exception_hook(d, emsg) @@ -610,6 +626,10 @@ class BaseScript(object): if ex.errno != errno.EINTR: raise + def _is_quiet_exception(self, ex): + return ((self.exception_quiet == ["ALL"] or ex.__class__.__name__ in self.exception_quiet) + and self.last_func_fail and time.time() < self.last_func_fail + self.exception_grace) + def exception_hook(self, det, emsg): """Called on after exception processing. @@ -618,8 +638,11 @@ class BaseScript(object): @param det: exception details @param emsg: exception msg """ - self.log.exception("Job %s crashed: %s" % ( - self.job_name, emsg)) + lm = "Job %s crashed: %s" % (self.job_name, emsg) + if self._is_quiet_exception(det): + self.log.warning(lm) + else: + self.log.exception(lm) def work(self): """Here should user's processing happen. @@ -801,8 +824,12 @@ class DBScript(BaseScript): sql = getattr(curs, 'query', None) or '?' if len(sql) > 200: # avoid logging londiste huge batched queries sql = sql[:60] + " ..." - self.log.exception("Job %s got error on connection '%s': %s. Query: %s" % ( - self.job_name, cname, emsg, sql)) + lm = "Job %s got error on connection '%s': %s. Query: %s" % ( + self.job_name, cname, emsg, sql) + if self._is_quiet_exception(d): + self.log.warning(lm) + else: + self.log.exception(lm) else: BaseScript.exception_hook(self, d, emsg) @@ -921,6 +948,48 @@ class DBScript(BaseScript): # error is already logged sys.exit(1) + def execute_with_retry (self, dbname, stmt, args, exceptions = None): + """ Execute SQL and retry if it fails. 
+ Return number of retries and current valid cursor, or raise an exception. + """ + sql_retry = self.cf.getbool("sql_retry", False) + sql_retry_max_count = self.cf.getint("sql_retry_max_count", 10) + sql_retry_max_time = self.cf.getint("sql_retry_max_time", 300) + sql_retry_formula_a = self.cf.getint("sql_retry_formula_a", 1) + sql_retry_formula_b = self.cf.getint("sql_retry_formula_b", 5) + sql_retry_formula_cap = self.cf.getint("sql_retry_formula_cap", 60) + elist = exceptions or (psycopg2.OperationalError,) + stime = time.time() + tried = 0 + dbc = None + while True: + try: + if dbc is None: + if dbname not in self.db_cache: + self.get_database(dbname, autocommit=1) + dbc = self.db_cache[dbname] + if dbc.isolation_level != skytools.I_AUTOCOMMIT: + raise skytools.UsageError ("execute_with_retry: autocommit required") + else: + dbc.reset() + curs = dbc.get_connection(dbc.isolation_level).cursor() + curs.execute (stmt, args) + break + except elist, e: + if not sql_retry or tried >= sql_retry_max_count or time.time() - stime >= sql_retry_max_time: + raise + self.log.info("Job %s got error on connection %s: %s" % (self.job_name, dbname, e)) + except: + raise + # y = a + bx , apply cap + y = sql_retry_formula_a + sql_retry_formula_b * tried + if sql_retry_formula_cap is not None and y > sql_retry_formula_cap: + y = sql_retry_formula_cap + tried += 1 + self.log.info("Retry #%i in %i seconds ...", tried, y) + self.sleep(y) + return tried, curs + def listen(self, dbname, channel): """Make connection listen for specific event channel. @@ -960,7 +1029,6 @@ class DBCachedConn(object): self.conn = None self.conn_time = 0 self.max_age = max_age - self.autocommit = -1 self.isolation_level = -1 self.verbose = verbose self.setup_func = setup_func |