summaryrefslogtreecommitdiff
path: root/scripts
diff options
context:
space:
mode:
authormartinko2012-07-24 08:50:10 +0000
committermartinko2012-07-24 08:50:10 +0000
commit5b54a246cf31cbe2a88154711a113ade73019859 (patch)
tree197d221afb089af2aa5fa0f93942f2ec40a324ca /scripts
parent60ea682083e30c62c3298b37a9024c41a6ecc3f6 (diff)
parentfecd2ff72e00acb558024a402f75c5956dfbe5c1 (diff)
Merge branch 'master' of skype-git:/git/dba/skytools-3
Diffstat (limited to 'scripts')
-rwxr-xr-xscripts/find_sql_functions.py103
-rwxr-xr-xscripts/simple_consumer.py36
-rwxr-xr-xscripts/simple_local_consumer.py66
-rwxr-xr-xscripts/skytools_upgrade.py11
4 files changed, 202 insertions, 14 deletions
diff --git a/scripts/find_sql_functions.py b/scripts/find_sql_functions.py
new file mode 100755
index 00000000..55718cd8
--- /dev/null
+++ b/scripts/find_sql_functions.py
@@ -0,0 +1,103 @@
+#! /usr/bin/env python
+
+"""Find and print out function signatures from .sql file.
+
+Usage:
+ find_sql_functions.py [-h] [-s] [-p PREFIX] FILE ...
+
+Switches:
+ -h Show help
+ -p PREFIX Prefix each line with string
+ -s Check whether function is SECURITY DEFINER
+"""
+
+import sys, re, getopt
+
+rx = r"""
+^
+create \s+ (?: or \s+ replace \s+ )?
+function ( [^(]+ )
+[(] ( [^)]* ) [)]
+"""
+
+rx_secdef = r"""security\s+definer"""
+
+
+rc = re.compile(rx, re.I | re.M | re.X)
+sc = re.compile(r"\s+")
+rc_sec = re.compile(rx_secdef, re.I | re.X)
+
+def grep_file(fn, cf_prefix, cf_secdef):
+ sql = open(fn).read()
+ pos = 0
+ while 1:
+ m = rc.search(sql, pos)
+ if not m:
+ break
+ pos = m.end()
+
+ m2 = rc.search(sql, pos)
+ if m2:
+ xpos = m2.end()
+ else:
+ xpos = len(sql)
+ secdef = False
+ m2 = rc_sec.search(sql, pos, xpos)
+ if m2:
+ secdef = True
+
+ fname = m.group(1).strip()
+ fargs = m.group(2)
+
+ alist = fargs.split(',')
+ tlist = []
+ for a in alist:
+ a = a.strip()
+ toks = sc.split(a.lower())
+ if toks[0] == "out":
+ continue
+ if toks[0] in ("in", "inout"):
+ toks = toks[1:]
+ # just take last item
+ tlist.append(toks[-1])
+
+ sig = "%s(%s)" % (fname, ", ".join(tlist))
+
+ if cf_prefix:
+ ln = "%s %s;" % (cf_prefix, sig)
+ else:
+ ln = " %s(%s)," % (fname, ", ".join(tlist))
+
+ if cf_secdef and secdef:
+ ln = "%-72s -- SECDEF" % (ln)
+
+ print ln
+
+def main(argv):
+ cf_secdef = 0
+ cf_prefix = ''
+
+ try:
+ opts, args = getopt.getopt(argv, "hsp:")
+ except getopt.error, d:
+ print 'getopt:', d
+ sys.exit(1)
+
+ for o, a in opts:
+ if o == '-h':
+ print __doc__
+ sys.exit(0)
+ elif o == '-s':
+ cf_secdef = 1
+ elif o == '-p':
+ cf_prefix = a
+ else:
+ print __doc__
+ sys.exit(1)
+
+ for fn in args:
+ grep_file(fn, cf_prefix, cf_secdef)
+
+if __name__ == '__main__':
+ main(sys.argv[1:])
+
diff --git a/scripts/simple_consumer.py b/scripts/simple_consumer.py
index 109f27a5..df0db11c 100755
--- a/scripts/simple_consumer.py
+++ b/scripts/simple_consumer.py
@@ -3,9 +3,16 @@
"""Consumer that simply calls SQL query for each event.
Config::
+ # source database
+ src_db =
+
+ # destination database
+ dst_db =
+
# query to call
dst_query = select * from somefunc(%%(pgq.ev_data)s);
+ ## Deprecated, use table_filter ##
# filter for events (SQL fragment)
consumer_filter = ev_extra1 = 'public.mytable1'
"""
@@ -22,10 +29,11 @@ import skytools
class SimpleConsumer(pgq.Consumer):
__doc__ = __doc__
- def __init__(self, args):
- pgq.Consumer.__init__(self,"simple_consumer3", "src_db", args)
+ def reload(self):
+ super(SimpleConsumer, self).reload()
self.dst_query = self.cf.get("dst_query")
- self.consumer_filter = self.cf.get("consumer_filter", "")
+ if self.cf.get("consumer_filter", ""):
+ self.consumer_filter = self.cf.get("consumer_filter", "")
def process_event(self, db, ev):
curs = self.get_database('dst_db', autocommit = 1).cursor()
@@ -37,17 +45,25 @@ class SimpleConsumer(pgq.Consumer):
payload = {}
else:
payload = skytools.db_urldecode(ev.ev_data)
- payload['pgq.ev_data'] = ev.ev_data
+ payload['pgq.tick_id'] = self.batch_info['cur_tick_id']
+ payload['pgq.ev_id'] = ev.ev_id
+ payload['pgq.ev_time'] = ev.ev_time
payload['pgq.ev_type'] = ev.ev_type
+ payload['pgq.ev_data'] = ev.ev_data
payload['pgq.ev_extra1'] = ev.ev_extra1
- payload['pgq.ev_time'] = ev.ev_time
-
- self.log.debug(self.dst_query % payload)
+ payload['pgq.ev_extra2'] = ev.ev_extra2
+ payload['pgq.ev_extra3'] = ev.ev_extra3
+ payload['pgq.ev_extra4'] = ev.ev_extra4
+
+ self.log.debug(self.dst_query, payload)
curs.execute(self.dst_query, payload)
- res = curs.fetchall()
- self.log.debug(res)
+ if curs.statusmessage[:6] == 'SELECT':
+ res = curs.fetchall()
+ self.log.debug(res)
+ else:
+ self.log.debug(curs.statusmessage)
if __name__ == '__main__':
- script = SimpleConsumer(sys.argv[1:])
+ script = SimpleConsumer("simple_consumer3", "src_db", sys.argv[1:])
script.start()
diff --git a/scripts/simple_local_consumer.py b/scripts/simple_local_consumer.py
new file mode 100755
index 00000000..6e3eb601
--- /dev/null
+++ b/scripts/simple_local_consumer.py
@@ -0,0 +1,66 @@
+#!/usr/bin/python
+
+"""Consumer that simply calls SQL query for each event.
+
+It tracks completed batches in local file.
+
+Config::
+ # source database
+ src_db =
+
+ # destination database
+ dst_db =
+
+ # query to call
+ dst_query = select * from somefunc(%%(pgq.ev_data)s);
+"""
+
+
+import sys
+
+import pkgloader
+pkgloader.require('skytools', '3.0')
+
+import pgq
+import skytools
+
+class SimpleLocalConsumer(pgq.LocalConsumer):
+ __doc__ = __doc__
+
+ def reload(self):
+ super(SimpleLocalConsumer, self).reload()
+ self.dst_query = self.cf.get("dst_query")
+
+ def process_local_event(self, db, batch_id, ev):
+ curs = self.get_database('dst_db', autocommit = 1).cursor()
+
+ if ev.ev_type[:2] not in ('I:', 'U:', 'D:'):
+ return
+
+ if ev.ev_data is None:
+ payload = {}
+ else:
+ payload = skytools.db_urldecode(ev.ev_data)
+
+ payload['pgq.tick_id'] = self.batch_info['cur_tick_id']
+ payload['pgq.ev_id'] = ev.ev_id
+ payload['pgq.ev_time'] = ev.ev_time
+ payload['pgq.ev_type'] = ev.ev_type
+ payload['pgq.ev_data'] = ev.ev_data
+ payload['pgq.ev_extra1'] = ev.ev_extra1
+ payload['pgq.ev_extra2'] = ev.ev_extra2
+ payload['pgq.ev_extra3'] = ev.ev_extra3
+ payload['pgq.ev_extra4'] = ev.ev_extra4
+
+ self.log.debug(self.dst_query, payload)
+ curs.execute(self.dst_query, payload)
+ if curs.statusmessage[:6] == 'SELECT':
+ res = curs.fetchall()
+ self.log.debug(res)
+ else:
+ self.log.debug(curs.statusmessage)
+
+if __name__ == '__main__':
+ script = SimpleLocalConsumer("simple_local_consumer3", "src_db", sys.argv[1:])
+ script.start()
+
diff --git a/scripts/skytools_upgrade.py b/scripts/skytools_upgrade.py
index 169a1913..5a1cefe4 100755
--- a/scripts/skytools_upgrade.py
+++ b/scripts/skytools_upgrade.py
@@ -43,13 +43,14 @@ def check_version(curs, schema, new_ver_str, recheck_func=None):
funcname = "%s.version" % schema
if not skytools.exists_function(curs, funcname, 0):
if recheck_func is not None:
- return recheck_func(curs)
+ return recheck_func(curs), 'NULL'
else:
- return 0
+ return 0, 'NULL'
q = "select %s()" % funcname
curs.execute(q)
old_ver_str = curs.fetchone()[0]
- return is_version_ge(old_ver_str, new_ver_str)
+ ok = is_version_ge(old_ver_str, new_ver_str)
+ return ok, old_ver_str
class DbUpgrade(skytools.DBScript):
@@ -69,7 +70,8 @@ class DbUpgrade(skytools.DBScript):
continue
# new enough?
- if check_version(curs, schema, ver, recheck_func):
+ ok, oldver = check_version(curs, schema, ver, recheck_func)
+ if ok:
continue
# too old schema, no way to upgrade
@@ -80,6 +82,7 @@ class DbUpgrade(skytools.DBScript):
curs = db.cursor()
curs.execute('begin')
+ self.log.info("%s: Upgrading '%s' version %s to %s", dbname, schema, oldver, ver)
skytools.installer_apply_file(db, fn, self.log)
curs.execute('commit')