diff options
| author | martinko | 2014-01-28 13:56:29 +0000 |
|---|---|---|
| committer | martinko | 2014-01-28 13:56:29 +0000 |
| commit | 839fcc4e34fdf682e65b36102b52281e537f647a (patch) | |
| tree | 9a47291e7776a73d874719e00e8aefa0f104d03a | |
| parent | 79a31496d6e814a2221329ac9106b6481feafb56 (diff) | |
| parent | 247255a9726b82082a948095a8f1c518ae1e00d4 (diff) | |
Merge branch 'feature/retriable_execute' into develop
| mode | file | lines changed |
|---|---|---|
| -rw-r--r-- | python/skytools/scripting.py | 58 |
| -rwxr-xr-x | scripts/simple_local_consumer.py | 4 |

2 files changed, 55 insertions, 7 deletions
```diff
diff --git a/python/skytools/scripting.py b/python/skytools/scripting.py
index 840f3cf4..37f25cac 100644
--- a/python/skytools/scripting.py
+++ b/python/skytools/scripting.py
@@ -3,9 +3,18 @@
 
 """
 
-import sys, os, signal, optparse, time, errno, select
-import logging, logging.handlers, logging.config
-
+import errno
+import logging
+import logging.config
+import logging.handlers
+import optparse
+import os
+import select
+import signal
+import sys
+import time
+
+import psycopg2
 import skytools
 import skytools.skylog
 
@@ -921,6 +930,48 @@ class DBScript(BaseScript):
             # error is already logged
             sys.exit(1)
 
+    def execute_with_retry (self, dbname, stmt, args, exceptions = None):
+        """ Execute SQL and retry if it fails.
+        Return number of retries and current valid cursor, or raise an exception.
+        """
+        sql_retry = self.cf.getbool("sql_retry", False)
+        sql_retry_max_count = self.cf.getint("sql_retry_max_count", 10)
+        sql_retry_max_time = self.cf.getint("sql_retry_max_time", 300)
+        sql_retry_formula_a = self.cf.getint("sql_retry_formula_a", 1)
+        sql_retry_formula_b = self.cf.getint("sql_retry_formula_b", 5)
+        sql_retry_formula_cap = self.cf.getint("sql_retry_formula_cap", 60)
+        elist = exceptions or (psycopg2.OperationalError,)
+        stime = time.time()
+        tried = 0
+        dbc = None
+        while True:
+            try:
+                if dbc is None:
+                    if dbname not in self.db_cache:
+                        self.get_database(dbname, autocommit=1)
+                    dbc = self.db_cache[dbname]
+                    if dbc.isolation_level != skytools.I_AUTOCOMMIT:
+                        raise skytools.UsageError ("execute_with_retry: autocommit required")
+                else:
+                    dbc.reset()
+                curs = dbc.get_connection(dbc.isolation_level).cursor()
+                curs.execute (stmt, args)
+                break
+            except elist, e:
+                if not sql_retry or tried >= sql_retry_max_count or time.time() - stime >= sql_retry_max_time:
+                    raise
+                self.log.info("Job %s got error on connection %s: %s" % (self.job_name, dbname, e))
+            except:
+                raise
+            # y = a + bx , apply cap
+            y = sql_retry_formula_a + sql_retry_formula_b * tried
+            if sql_retry_formula_cap is not None and y > sql_retry_formula_cap:
+                y = sql_retry_formula_cap
+            tried += 1
+            self.log.info("Retry #%i in %i seconds ...", tried, y)
+            self.sleep(y)
+        return tried, curs
+
     def listen(self, dbname, channel):
         """Make connection listen for specific event channel.
 
@@ -960,7 +1011,6 @@ class DBCachedConn(object):
         self.conn = None
         self.conn_time = 0
         self.max_age = max_age
-        self.autocommit = -1
         self.isolation_level = -1
         self.verbose = verbose
         self.setup_func = setup_func
diff --git a/scripts/simple_local_consumer.py b/scripts/simple_local_consumer.py
index 87c65868..50177097 100755
--- a/scripts/simple_local_consumer.py
+++ b/scripts/simple_local_consumer.py
@@ -38,8 +38,6 @@ class SimpleLocalConsumer(pgq.LocalConsumer):
         self.consumer_filter = self.cf.get("consumer_filter", "")
 
     def process_local_event(self, db, batch_id, ev):
-        curs = self.get_database('dst_db', autocommit = 1).cursor()
-
         if ev.ev_type[:2] not in ('I:', 'U:', 'D:'):
             return
 
@@ -59,7 +57,7 @@ class SimpleLocalConsumer(pgq.LocalConsumer):
         payload['pgq.ev_extra4'] = ev.ev_extra4
 
         self.log.debug(self.dst_query, payload)
-        curs.execute(self.dst_query, payload)
+        retries, curs = self.execute_with_retry('dst_db', self.dst_query, payload)
         if curs.statusmessage[:6] == 'SELECT':
             res = curs.fetchall()
             self.log.debug(res)
```
