summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authormartinko2014-01-08 15:00:13 +0000
committermartinko2014-01-28 13:56:29 +0000
commit136970b586834511f335c1ebc359a86058b07676 (patch)
tree0febada4b9ff29d42c52dee866f8fb729ff79154
parentd54b1c556ae37f486911fff542536da1c8a34aba (diff)
made more robust, returns retry count and valid cursor
-rw-r--r--python/skytools/scripting.py14
-rwxr-xr-xscripts/simple_local_consumer.py5
2 files changed, 10 insertions, 9 deletions
diff --git a/python/skytools/scripting.py b/python/skytools/scripting.py
index 1e86476b..63c16c15 100644
--- a/python/skytools/scripting.py
+++ b/python/skytools/scripting.py
@@ -921,9 +921,12 @@ class DBScript(BaseScript):
# error is already logged
sys.exit(1)
- def execute_with_retry (self, conn_or_curs, stmt, args, exceptions = None):
+ def execute_with_retry (self, dbname, stmt, args, exceptions = None):
""" Execute SQL and retry if it fails.
+ Note that dbname must have been used with get_database() just before.
"""
+ dbc = self.db_cache[dbname]
+ assert dbc.isolation_level == 0 # autocommit
self.sql_retry_max_count = self.cf.getint("sql_retry_max_count", 10)
self.sql_retry_max_time = self.cf.getint("sql_retry_max_time", 300)
self.sql_retry_formula_a = self.cf.getint("sql_retry_formula_a", 1)
@@ -936,12 +939,8 @@ class DBScript(BaseScript):
while True:
try:
if tried > 0:
- conn = getattr(conn_or_curs, 'connection', conn_or_curs)
- self.close_database(conn.my_name)
- conn = self.get_database(conn.my_name, conn.autocommit, conn.isolation_level) # , connstr = conn.dsn
- curs = conn.cursor()
- else:
- curs = conn_or_curs if hasattr(conn_or_curs, 'connection') else conn_or_curs.cursor()
+ dbc.reset()
+ curs = dbc.get_connection(dbc.isolation_level).cursor()
curs.execute (stmt, args)
break
except elist:
@@ -955,6 +954,7 @@ class DBScript(BaseScript):
y = self.sql_retry_formula_cap
self.sleep(y)
tried += 1
+ return tried, curs
def listen(self, dbname, channel):
"""Make connection listen for specific event channel.
diff --git a/scripts/simple_local_consumer.py b/scripts/simple_local_consumer.py
index f6951852..751fd877 100755
--- a/scripts/simple_local_consumer.py
+++ b/scripts/simple_local_consumer.py
@@ -38,7 +38,8 @@ class SimpleLocalConsumer(pgq.LocalConsumer):
self.consumer_filter = self.cf.get("consumer_filter", "")
def process_local_event(self, db, batch_id, ev):
- curs = self.get_database('dst_db', autocommit = 1).cursor()
+ # initiate db connection
+ self.get_database('dst_db', autocommit = 1)
if ev.ev_type[:2] not in ('I:', 'U:', 'D:'):
return
@@ -59,7 +60,7 @@ class SimpleLocalConsumer(pgq.LocalConsumer):
payload['pgq.ev_extra4'] = ev.ev_extra4
self.log.debug(self.dst_query, payload)
- self.execute_with_retry(curs, self.dst_query, payload)
+ retries, curs = self.execute_with_retry('dst_db', self.dst_query, payload)
if curs.statusmessage[:6] == 'SELECT':
res = curs.fetchall()
self.log.debug(res)