scripts/data_maintainer.py: updated
authormartinko <gamato@users.sf.net>
Thu, 28 Feb 2013 14:03:42 +0000 (15:03 +0100)
committermartinko <gamato@users.sf.net>
Thu, 28 Feb 2013 14:03:42 +0000 (15:03 +0100)
scripts/data_maintainer.py

index 05b4cd4ab0eb8015c574e2d0d995907ff6fe0050..617960d48b397e516c58181f2ef49b07929d4882 100644 (file)
@@ -1,12 +1,13 @@
-#!/usr/bin/python
+#!/usr/bin/env python
 
-"""Generic script for processing large data sets in small batches
+"""Generic script for processing large data sets in small batches.
 
-Reads event from one datasource and commits them over other one either one by one or in batches
+Reads events from one datasource and commits them into another one,
+either one by one or in batches.
 
 Config template::
 
-    [simple_serial_consumer]
+    [data_maintainer]
     job_name        = dm_remove_expired_services
 
     dbread          = dbname=sourcedb_test
@@ -14,20 +15,21 @@ Config template::
     dbbefore        = dbname=destdb_test
     dbafter         = dbname=destdb_test
     dbcrash         = dbname=destdb_test
-    
-    sql_get_pk_list = 
-        select username 
-        from user_service 
+
+    sql_get_pk_list =
+        select username
+        from user_service
         where expire_date < now();
-    # it good practice to include same where condition on target side as on read side
-    # to ensure that you are actually changeing the same data you think you are
-    # especially when reading from replica database or when processing takes days
-    sql_modify      = 
-        delete from user_service 
-        where username = %%(username)s 
+
+    # It is a good practice to include same where condition on target side as on read side,
+    # to ensure that you are actually changing the same data you think you are,
+    # especially when reading from replica database or when processing takes days.
+    sql_modify =
+        delete from user_service
+        where username = %%(username)s
         and expire_date < now();
 
-    # This will be run before executing the sql_get_pk_list query
+    # This will be run before executing the sql_get_pk_list query (optional)
     sql_before_run =
         select * from somefunction1(%(job_name)s);
 
@@ -39,23 +41,25 @@ Config template::
     after_zero_rows = 1
 
     # This will be run if the DM crashes (optional)
-    sql_on_crash = 
+    sql_on_crash =
         select * from somefunction3(%(job_name)s);
 
-    # materialize query so that transaction should not be open while processing it 
+    # materialize query so that transaction should not be open while processing it
     #with_hold       = 1
-    # how many records process to fetch at once and if batch processing is used then 
+
+    # how many records process to fetch at once and if batch processing is used then
     # also how many records are processed in one commit
     #fetch_count     = 100
+
     # by default commit after each row (safe when behind plproxy, bouncer or whatever)
-    #can be toruned off for better berformance when connected directly to database
+    # can be turned off for better performance when connected directly to database
     #autocommit      = 1
-    logfile         = ~/log/%(job_name)s.log
-    pidfile         = ~/pid/%(job_name)s.pid
-    # just for tuning to throttle how much load we let onto write database 
+
+    # just for tuning to throttle how much load we let onto write database
     #commit_delay    = 0.0
-    # quite often data_maintainer is run from crontab and then loop dalay is not needed
-    # in case it has to be run as daemon set loop delay in seconds 
+
+    # quite often data_maintainer is run from crontab and then loop delay is not needed
+    # in case it has to be run as daemon set loop delay in seconds
     #loop_delay      = 1
 
     logfile         = ~/log/%(job_name)s.log
@@ -63,38 +67,57 @@ Config template::
     use_skylog      = 0
 """
 
-import time, datetime, skytools, sys
-skytools.sane_config = 1
+import datetime
+import sys
+import time
+
+import pkgloader
+pkgloader.require('skytools', '3.0')
+import skytools
+
+
+class DataMaintainer (skytools.DBScript):
+    __doc__ = __doc__
+
+    def __init__(self, args):
+        super(DataMaintainer, self).__init__("data_maintainer", args)
 
-class DataMaintainer(skytools.DBScript):
-    def __init__(self,args):
-        skytools.DBScript.__init__(self, "data_maintainer", args)
         # query for fetching the PK-s of the data set to be maintained
         self.sql_pk = self.cf.get("sql_get_pk_list")
-        # query for changing data tuple ( autocommit ) 
+
+        # query for changing data tuple ( autocommit )
         self.sql_modify = self.cf.get("sql_modify")
-        # query to be run before starting the data maintainer, useful for retrieving
-        # initialization parameters of the query
-        self.sql_before = self.cf.get("sql_before_run","")
+
+        # query to be run before starting the data maintainer,
+        # useful for retrieving initialization parameters of the query
+        self.sql_before = self.cf.get("sql_before_run", "")
+
         # query to be run after finishing the data maintainer
-        self.sql_after = self.cf.get("sql_after_run","")
+        self.sql_after = self.cf.get("sql_after_run", "")
+
         # whether to run the sql_after query in case of 0 rows
-        self.after_zero_rows = self.cf.getint("after_zero_rows",1)
+        self.after_zero_rows = self.cf.getint("after_zero_rows", 1)
+
         # query to be run if the process crashes
-        self.sql_crash = self.cf.get("sql_on_crash","")
+        self.sql_crash = self.cf.get("sql_on_crash", "")
+
         # how many records to fetch at once
         self.fetchcnt = self.cf.getint("fetchcnt", 100)
         self.fetchcnt = self.cf.getint("fetch_count", self.fetchcnt)
-        # specifies if nontrasactional cursor should be created (defaul 0-without hold)
+
+        # specifies if non-transactional cursor should be created (0 -> without hold)
         self.withhold = self.cf.getint("with_hold", 1)
-        # execution mode (0 -> whole batch is commited / 1 -> autocommit)
+
+        # execution mode (0 -> whole batch is committed / 1 -> autocommit)
         self.autocommit = self.cf.getint("autocommit", 1)
+
         # delay in seconds after each commit
         self.commit_delay =  self.cf.getfloat("commit_delay", 0.0)
+
         # if loop delay given then we are in looping mode otherwise single loop
-        if self.cf.get('loop_delay',-1) == -1:
+        if self.cf.get('loop_delay', -1) == -1:
             self.set_single_loop(1)
-        
+
     def work(self):
         self.log.info('Starting..')
         started = lap_time = time.time()
@@ -104,17 +127,17 @@ class DataMaintainer(skytools.DBScript):
         if self.sql_before:
             bdb = self.get_database("dbbefore", autocommit=1)
             bcur = bdb.cursor()
-            bcur.execute( self.sql_before )
-            if bcur.statusmessage[:6] == 'SELECT':
+            bcur.execute(self.sql_before)
+            if bcur.statusmessage.startswith('SELECT'):
                 res = bcur.dictfetchall()
                 assert len(res)==1, "Result of a 'before' query must be 1 row"
                 bres = res[0].copy()
-       
+
         if self.autocommit:
-            self.log.info('Autocommit after each modify')
+            self.log.info("Autocommit after each modify")
             dbw = self.get_database("dbwrite", autocommit=1)
         else:
-            self.log.info('Commit in %s record batches' % self.fetchcnt)
+            self.log.info("Commit in %i record batches", self.fetchcnt)
             dbw = self.get_database("dbwrite", autocommit=0)
         if self.withhold:
             dbr = self.get_database("dbread", autocommit=1)
@@ -124,10 +147,11 @@ class DataMaintainer(skytools.DBScript):
             sql = "DECLARE data_maint_cur NO SCROLL CURSOR FOR %s"
         rcur = dbr.cursor()
         mcur = dbw.cursor()
-        rcur.execute(sql % self.sql_pk, bres) # Pass the results from the before_query into sql_pk
+        rcur.execute(sql % self.sql_pk, bres) # pass results from before_query into sql_pk
         self.log.debug(rcur.query)
         self.log.debug(rcur.statusmessage)
-        while True:    # loop while fetch returns fetchcount rows 
+
+        while True: # loop while fetch returns fetch_count rows
             self.fetch_started = time.time()
             rcur.execute("FETCH FORWARD %s FROM data_maint_cur" % self.fetchcnt)
             self.log.debug(rcur.query)
@@ -144,22 +168,20 @@ class DataMaintainer(skytools.DBScript):
             if self.looping == 0:
                 self.log.info("Exiting on user request")
                 break
-            if time.time() - lap_time > 60.0:  # if one minute has passed print running totals
-                self.log.info("--- Running count: %s duration: %s ---" % 
-                        (total_count,  str(datetime.timedelta(0, round(time.time() - started)))))
+            if time.time() - lap_time > 60.0: # if one minute has passed print running totals
+                self.log.info("--- Running count: %s duration: %s ---",
+                        total_count,  datetime.timedelta(0, round(time.time() - started)))
                 lap_time = time.time()
 
         rcur.execute("CLOSE data_maint_cur")
-
         if not self.withhold:
             dbr.rollback()
-        self.log.info("--- Total count: %s duration: %s ---" % 
-                (total_count,  str(datetime.timedelta(0, round(time.time() - started)))))
-        
-        # Run sql_after
+        self.log.info("--- Total count: %s duration: %s ---",
+                total_count,  datetime.timedelta(0, round(time.time() - started)))
+
         if self.sql_after and (self.after_zero_rows > 0 or total_count > 0):
-            dba = self.get_database("dbafter", autocommit=1)
-            acur = dba.cursor()
+            adb = self.get_database("dbafter", autocommit=1)
+            acur = adb.cursor()
 
             # FIXME: neither of those can be None?
             if bres != None and lastitem != None:
@@ -174,35 +196,35 @@ class DataMaintainer(skytools.DBScript):
         """ Process events in autocommit mode reading results back and trying to make some sense out of them
         """
         try:
-            count = 0 
+            count = 0
             item = bres
             for i in res:   # for each row in read query result
                 item.update(i)
                 mcur.execute(self.sql_modify, item)
-                if 'cnt' in item:
-                    count += item['cnt']
-                    self.stat_increase("count", item['cnt'])
-                else:
-                    count += 1
-                    self.stat_increase("count")
                 self.log.debug(mcur.query)
-                if mcur.statusmessage[:6] == 'SELECT':  # if select was usedd we can expect some result
+                if mcur.statusmessage.startswith('SELECT'): # if select was used we can expect some result
                     mres = mcur.dictfetchall()
-                    for r in mres: 
-                        if 'stats' in r: # if specially handled column stats is present
-                            for k, v in skytools.db_urldecode(r['stats']).items(): 
-                                self.stat_increase(k,int(v))
+                    for r in mres:
+                        if 'stats' in r: # if specially handled column 'stats' is present
+                            for k, v in skytools.db_urldecode(r['stats']).items():
+                                self.stat_increase(k, int(v))
                         self.log.debug(r)
                 else:
                     self.stat_increase('processed', mcur.rowcount)
                     self.log.debug(mcur.statusmessage)
+                if 'cnt' in item:
+                    count += item['cnt']
+                    self.stat_increase("count", item['cnt'])
+                else:
+                    count += 1
+                    self.stat_increase("count")
                 if self.looping == 0:
                     break
             if self.commit_delay > 0.0:
                 time.sleep(self.commit_delay)
             return count, item
-        except: # The process has crashed, run the sql_crash and reraise the exception
-            if self.sql_crash != "":
+        except: # process has crashed, run sql_crash and re-raise the exception
+            if self.sql_crash:
                 dbc = self.get_database("dbcrash", autocommit=1)
                 ccur = dbc.cursor()
                 ccur.execute(self.sql_crash, item)