summaryrefslogtreecommitdiff
path: root/python/pgq/maint.py
blob: 89014dd4cadeed5768705380d963d33f3ae96992 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
"""PgQ maintenance functions."""

import skytools, time

__all__ = ['MaintenanceJob']

def get_pgq_api_version(curs):
    """Return the PgQ API version string reported by the database.

    The system catalog is first checked for a function named ``version``
    in the ``pgq`` schema.  When no such function is found, '1.0.0' is
    returned without issuing a second query; otherwise the result of
    ``pgq.version()`` is returned.

    curs - an open database cursor providing execute() and fetchone().
    """
    probe_sql = (
        "select count(1) from pg_proc p, pg_namespace n"
        " where n.oid = p.pronamespace and n.nspname='pgq'"
        "   and p.proname='version';"
    )
    curs.execute(probe_sql)
    has_version_func = curs.fetchone()[0]

    if has_version_func:
        curs.execute("select pgq.version()")
        return curs.fetchone()[0]

    # no pgq.version() in this database
    return '1.0.0'

def version_ge(curs, want_ver):
    """Check whether the database's PgQ version is at least want_ver.

    curs     - open database cursor, passed to get_pgq_api_version().
    want_ver - dotted version string, e.g. '3.1'.

    Returns 1 when the database's minor version is greater than or equal
    to the wanted minor version, 0 otherwise.  Only the first two
    components (major, minor) take part in the comparison; any further
    components are parsed but ignored.

    Raises Exception('Wrong major version') when the major versions
    differ, and ValueError when a version component is not an integer.
    """
    db_ver = get_pgq_api_version(curs)

    # Build real lists: the components are indexed below, and map() only
    # returns an indexable list on Python 2.
    want_parts = [int(part) for part in want_ver.split('.')]
    db_parts = [int(part) for part in db_ver.split('.')]

    if db_parts[0] != want_parts[0]:
        raise Exception('Wrong major version')
    if db_parts[1] >= want_parts[1]:
        return 1
    return 0

class MaintenanceJob(skytools.DBScript):
    """Job that runs the PgQ maintenance steps at a fixed interval.

    work() is expected to be called repeatedly by the DBScript main loop;
    it only does real work once maint_delay seconds have passed since the
    previous maintenance run started.
    """

    def __init__(self, ticker, args):
        """Set up the job under the 'pgqadm' service name.

        ticker - stored on the instance as self.ticker.
        args   - passed through to skytools.DBScript.__init__().
        """
        skytools.DBScript.__init__(self, 'pgqadm', args)
        self.ticker = ticker
        # zero means the first work() call runs maintenance immediately
        self.last_time = 0
        self.last_ticks = 0
        self.clean_ticks = 1
        # seconds between maintenance runs
        self.maint_delay = 5*60

    def startup(self):
        """Do nothing: the regular DBScript startup() is disabled here."""

    def reload(self):
        """Re-read configuration and recompute the maintenance interval."""
        skytools.DBScript.reload(self)

        # the loop delay is forced, whatever the config says
        self.loop_delay = 5

        # 'maint_delay_min' is the compat setting, given in minutes;
        # -1 marks it as not configured.
        legacy_minutes = self.cf.getfloat('maint_delay_min', -1)
        self.maint_delay = 60 * legacy_minutes
        if self.maint_delay < 0:
            self.maint_delay = self.cf.getfloat('maint_delay', 5*60)

        # 'maint_delay' (seconds) wins whenever it is configured
        self.maint_delay = self.cf.getfloat('maint_delay', self.maint_delay)

    def work(self):
        """Run maintenance if it is due and record how long it took."""
        started = time.time()
        next_due = self.last_time + self.maint_delay
        if started < next_due:
            # not yet time for the next run
            return

        self.do_maintenance()

        self.last_time = started
        elapsed = time.time() - started
        self.stat_put('maint_duration', elapsed)

    def do_maintenance(self):
        """Execute one full round of PgQ maintenance.

        In order: table rotation step 1 (per queue when the one-argument
        function is available), rotation step 2, moving retry events back
        to the main queue, and vacuuming the tables the database lists.
        """
        conn = self.get_database('db', autocommit=1)
        curs = conn.cursor()

        per_queue = skytools.exists_function(
            curs, "pgq.maint_rotate_tables_step1", 1)
        if not per_queue:
            curs.execute("select pgq.maint_rotate_tables_step1();")
        else:
            # each queue is rotated in its own transaction
            curs.execute("select queue_name from pgq.get_queue_info()")
            for queue_row in curs.fetchall():
                queue_name = queue_row[0]
                curs.execute("select pgq.maint_rotate_tables_step1(%s)",
                             [queue_name])
                rotated = curs.fetchone()[0]
                if rotated:
                    self.log.info('Rotating %s' % queue_name)

        # second half of the rotation
        curs.execute("select pgq.maint_rotate_tables_step2();")

        # retry events go back to the main queue in small blocks;
        # keep calling until a call reports nothing moved
        retry_total = 0
        moved = None
        while moved != 0:
            curs.execute('select pgq.maint_retry_events();')
            moved = curs.fetchone()[0]
            retry_total += moved
        if retry_total:
            self.log.info('Got %d events for retry' % retry_total)

        # vacuum whatever tables the database says need it
        curs.execute('set maintenance_work_mem = 32768')
        curs.execute('select * from pgq.maint_tables_to_vacuum()')
        for table_row in curs.fetchall():
            curs.execute('vacuum %s;' % table_row[0])