made more robust, returns retry count and valid cursor
authormartinko <gamato@users.sf.net>
Wed, 8 Jan 2014 15:00:13 +0000 (16:00 +0100)
committermartinko <gamato@users.sf.net>
Tue, 28 Jan 2014 13:56:29 +0000 (14:56 +0100)
python/skytools/scripting.py
scripts/simple_local_consumer.py

index 1e86476be9c666a62d861373e5819174dc1174e8..63c16c158c78d260853e23e593737ba6e6871cf5 100644 (file)
@@ -921,9 +921,12 @@ class DBScript(BaseScript):
             # error is already logged
             sys.exit(1)
 
-    def execute_with_retry (self, conn_or_curs, stmt, args, exceptions = None):
+    def execute_with_retry (self, dbname, stmt, args, exceptions = None):
         """ Execute SQL and retry if it fails.
+        Note that dbname must have been used with get_database() just before.
         """
+        dbc = self.db_cache[dbname]
+        assert dbc.isolation_level == 0 # autocommit
         self.sql_retry_max_count = self.cf.getint("sql_retry_max_count", 10)
         self.sql_retry_max_time = self.cf.getint("sql_retry_max_time", 300)
         self.sql_retry_formula_a = self.cf.getint("sql_retry_formula_a", 1)
@@ -936,12 +939,8 @@ class DBScript(BaseScript):
         while True:
             try:
                 if tried > 0:
-                    conn = getattr(conn_or_curs, 'connection', conn_or_curs)
-                    self.close_database(conn.my_name)
-                    conn = self.get_database(conn.my_name, conn.autocommit, conn.isolation_level) # , connstr = conn.dsn
-                    curs = conn.cursor()
-                else:
-                    curs = conn_or_curs if hasattr(conn_or_curs, 'connection') else conn_or_curs.cursor()
+                    dbc.reset()
+                curs = dbc.get_connection(dbc.isolation_level).cursor()
                 curs.execute (stmt, args)
                 break
             except elist:
@@ -955,6 +954,7 @@ class DBScript(BaseScript):
                 y = self.sql_retry_formula_cap
             self.sleep(y)
             tried += 1
+        return tried, curs
 
     def listen(self, dbname, channel):
         """Make connection listen for specific event channel.
index f6951852847495842425e28d44169644c2fd6c2e..751fd877279b5c0c6b7a273bb555a7256c7d006b 100755 (executable)
@@ -38,7 +38,8 @@ class SimpleLocalConsumer(pgq.LocalConsumer):
             self.consumer_filter = self.cf.get("consumer_filter", "")
 
     def process_local_event(self, db, batch_id, ev):
-        curs = self.get_database('dst_db', autocommit = 1).cursor()
+        # initiate db connection
+        self.get_database('dst_db', autocommit = 1)
 
         if ev.ev_type[:2] not in ('I:', 'U:', 'D:'):
             return
@@ -59,7 +60,7 @@ class SimpleLocalConsumer(pgq.LocalConsumer):
         payload['pgq.ev_extra4'] = ev.ev_extra4
 
         self.log.debug(self.dst_query, payload)
-        self.execute_with_retry(curs, self.dst_query, payload)
+        retries, curs = self.execute_with_retry('dst_db', self.dst_query, payload)
         if curs.statusmessage[:6] == 'SELECT':
             res = curs.fetchall()
             self.log.debug(res)