author     martinko    2013-02-28 14:03:42 +0000
committer  martinko    2013-02-28 14:03:42 +0000
commit     24422ed401cf52fb107ca1cf6b45969ba9aeb8f3 (patch)
tree       2ea614a092d8237798e6a795e9b8fa1be29ece45 /scripts/data_maintainer.py
parent     29737ce4a94a893d34d1ce1f652bf961bdcd3ee6 (diff)
scripts/data_maintainer.py: updated
Diffstat (limited to 'scripts/data_maintainer.py')
-rw-r--r--    scripts/data_maintainer.py    164
1 files changed, 93 insertions, 71 deletions
diff --git a/scripts/data_maintainer.py b/scripts/data_maintainer.py
index 05b4cd4a..617960d4 100644
--- a/scripts/data_maintainer.py
+++ b/scripts/data_maintainer.py
@@ -1,12 +1,13 @@
-#!/usr/bin/python
+#!/usr/bin/env python
-"""Generic script for processing large data sets in small batches
+"""Generic script for processing large data sets in small batches.
-Reads event from one datasource and commits them over other one either one by one or in batches
+Reads events from one datasource and commits them into another one,
+either one by one or in batches.
 Config template::
-    [simple_serial_consumer]
+    [data_maintainer]
     job_name = dm_remove_expired_services
     dbread = dbname=sourcedb_test
@@ -14,20 +15,21 @@ Config template::
     dbbefore = dbname=destdb_test
     dbafter = dbname=destdb_test
     dbcrash = dbname=destdb_test
-
-    sql_get_pk_list =
-        select username
-        from user_service
+
+    sql_get_pk_list =
+        select username
+        from user_service
         where expire_date < now();
-    # it good practice to include same where condition on target side as on read side
-    # to ensure that you are actually changeing the same data you think you are
-    # especially when reading from replica database or when processing takes days
-    sql_modify =
-        delete from user_service
-        where username = %%(username)s
+
+    # It is a good practice to include same where condition on target side as on read side,
+    # to ensure that you are actually changing the same data you think you are,
+    # especially when reading from replica database or when processing takes days.
+    sql_modify =
+        delete from user_service
+        where username = %%(username)s
         and expire_date < now();
-    # This will be run before executing the sql_get_pk_list query
+    # This will be run before executing the sql_get_pk_list query (optional)
     sql_before_run =
         select * from somefunction1(%(job_name)s);
@@ -39,23 +41,25 @@ Config template::
     after_zero_rows = 1
     # This will be run if the DM crashes (optional)
-    sql_on_crash =
+    sql_on_crash =
        select * from somefunction3(%(job_name)s);
-    # materialize query so that transaction should not be open while processing it
+    # materialize query so that transaction should not be open while processing it
     #with_hold = 1
-    # how many records process to fetch at once and if batch processing is used then
+
+    # how many records process to fetch at once and if batch processing is used then
     # also how many records are processed in one commit
     #fetch_count = 100
+
     # by default commit after each row (safe when behind plproxy, bouncer or whatever)
-#can be toruned off for better berformance when connected directly to database
+    # can be turned off for better performance when connected directly to database
     #autocommit = 1
-    logfile = ~/log/%(job_name)s.log
-    pidfile = ~/pid/%(job_name)s.pid
-    # just for tuning to throttle how much load we let onto write database
+
+    # just for tuning to throttle how much load we let onto write database
     #commit_delay = 0.0
-    # quite often data_maintainer is run from crontab and then loop dalay is not needed
-    # in case it has to be run as daemon set loop delay in seconds
+
+    # quite often data_maintainer is run from crontab and then loop delay is not needed
+    # in case it has to be run as daemon set loop delay in seconds
     #loop_delay = 1
     logfile = ~/log/%(job_name)s.log
@@ -63,38 +67,57 @@ Config template::
     use_skylog = 0
 """
-import time, datetime, skytools, sys
-skytools.sane_config = 1
+import datetime
+import sys
+import time
+
+import pkgloader
+pkgloader.require('skytools', '3.0')
+import skytools
+
+
+class DataMaintainer (skytools.DBScript):
+    __doc__ = __doc__
+
+    def __init__(self, args):
+        super(DataMaintainer, self).__init__("data_maintainer", args)
-class DataMaintainer(skytools.DBScript):
-    def __init__(self,args):
-        skytools.DBScript.__init__(self, "data_maintainer", args)
+
         # query for fetching the PK-s of the data set to be maintained
         self.sql_pk = self.cf.get("sql_get_pk_list")
-        # query for changing data tuple ( autocommit )
+
+        # query for changing data tuple ( autocommit )
         self.sql_modify = self.cf.get("sql_modify")
-        # query to be run before starting the data maintainer, useful for retrieving
-        # initialization parameters of the query
-        self.sql_before = self.cf.get("sql_before_run","")
+
+        # query to be run before starting the data maintainer,
+        # useful for retrieving initialization parameters of the query
+        self.sql_before = self.cf.get("sql_before_run", "")
+
         # query to be run after finishing the data maintainer
-        self.sql_after = self.cf.get("sql_after_run","")
+        self.sql_after = self.cf.get("sql_after_run", "")
+
         # whether to run the sql_after query in case of 0 rows
-        self.after_zero_rows = self.cf.getint("after_zero_rows",1)
+        self.after_zero_rows = self.cf.getint("after_zero_rows", 1)
+
        # query to be run if the process crashes
-        self.sql_crash = self.cf.get("sql_on_crash","")
+        self.sql_crash = self.cf.get("sql_on_crash", "")
+
         # how many records to fetch at once
         self.fetchcnt = self.cf.getint("fetchcnt", 100)
         self.fetchcnt = self.cf.getint("fetch_count", self.fetchcnt)
-        # specifies if nontrasactional cursor should be created (defaul 0-without hold)
+
+        # specifies if non-transactional cursor should be created (0 -> without hold)
         self.withhold = self.cf.getint("with_hold", 1)
-        # execution mode (0 -> whole batch is commited / 1 -> autocommit)
+
+        # execution mode (0 -> whole batch is committed / 1 -> autocommit)
         self.autocommit = self.cf.getint("autocommit", 1)
+
         # delay in seconds after each commit
         self.commit_delay = self.cf.getfloat("commit_delay", 0.0)
+
         # if loop delay given then we are in looping mode otherwise single loop
-        if self.cf.get('loop_delay',-1) == -1:
+        if self.cf.get('loop_delay', -1) == -1:
             self.set_single_loop(1)
-
+
     def work(self):
         self.log.info('Starting..')
         started = lap_time = time.time()
@@ -104,17 +127,17 @@ class DataMaintainer(skytools.DBScript):
         if self.sql_before:
             bdb = self.get_database("dbbefore", autocommit=1)
             bcur = bdb.cursor()
-            bcur.execute( self.sql_before )
-            if bcur.statusmessage[:6] == 'SELECT':
+            bcur.execute(self.sql_before)
+            if bcur.statusmessage.startswith('SELECT'):
                 res = bcur.dictfetchall()
                 assert len(res)==1, "Result of a 'before' query must be 1 row"
                 bres = res[0].copy()
-
+
         if self.autocommit:
-            self.log.info('Autocommit after each modify')
+            self.log.info("Autocommit after each modify")
             dbw = self.get_database("dbwrite", autocommit=1)
         else:
-            self.log.info('Commit in %s record batches' % self.fetchcnt)
+            self.log.info("Commit in %i record batches", self.fetchcnt)
             dbw = self.get_database("dbwrite", autocommit=0)
         if self.withhold:
             dbr = self.get_database("dbread", autocommit=1)
@@ -124,10 +147,11 @@ class DataMaintainer(skytools.DBScript):
             sql = "DECLARE data_maint_cur NO SCROLL CURSOR FOR %s"
         rcur = dbr.cursor()
         mcur = dbw.cursor()
-        rcur.execute(sql % self.sql_pk, bres) # Pass the results from the before_query into sql_pk
+        rcur.execute(sql % self.sql_pk, bres)  # pass results from before_query into sql_pk
         self.log.debug(rcur.query)
         self.log.debug(rcur.statusmessage)
-        while True: # loop while fetch returns fetchcount rows
+
+        while True:  # loop while fetch returns fetch_count rows
             self.fetch_started = time.time()
             rcur.execute("FETCH FORWARD %s FROM data_maint_cur" % self.fetchcnt)
             self.log.debug(rcur.query)
@@ -144,22 +168,20 @@ class DataMaintainer(skytools.DBScript):
             if self.looping == 0:
                 self.log.info("Exiting on user request")
                 break
-            if time.time() - lap_time > 60.0: # if one minute has passed print running totals
-                self.log.info("--- Running count: %s duration: %s ---" %
-                        (total_count, str(datetime.timedelta(0, round(time.time() - started)))))
+            if time.time() - lap_time > 60.0:  # if one minute has passed print running totals
+                self.log.info("--- Running count: %s duration: %s ---",
+                        total_count, datetime.timedelta(0, round(time.time() - started)))
                 lap_time = time.time()
         rcur.execute("CLOSE data_maint_cur")
-
         if not self.withhold:
             dbr.rollback()
-        self.log.info("--- Total count: %s duration: %s ---" %
-                (total_count, str(datetime.timedelta(0, round(time.time() - started)))))
-
-        # Run sql_after
+        self.log.info("--- Total count: %s duration: %s ---",
+                total_count, datetime.timedelta(0, round(time.time() - started)))
+
         if self.sql_after and (self.after_zero_rows > 0 or total_count > 0):
-            dba = self.get_database("dbafter", autocommit=1)
-            acur = dba.cursor()
+            adb = self.get_database("dbafter", autocommit=1)
+            acur = adb.cursor()
             # FIXME: neither of those can be None?
             if bres != None and lastitem != None:
@@ -174,35 +196,35 @@ class DataMaintainer(skytools.DBScript):
         """ Process events in autocommit mode reading results back and trying to make some sense out of them
         """
         try:
-            count = 0
+            count = 0
             item = bres
             for i in res:  # for each row in read query result
                 item.update(i)
                 mcur.execute(self.sql_modify, item)
-                if 'cnt' in item:
-                    count += item['cnt']
-                    self.stat_increase("count", item['cnt'])
-                else:
-                    count += 1
-                    self.stat_increase("count")
                 self.log.debug(mcur.query)
-                if mcur.statusmessage[:6] == 'SELECT': # if select was usedd we can expect some result
+                if mcur.statusmessage.startswith('SELECT'):  # if select was used we can expect some result
                     mres = mcur.dictfetchall()
-                    for r in mres:
-                        if 'stats' in r: # if specially handled column stats is present
-                            for k, v in skytools.db_urldecode(r['stats']).items():
-                                self.stat_increase(k,int(v))
+                    for r in mres:
+                        if 'stats' in r:  # if specially handled column 'stats' is present
+                            for k, v in skytools.db_urldecode(r['stats']).items():
+                                self.stat_increase(k, int(v))
                         self.log.debug(r)
                 else:
                     self.stat_increase('processed', mcur.rowcount)
                     self.log.debug(mcur.statusmessage)
+                if 'cnt' in item:
+                    count += item['cnt']
+                    self.stat_increase("count", item['cnt'])
+                else:
+                    count += 1
+                    self.stat_increase("count")
                 if self.looping == 0:
                     break
                 if self.commit_delay > 0.0:
                     time.sleep(self.commit_delay)
             return count, item
-        except:  # The process has crashed, run the sql_crash and reraise the exception
-            if self.sql_crash != "":
+        except:  # process has crashed, run sql_crash and re-raise the exception
+            if self.sql_crash:
                dbc = self.get_database("dbcrash", autocommit=1)
                ccur = dbc.cursor()
                ccur.execute(self.sql_crash, item)
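For reference, a complete configuration assembled from the docstring template in this commit could look roughly like the sketch below. The job name, connection strings, queries and file paths are the placeholder values from the template itself, not a real deployment:

    [data_maintainer]
    job_name        = dm_remove_expired_services

    dbread          = dbname=sourcedb_test
    dbwrite         = dbname=destdb_test
    dbbefore        = dbname=destdb_test
    dbafter         = dbname=destdb_test
    dbcrash         = dbname=destdb_test

    sql_get_pk_list =
        select username
        from user_service
        where expire_date < now();

    sql_modify =
        delete from user_service
        where username = %%(username)s
        and expire_date < now();

    # commit after each row; leave loop_delay unset to run a single pass from cron
    fetch_count     = 100
    autocommit      = 1

    logfile         = ~/log/%(job_name)s.log
    pidfile         = ~/pid/%(job_name)s.pid

As with other skytools DBScript-based tools, the script would then normally be started with the ini file as its argument (data_maintainer.py <config>.ini), optionally as a daemon when loop_delay is set; the exact command-line flags depend on the installed skytools version.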