diff options
author | martinko | 2012-07-24 08:50:10 +0000 |
---|---|---|
committer | martinko | 2012-07-24 08:50:10 +0000 |
commit | 5b54a246cf31cbe2a88154711a113ade73019859 (patch) | |
tree | 197d221afb089af2aa5fa0f93942f2ec40a324ca /scripts | |
parent | 60ea682083e30c62c3298b37a9024c41a6ecc3f6 (diff) | |
parent | fecd2ff72e00acb558024a402f75c5956dfbe5c1 (diff) |
Merge branch 'master' of skype-git:/git/dba/skytools-3
Diffstat (limited to 'scripts')
-rwxr-xr-x | scripts/find_sql_functions.py | 103 | ||||
-rwxr-xr-x | scripts/simple_consumer.py | 36 | ||||
-rwxr-xr-x | scripts/simple_local_consumer.py | 66 | ||||
-rwxr-xr-x | scripts/skytools_upgrade.py | 11 |
4 files changed, 202 insertions, 14 deletions
diff --git a/scripts/find_sql_functions.py b/scripts/find_sql_functions.py new file mode 100755 index 00000000..55718cd8 --- /dev/null +++ b/scripts/find_sql_functions.py @@ -0,0 +1,103 @@ +#! /usr/bin/env python + +"""Find and print out function signatures from .sql file. + +Usage: + find_sql_functions.py [-h] [-s] [-p PREFIX] FILE ... + +Switches: + -h Show help + -p PREFIX Prefix each line with string + -s Check whether function is SECURITY DEFINER +""" + +import sys, re, getopt + +rx = r""" +^ +create \s+ (?: or \s+ replace \s+ )? +function ( [^(]+ ) +[(] ( [^)]* ) [)] +""" + +rx_secdef = r"""security\s+definer""" + + +rc = re.compile(rx, re.I | re.M | re.X) +sc = re.compile(r"\s+") +rc_sec = re.compile(rx_secdef, re.I | re.X) + +def grep_file(fn, cf_prefix, cf_secdef): + sql = open(fn).read() + pos = 0 + while 1: + m = rc.search(sql, pos) + if not m: + break + pos = m.end() + + m2 = rc.search(sql, pos) + if m2: + xpos = m2.end() + else: + xpos = len(sql) + secdef = False + m2 = rc_sec.search(sql, pos, xpos) + if m2: + secdef = True + + fname = m.group(1).strip() + fargs = m.group(2) + + alist = fargs.split(',') + tlist = [] + for a in alist: + a = a.strip() + toks = sc.split(a.lower()) + if toks[0] == "out": + continue + if toks[0] in ("in", "inout"): + toks = toks[1:] + # just take last item + tlist.append(toks[-1]) + + sig = "%s(%s)" % (fname, ", ".join(tlist)) + + if cf_prefix: + ln = "%s %s;" % (cf_prefix, sig) + else: + ln = " %s(%s)," % (fname, ", ".join(tlist)) + + if cf_secdef and secdef: + ln = "%-72s -- SECDEF" % (ln) + + print ln + +def main(argv): + cf_secdef = 0 + cf_prefix = '' + + try: + opts, args = getopt.getopt(argv, "hsp:") + except getopt.error, d: + print 'getopt:', d + sys.exit(1) + + for o, a in opts: + if o == '-h': + print __doc__ + sys.exit(0) + elif o == '-s': + cf_secdef = 1 + elif o == '-p': + cf_prefix = a + else: + print __doc__ + sys.exit(1) + + for fn in args: + grep_file(fn, cf_prefix, cf_secdef) + +if __name__ == '__main__': + main(sys.argv[1:]) + diff --git a/scripts/simple_consumer.py b/scripts/simple_consumer.py index 109f27a5..df0db11c 100755 --- a/scripts/simple_consumer.py +++ b/scripts/simple_consumer.py @@ -3,9 +3,16 @@ """Consumer that simply calls SQL query for each event. Config:: + # source database + src_db = + + # destination database + dst_db = + # query to call dst_query = select * from somefunc(%%(pgq.ev_data)s); + ## Deprecated, use table_filter ## # filter for events (SQL fragment) consumer_filter = ev_extra1 = 'public.mytable1' """ @@ -22,10 +29,11 @@ import skytools class SimpleConsumer(pgq.Consumer): __doc__ = __doc__ - def __init__(self, args): - pgq.Consumer.__init__(self,"simple_consumer3", "src_db", args) + def reload(self): + super(SimpleConsumer, self).reload() self.dst_query = self.cf.get("dst_query") - self.consumer_filter = self.cf.get("consumer_filter", "") + if self.cf.get("consumer_filter", ""): + self.consumer_filter = self.cf.get("consumer_filter", "") def process_event(self, db, ev): curs = self.get_database('dst_db', autocommit = 1).cursor() @@ -37,17 +45,25 @@ class SimpleConsumer(pgq.Consumer): payload = {} else: payload = skytools.db_urldecode(ev.ev_data) - payload['pgq.ev_data'] = ev.ev_data + payload['pgq.tick_id'] = self.batch_info['cur_tick_id'] + payload['pgq.ev_id'] = ev.ev_id + payload['pgq.ev_time'] = ev.ev_time payload['pgq.ev_type'] = ev.ev_type + payload['pgq.ev_data'] = ev.ev_data payload['pgq.ev_extra1'] = ev.ev_extra1 - payload['pgq.ev_time'] = ev.ev_time - - self.log.debug(self.dst_query % payload) + payload['pgq.ev_extra2'] = ev.ev_extra2 + payload['pgq.ev_extra3'] = ev.ev_extra3 + payload['pgq.ev_extra4'] = ev.ev_extra4 + + self.log.debug(self.dst_query, payload) curs.execute(self.dst_query, payload) - res = curs.fetchall() - self.log.debug(res) + if curs.statusmessage[:6] == 'SELECT': + res = curs.fetchall() + self.log.debug(res) + else: + self.log.debug(curs.statusmessage) if __name__ == '__main__': - script = SimpleConsumer(sys.argv[1:]) + script = SimpleConsumer("simple_consumer3", "src_db", sys.argv[1:]) script.start() diff --git a/scripts/simple_local_consumer.py b/scripts/simple_local_consumer.py new file mode 100755 index 00000000..6e3eb601 --- /dev/null +++ b/scripts/simple_local_consumer.py @@ -0,0 +1,66 @@ +#!/usr/bin/python + +"""Consumer that simply calls SQL query for each event. + +It tracks completed batches in local file. + +Config:: + # source database + src_db = + + # destination database + dst_db = + + # query to call + dst_query = select * from somefunc(%%(pgq.ev_data)s); +""" + + +import sys + +import pkgloader +pkgloader.require('skytools', '3.0') + +import pgq +import skytools + +class SimpleLocalConsumer(pgq.LocalConsumer): + __doc__ = __doc__ + + def reload(self): + super(SimpleLocalConsumer, self).reload() + self.dst_query = self.cf.get("dst_query") + + def process_local_event(self, db, batch_id, ev): + curs = self.get_database('dst_db', autocommit = 1).cursor() + + if ev.ev_type[:2] not in ('I:', 'U:', 'D:'): + return + + if ev.ev_data is None: + payload = {} + else: + payload = skytools.db_urldecode(ev.ev_data) + + payload['pgq.tick_id'] = self.batch_info['cur_tick_id'] + payload['pgq.ev_id'] = ev.ev_id + payload['pgq.ev_time'] = ev.ev_time + payload['pgq.ev_type'] = ev.ev_type + payload['pgq.ev_data'] = ev.ev_data + payload['pgq.ev_extra1'] = ev.ev_extra1 + payload['pgq.ev_extra2'] = ev.ev_extra2 + payload['pgq.ev_extra3'] = ev.ev_extra3 + payload['pgq.ev_extra4'] = ev.ev_extra4 + + self.log.debug(self.dst_query, payload) + curs.execute(self.dst_query, payload) + if curs.statusmessage[:6] == 'SELECT': + res = curs.fetchall() + self.log.debug(res) + else: + self.log.debug(curs.statusmessage) + +if __name__ == '__main__': + script = SimpleLocalConsumer("simple_local_consumer3", "src_db", sys.argv[1:]) + script.start() + diff --git a/scripts/skytools_upgrade.py b/scripts/skytools_upgrade.py index 169a1913..5a1cefe4 100755 --- a/scripts/skytools_upgrade.py +++ b/scripts/skytools_upgrade.py @@ -43,13 +43,14 @@ def check_version(curs, schema, new_ver_str, recheck_func=None): funcname = "%s.version" % schema if not skytools.exists_function(curs, funcname, 0): if recheck_func is not None: - return recheck_func(curs) + return recheck_func(curs), 'NULL' else: - return 0 + return 0, 'NULL' q = "select %s()" % funcname curs.execute(q) old_ver_str = curs.fetchone()[0] - return is_version_ge(old_ver_str, new_ver_str) + ok = is_version_ge(old_ver_str, new_ver_str) + return ok, old_ver_str class DbUpgrade(skytools.DBScript): @@ -69,7 +70,8 @@ class DbUpgrade(skytools.DBScript): continue # new enough? - if check_version(curs, schema, ver, recheck_func): + ok, oldver = check_version(curs, schema, ver, recheck_func) + if ok: continue # too old schema, no way to upgrade @@ -80,6 +82,7 @@ class DbUpgrade(skytools.DBScript): curs = db.cursor() curs.execute('begin') + self.log.info("%s: Upgrading '%s' version %s to %s", dbname, schema, oldver, ver) skytools.installer_apply_file(db, fn, self.log) curs.execute('commit') |