diff options
Diffstat (limited to 'python/pgq/maint.py')
-rw-r--r-- | python/pgq/maint.py | 99 |
1 files changed, 99 insertions, 0 deletions
diff --git a/python/pgq/maint.py b/python/pgq/maint.py new file mode 100644 index 00000000..4636f74f --- /dev/null +++ b/python/pgq/maint.py @@ -0,0 +1,99 @@ +"""PgQ maintenance functions.""" + +import skytools, time + +def get_pgq_api_version(curs): + q = "select count(1) from pg_proc p, pg_namespace n"\ + " where n.oid = p.pronamespace and n.nspname='pgq'"\ + " and p.proname='version';" + curs.execute(q) + if not curs.fetchone()[0]: + return '1.0.0' + + curs.execute("select pgq.version()") + return curs.fetchone()[0] + +def version_ge(curs, want_ver): + """Check is db version of pgq is greater than want_ver.""" + db_ver = get_pgq_api_version(curs) + want_tuple = map(int, want_ver.split('.')) + db_tuple = map(int, db_ver.split('.')) + if db_tuple[0] != want_tuple[0]: + raise Exception('Wrong major version') + if db_tuple[1] >= want_tuple[1]: + return 1 + return 0 + +class MaintenanceJob(skytools.DBScript): + """Periodic maintenance.""" + def __init__(self, ticker, args): + skytools.DBScript.__init__(self, 'pgqadm', args) + self.ticker = ticker + self.last_time = 0 # start immidiately + self.last_ticks = 0 + self.clean_ticks = 1 + self.maint_delay = 5*60 + + def startup(self): + # disable regular DBScript startup() + pass + + def reload(self): + skytools.DBScript.reload(self) + + # force loop_delay + self.loop_delay = 5 + + self.maint_delay = 60 * self.cf.getfloat('maint_delay_min', 5) + self.maint_delay = self.cf.getfloat('maint_delay', self.maint_delay) + + def work(self): + t = time.time() + if self.last_time + self.maint_delay > t: + return + + self.do_maintenance() + + self.last_time = t + duration = time.time() - t + self.stat_add('maint_duration', duration) + + def do_maintenance(self): + """Helper function for running maintenance.""" + + db = self.get_database('db', autocommit=1) + cx = db.cursor() + + if skytools.exists_function(cx, "pgq.maint_rotate_tables_step1", 1): + # rotate each queue in own TX + q = "select queue_name from pgq.get_queue_info()" + cx.execute(q) + for row in cx.fetchall(): + cx.execute("select pgq.maint_rotate_tables_step1(%s)", [row[0]]) + res = cx.fetchone()[0] + if res: + self.log.info('Rotating %s' % row[0]) + else: + cx.execute("select pgq.maint_rotate_tables_step1();") + + # finish rotation + cx.execute("select pgq.maint_rotate_tables_step2();") + + # move retry events to main queue in small blocks + rcount = 0 + while 1: + cx.execute('select pgq.maint_retry_events();') + res = cx.fetchone()[0] + rcount += res + if res == 0: + break + if rcount: + self.log.info('Got %d events for retry' % rcount) + + # vacuum tables that are needed + cx.execute('set maintenance_work_mem = 32768') + cx.execute('select * from pgq.maint_tables_to_vacuum()') + for row in cx.fetchall(): + cx.execute('vacuum %s;' % row[0]) + + |