cleaned up, even more robust, added logging, feature inactive by default
authormartinko <gamato@users.sf.net>
Fri, 10 Jan 2014 17:37:22 +0000 (18:37 +0100)
committermartinko <gamato@users.sf.net>
Tue, 28 Jan 2014 13:56:29 +0000 (14:56 +0100)
python/skytools/scripting.py
scripts/simple_local_consumer.py

index 63c16c158c78d260853e23e593737ba6e6871cf5..37f25caca41f7c0406099fe6ae0956e4df92ed62 100644 (file)
@@ -3,9 +3,18 @@
 
 """
 
-import sys, os, signal, optparse, time, errno, select
-import logging, logging.handlers, logging.config
-
+import errno
+import logging
+import logging.config
+import logging.handlers
+import optparse
+import os
+import select
+import signal
+import sys
+import time
+
+import psycopg2
 import skytools
 import skytools.skylog
 
@@ -923,37 +932,44 @@ class DBScript(BaseScript):
 
     def execute_with_retry (self, dbname, stmt, args, exceptions = None):
         """ Execute SQL and retry if it fails.
-        Note that dbname must have been used with get_database() just before.
+        Return number of retries and current valid cursor, or raise an exception.
         """
-        dbc = self.db_cache[dbname]
-        assert dbc.isolation_level == 0 # autocommit
-        self.sql_retry_max_count = self.cf.getint("sql_retry_max_count", 10)
-        self.sql_retry_max_time = self.cf.getint("sql_retry_max_time", 300)
-        self.sql_retry_formula_a = self.cf.getint("sql_retry_formula_a", 1)
-        self.sql_retry_formula_b = self.cf.getint("sql_retry_formula_b", 5)
-        self.sql_retry_formula_cap = self.cf.getint("sql_retry_formula_cap", 60)
-        import psycopg2
+        sql_retry = self.cf.getbool("sql_retry", False)
+        sql_retry_max_count = self.cf.getint("sql_retry_max_count", 10)
+        sql_retry_max_time = self.cf.getint("sql_retry_max_time", 300)
+        sql_retry_formula_a = self.cf.getint("sql_retry_formula_a", 1)
+        sql_retry_formula_b = self.cf.getint("sql_retry_formula_b", 5)
+        sql_retry_formula_cap = self.cf.getint("sql_retry_formula_cap", 60)
         elist = exceptions or (psycopg2.OperationalError,)
         stime = time.time()
         tried = 0
+        dbc = None
         while True:
             try:
-                if tried > 0:
+                if dbc is None:
+                    if dbname not in self.db_cache:
+                        self.get_database(dbname, autocommit=1)
+                    dbc = self.db_cache[dbname]
+                    if dbc.isolation_level != skytools.I_AUTOCOMMIT:
+                        raise skytools.UsageError ("execute_with_retry: autocommit required")
+                else:
                     dbc.reset()
                 curs = dbc.get_connection(dbc.isolation_level).cursor()
                 curs.execute (stmt, args)
                 break
-            except elist:
-                if tried >= self.sql_retry_max_count or time.time() - stime >= self.sql_retry_max_time:
+            except elist, e:
+                if not sql_retry or tried >= sql_retry_max_count or time.time() - stime >= sql_retry_max_time:
                     raise
+                self.log.info("Job %s got error on connection %s: %s" % (self.job_name, dbname, e))
             except:
                 raise
             # y = a + bx , apply cap
-            y = self.sql_retry_formula_a + self.sql_retry_formula_b * tried
-            if self.sql_retry_formula_cap is not None and y > self.sql_retry_formula_cap:
-                y = self.sql_retry_formula_cap
-            self.sleep(y)
+            y = sql_retry_formula_a + sql_retry_formula_b * tried
+            if sql_retry_formula_cap is not None and y > sql_retry_formula_cap:
+                y = sql_retry_formula_cap
             tried += 1
+            self.log.info("Retry #%i in %i seconds ...", tried, y)
+            self.sleep(y)
         return tried, curs
 
     def listen(self, dbname, channel):
@@ -995,7 +1011,6 @@ class DBCachedConn(object):
         self.conn = None
         self.conn_time = 0
         self.max_age = max_age
-        self.autocommit = -1
         self.isolation_level = -1
         self.verbose = verbose
         self.setup_func = setup_func
index 751fd877279b5c0c6b7a273bb555a7256c7d006b..501770970ae78b90af8096615848ef5481e52cdf 100755 (executable)
@@ -38,9 +38,6 @@ class SimpleLocalConsumer(pgq.LocalConsumer):
             self.consumer_filter = self.cf.get("consumer_filter", "")
 
     def process_local_event(self, db, batch_id, ev):
-        # initiate db connection
-        self.get_database('dst_db', autocommit = 1)
-
         if ev.ev_type[:2] not in ('I:', 'U:', 'D:'):
             return