From 42c4894666a2baf45493bad78728ec6568b694b9 Mon Sep 17 00:00:00 2001 From: martinko Date: Wed, 18 Dec 2013 17:16:43 +0100 Subject: simple retriable execute (for sql) the idea is that db calls occasionally break due to transient issues and we’d rather have the calls retried before raising exceptions etc --- python/skytools/scripting.py | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) (limited to 'python/skytools/scripting.py') diff --git a/python/skytools/scripting.py b/python/skytools/scripting.py index 840f3cf4..0d53da71 100644 --- a/python/skytools/scripting.py +++ b/python/skytools/scripting.py @@ -921,6 +921,34 @@ class DBScript(BaseScript): # error is already logged sys.exit(1) + def execute_with_retry (self, cursor, stmt, args, exceptions = None): + """ Execute SQL and retry if it fails. + """ + self.sql_retry_max_count = self.cf.getint("sql_retry_max_count", 10) + self.sql_retry_max_time = self.cf.getint("sql_retry_max_time", 300) + self.sql_retry_formula_a = self.cf.getint("sql_retry_formula_a", 1) + self.sql_retry_formula_b = self.cf.getint("sql_retry_formula_b", 5) + self.sql_retry_formula_cap = self.cf.getint("sql_retry_formula_cap", 60) + import psycopg2 + elist = exceptions or (psycopg2.OperationalError,) + stime = time.time() + tried = 0 + while True: + try: + cursor.execute (stmt, args) + break + except elist: + if tried >= self.sql_retry_max_count or time.time() - stime >= self.sql_retry_max_time: + raise + except: + raise + # y = a + bx , apply cap + y = self.sql_retry_formula_a + self.sql_retry_formula_b * tried + if self.sql_retry_formula_cap is not None and y > self.sql_retry_formula_cap: + y = self.sql_retry_formula_cap + self.sleep(y) + tried += 1 + def listen(self, dbname, channel): """Make connection listen for specific event channel. -- cgit v1.2.3 From d54b1c556ae37f486911fff542536da1c8a34aba Mon Sep 17 00:00:00 2001 From: martinko Date: Wed, 8 Jan 2014 12:54:08 +0100 Subject: another iteration from past --- python/skytools/scripting.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) (limited to 'python/skytools/scripting.py') diff --git a/python/skytools/scripting.py b/python/skytools/scripting.py index 0d53da71..1e86476b 100644 --- a/python/skytools/scripting.py +++ b/python/skytools/scripting.py @@ -921,7 +921,7 @@ class DBScript(BaseScript): # error is already logged sys.exit(1) - def execute_with_retry (self, cursor, stmt, args, exceptions = None): + def execute_with_retry (self, conn_or_curs, stmt, args, exceptions = None): """ Execute SQL and retry if it fails. """ self.sql_retry_max_count = self.cf.getint("sql_retry_max_count", 10) @@ -935,7 +935,14 @@ class DBScript(BaseScript): tried = 0 while True: try: - cursor.execute (stmt, args) + if tried > 0: + conn = getattr(conn_or_curs, 'connection', conn_or_curs) + self.close_database(conn.my_name) + conn = self.get_database(conn.my_name, conn.autocommit, conn.isolation_level) # , connstr = conn.dsn + curs = conn.cursor() + else: + curs = conn_or_curs if hasattr(conn_or_curs, 'connection') else conn_or_curs.cursor() + curs.execute (stmt, args) break except elist: if tried >= self.sql_retry_max_count or time.time() - stime >= self.sql_retry_max_time: -- cgit v1.2.3 From 136970b586834511f335c1ebc359a86058b07676 Mon Sep 17 00:00:00 2001 From: martinko Date: Wed, 8 Jan 2014 16:00:13 +0100 Subject: made more robust, returns retry count and valid cursor --- python/skytools/scripting.py | 14 +++++++------- scripts/simple_local_consumer.py | 5 +++-- 2 files changed, 10 insertions(+), 9 deletions(-) (limited to 'python/skytools/scripting.py') diff --git a/python/skytools/scripting.py b/python/skytools/scripting.py index 1e86476b..63c16c15 100644 --- a/python/skytools/scripting.py +++ b/python/skytools/scripting.py @@ -921,9 +921,12 @@ class DBScript(BaseScript): # error is already logged sys.exit(1) - def execute_with_retry (self, conn_or_curs, stmt, args, exceptions = None): + def execute_with_retry (self, dbname, stmt, args, exceptions = None): """ Execute SQL and retry if it fails. + Note that dbname must have been used with get_database() just before. """ + dbc = self.db_cache[dbname] + assert dbc.isolation_level == 0 # autocommit self.sql_retry_max_count = self.cf.getint("sql_retry_max_count", 10) self.sql_retry_max_time = self.cf.getint("sql_retry_max_time", 300) self.sql_retry_formula_a = self.cf.getint("sql_retry_formula_a", 1) @@ -936,12 +939,8 @@ class DBScript(BaseScript): while True: try: if tried > 0: - conn = getattr(conn_or_curs, 'connection', conn_or_curs) - self.close_database(conn.my_name) - conn = self.get_database(conn.my_name, conn.autocommit, conn.isolation_level) # , connstr = conn.dsn - curs = conn.cursor() - else: - curs = conn_or_curs if hasattr(conn_or_curs, 'connection') else conn_or_curs.cursor() + dbc.reset() + curs = dbc.get_connection(dbc.isolation_level).cursor() curs.execute (stmt, args) break except elist: @@ -955,6 +954,7 @@ class DBScript(BaseScript): y = self.sql_retry_formula_cap self.sleep(y) tried += 1 + return tried, curs def listen(self, dbname, channel): """Make connection listen for specific event channel. diff --git a/scripts/simple_local_consumer.py b/scripts/simple_local_consumer.py index f6951852..751fd877 100755 --- a/scripts/simple_local_consumer.py +++ b/scripts/simple_local_consumer.py @@ -38,7 +38,8 @@ class SimpleLocalConsumer(pgq.LocalConsumer): self.consumer_filter = self.cf.get("consumer_filter", "") def process_local_event(self, db, batch_id, ev): - curs = self.get_database('dst_db', autocommit = 1).cursor() + # initiate db connection + self.get_database('dst_db', autocommit = 1) if ev.ev_type[:2] not in ('I:', 'U:', 'D:'): return @@ -59,7 +60,7 @@ class SimpleLocalConsumer(pgq.LocalConsumer): payload['pgq.ev_extra4'] = ev.ev_extra4 self.log.debug(self.dst_query, payload) - self.execute_with_retry(curs, self.dst_query, payload) + retries, curs = self.execute_with_retry('dst_db', self.dst_query, payload) if curs.statusmessage[:6] == 'SELECT': res = curs.fetchall() self.log.debug(res) -- cgit v1.2.3 From 247255a9726b82082a948095a8f1c518ae1e00d4 Mon Sep 17 00:00:00 2001 From: martinko Date: Fri, 10 Jan 2014 18:37:22 +0100 Subject: cleaned up, even more robust, added logging, feature inactive by default --- python/skytools/scripting.py | 55 +++++++++++++++++++++++++--------------- scripts/simple_local_consumer.py | 3 --- 2 files changed, 35 insertions(+), 23 deletions(-) (limited to 'python/skytools/scripting.py') diff --git a/python/skytools/scripting.py b/python/skytools/scripting.py index 63c16c15..37f25cac 100644 --- a/python/skytools/scripting.py +++ b/python/skytools/scripting.py @@ -3,9 +3,18 @@ """ -import sys, os, signal, optparse, time, errno, select -import logging, logging.handlers, logging.config - +import errno +import logging +import logging.config +import logging.handlers +import optparse +import os +import select +import signal +import sys +import time + +import psycopg2 import skytools import skytools.skylog @@ -923,37 +932,44 @@ class DBScript(BaseScript): def execute_with_retry (self, dbname, stmt, args, exceptions = None): """ Execute SQL and retry if it fails. - Note that dbname must have been used with get_database() just before. + Return number of retries and current valid cursor, or raise an exception. """ - dbc = self.db_cache[dbname] - assert dbc.isolation_level == 0 # autocommit - self.sql_retry_max_count = self.cf.getint("sql_retry_max_count", 10) - self.sql_retry_max_time = self.cf.getint("sql_retry_max_time", 300) - self.sql_retry_formula_a = self.cf.getint("sql_retry_formula_a", 1) - self.sql_retry_formula_b = self.cf.getint("sql_retry_formula_b", 5) - self.sql_retry_formula_cap = self.cf.getint("sql_retry_formula_cap", 60) - import psycopg2 + sql_retry = self.cf.getbool("sql_retry", False) + sql_retry_max_count = self.cf.getint("sql_retry_max_count", 10) + sql_retry_max_time = self.cf.getint("sql_retry_max_time", 300) + sql_retry_formula_a = self.cf.getint("sql_retry_formula_a", 1) + sql_retry_formula_b = self.cf.getint("sql_retry_formula_b", 5) + sql_retry_formula_cap = self.cf.getint("sql_retry_formula_cap", 60) elist = exceptions or (psycopg2.OperationalError,) stime = time.time() tried = 0 + dbc = None while True: try: - if tried > 0: + if dbc is None: + if dbname not in self.db_cache: + self.get_database(dbname, autocommit=1) + dbc = self.db_cache[dbname] + if dbc.isolation_level != skytools.I_AUTOCOMMIT: + raise skytools.UsageError ("execute_with_retry: autocommit required") + else: dbc.reset() curs = dbc.get_connection(dbc.isolation_level).cursor() curs.execute (stmt, args) break - except elist: - if tried >= self.sql_retry_max_count or time.time() - stime >= self.sql_retry_max_time: + except elist, e: + if not sql_retry or tried >= sql_retry_max_count or time.time() - stime >= sql_retry_max_time: raise + self.log.info("Job %s got error on connection %s: %s" % (self.job_name, dbname, e)) except: raise # y = a + bx , apply cap - y = self.sql_retry_formula_a + self.sql_retry_formula_b * tried - if self.sql_retry_formula_cap is not None and y > self.sql_retry_formula_cap: - y = self.sql_retry_formula_cap - self.sleep(y) + y = sql_retry_formula_a + sql_retry_formula_b * tried + if sql_retry_formula_cap is not None and y > sql_retry_formula_cap: + y = sql_retry_formula_cap tried += 1 + self.log.info("Retry #%i in %i seconds ...", tried, y) + self.sleep(y) return tried, curs def listen(self, dbname, channel): @@ -995,7 +1011,6 @@ class DBCachedConn(object): self.conn = None self.conn_time = 0 self.max_age = max_age - self.autocommit = -1 self.isolation_level = -1 self.verbose = verbose self.setup_func = setup_func diff --git a/scripts/simple_local_consumer.py b/scripts/simple_local_consumer.py index 751fd877..50177097 100755 --- a/scripts/simple_local_consumer.py +++ b/scripts/simple_local_consumer.py @@ -38,9 +38,6 @@ class SimpleLocalConsumer(pgq.LocalConsumer): self.consumer_filter = self.cf.get("consumer_filter", "") def process_local_event(self, db, batch_id, ev): - # initiate db connection - self.get_database('dst_db', autocommit = 1) - if ev.ev_type[:2] not in ('I:', 'U:', 'D:'): return -- cgit v1.2.3