author     Marko Kreen    2007-03-13 11:52:09 +0000
committer  Marko Kreen    2007-03-13 11:52:09 +0000
commit     50abdba44a031ad40b1886f941479f203ca92039 (patch)
tree       873e72d78cd48917b2907c4c63abf185649ebb54 /scripts
final public release (tag: skytools_2_1)
Diffstat (limited to 'scripts')
-rw-r--r--  scripts/bulk_loader.ini.templ        13
-rwxr-xr-x  scripts/bulk_loader.py              181
-rwxr-xr-x  scripts/catsql.py                   141
-rw-r--r--  scripts/cube_dispatcher.ini.templ    23
-rwxr-xr-x  scripts/cube_dispatcher.py          175
-rw-r--r--  scripts/queue_mover.ini.templ        14
-rwxr-xr-x  scripts/queue_mover.py               30
-rw-r--r--  scripts/queue_splitter.ini.templ     13
-rwxr-xr-x  scripts/queue_splitter.py            33
-rw-r--r--  scripts/scriptmgr.ini.templ          43
-rwxr-xr-x  scripts/scriptmgr.py                220
-rw-r--r--  scripts/table_dispatcher.ini.templ   31
-rwxr-xr-x  scripts/table_dispatcher.py         124
13 files changed, 1041 insertions, 0 deletions
diff --git a/scripts/bulk_loader.ini.templ b/scripts/bulk_loader.ini.templ
new file mode 100644
index 00000000..187c1be2
--- /dev/null
+++ b/scripts/bulk_loader.ini.templ
@@ -0,0 +1,13 @@
+[bulk_loader]
+job_name = bizgres_loader
+
+src_db = dbname=bulksrc
+dst_db = dbname=bulkdst
+
+pgq_queue_name = xx
+
+use_skylog = 1
+
+logfile = ~/log/%(job_name)s.log
+pidfile = ~/pid/%(job_name)s.pid
+
diff --git a/scripts/bulk_loader.py b/scripts/bulk_loader.py
new file mode 100755
index 00000000..a098787e
--- /dev/null
+++ b/scripts/bulk_loader.py
@@ -0,0 +1,181 @@
+#! /usr/bin/env python
+
+"""Bulkloader for slow databases (Bizgres).
+
+Idea is following:
+ - Script reads from queue a batch of urlencoded row changes.
+ Inserts/updates/deletes, maybe many per one row.
+ - It changes them to minimal amount of DELETE commands
+ followed by big COPY of new data.
+ - One side-effect is that total order of how rows appear
+ changes, but per-row changes will be kept in order.
+
+The speedup from the COPY will happen only if the batches are
+large enough. So the ticks should happen only after couple
+of minutes.
+
+"""
+
+import sys, os, pgq, skytools
+
+def mk_delete_sql(tbl, key_list, data):
+ """ generate delete command """
+ whe_list = []
+ for k in key_list:
+ whe_list.append("%s = %s" % (k, skytools.quote_literal(data[k])))
+ whe_str = " and ".join(whe_list)
+ return "delete from %s where %s;" % (tbl, whe_str)
+
+class TableCache(object):
+ """Per-table data hander."""
+
+ def __init__(self, tbl):
+ """Init per-batch table data cache."""
+ self.name = tbl
+ self.ev_list = []
+ self.pkey_map = {}
+ self.pkey_list = []
+ self.pkey_str = None
+ self.col_list = None
+
+ def add_event(self, ev):
+ """Store new event."""
+
+ # op & data
+ ev.op = ev.type[0]
+ ev.row = skytools.db_urldecode(ev.data)
+
+ # get pkey column names
+ if self.pkey_str is None:
+ self.pkey_str = ev.type.split(':')[1]
+ if self.pkey_str:
+ self.pkey_list = self.pkey_str.split(',')
+
+ # get pkey value
+ if self.pkey_str:
+ pk_data = []
+ for k in self.pkey_list:
+ pk_data.append(ev.row[k])
+ ev.pk_data = tuple(pk_data)
+ elif ev.op == 'I':
+ # fake pkey, just to get them spread out
+ ev.pk_data = ev.id
+ else:
+ raise Exception('non-pk tables not supported: %s' % ev.extra1)
+
+ # get full column list, detect added columns
+ if not self.col_list:
+ self.col_list = ev.row.keys()
+ elif self.col_list != ev.row.keys():
+ # ^ supposedly python guarantees same order in keys()
+
+ # find new columns
+ for c in ev.row.keys():
+ if c not in self.col_list:
+ for oldev in self.ev_list:
+ oldev.row[c] = None
+ self.col_list = ev.row.keys()
+
+ # add to list
+ self.ev_list.append(ev)
+
+ # keep all versions of row data
+ if ev.pk_data in self.pkey_map:
+ self.pkey_map[ev.pk_data].append(ev)
+ else:
+ self.pkey_map[ev.pk_data] = [ev]
+
+ def finish(self):
+ """Got all data, prepare for insertion."""
+
+ del_list = []
+ copy_list = []
+ for ev_list in self.pkey_map.values():
+ # rewrite list of I/U/D events to
+ # optional DELETE and optional INSERT/COPY command
+ exists_before = -1
+ exists_after = 1
+ for ev in ev_list:
+ if ev.op == "I":
+ if exists_before < 0:
+ exists_before = 0
+ exists_after = 1
+ elif ev.op == "U":
+ if exists_before < 0:
+ exists_before = 1
+ #exists_after = 1 # this shouldn't be needed
+ elif ev.op == "D":
+ if exists_before < 0:
+ exists_before = 1
+ exists_after = 0
+ else:
+ raise Exception('unknown event type: %s' % ev.op)
+
+ # skip short-lived rows
+ if exists_before == 0 and exists_after == 0:
+ continue
+
+ # take last event
+ ev = ev_list[-1]
+
+ # generate needed commands
+ if exists_before:
+ del_list.append(mk_delete_sql(self.name, self.pkey_list, ev.row))
+ if exists_after:
+ copy_list.append(ev.row)
+
+ # reorder cols
+ new_list = self.pkey_list[:]
+ for k in self.col_list:
+ if k not in self.pkey_list:
+ new_list.append(k)
+
+ return del_list, new_list, copy_list
+
+class BulkLoader(pgq.SerialConsumer):
+ def __init__(self, args):
+ pgq.SerialConsumer.__init__(self, "bulk_loader", "src_db", "dst_db", args)
+
+ def process_remote_batch(self, batch_id, ev_list, dst_db):
+ """Content dispatcher."""
+
+ # add events to per-table caches
+ tables = {}
+ for ev in ev_list:
+ tbl = ev.extra1
+
+ if not tbl in tables:
+ tables[tbl] = TableCache(tbl)
+ cache = tables[tbl]
+ cache.add_event(ev)
+ ev.tag_done()
+
+ # then process them
+ for tbl, cache in tables.items():
+ self.process_one_table(dst_db, tbl, cache)
+
+ def process_one_table(self, dst_db, tbl, cache):
+ self.log.debug("process_one_table: %s" % tbl)
+ del_list, col_list, copy_list = cache.finish()
+ curs = dst_db.cursor()
+
+ if not skytools.exists_table(curs, tbl):
+ self.log.warning("Ignoring events for table: %s" % tbl)
+ return
+
+ if len(del_list) > 0:
+ self.log.info("Deleting %d rows from %s" % (len(del_list), tbl))
+
+ q = " ".join(del_list)
+ self.log.debug(q)
+ curs.execute(q)
+
+ if len(copy_list) > 0:
+ self.log.info("Copying %d rows into %s" % (len(copy_list), tbl))
+ self.log.debug("COPY %s (%s)" % (tbl, ','.join(col_list)))
+ skytools.magic_insert(curs, tbl, copy_list, col_list)
+
+if __name__ == '__main__':
+ script = BulkLoader(sys.argv[1:])
+ script.start()
+
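For reference, the event-collapsing rule implemented in TableCache.finish() can be exercised on its own. The sketch below is a minimal stand-alone restatement of that rule, using plain op strings ('I'/'U'/'D') instead of pgq events; the collapse() helper exists only for this illustration and is not part of the script above.

    # Collapse a per-pkey list of I/U/D ops into two flags:
    # "did the row exist before the batch?" (emit a DELETE) and
    # "does it exist after the batch?" (include it in the COPY).
    def collapse(ops):
        exists_before = -1   # unknown until the first op is seen
        exists_after = 1
        for op in ops:
            if op == 'I':
                if exists_before < 0:
                    exists_before = 0
                exists_after = 1
            elif op == 'U':
                if exists_before < 0:
                    exists_before = 1
            elif op == 'D':
                if exists_before < 0:
                    exists_before = 1
                exists_after = 0
            else:
                raise ValueError('unknown op: %s' % op)
        return exists_before != 0, exists_after != 0

    if __name__ == '__main__':
        print(collapse(['I', 'U', 'D']))   # (False, False) - short-lived row, skipped
        print(collapse(['U', 'U']))        # (True, True)   - delete old version, copy last one
        print(collapse(['D']))             # (True, False)  - delete only

The first case corresponds to the "skip short-lived rows" branch in finish().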
diff --git a/scripts/catsql.py b/scripts/catsql.py
new file mode 100755
index 00000000..94fbacd8
--- /dev/null
+++ b/scripts/catsql.py
@@ -0,0 +1,141 @@
+#! /usr/bin/env python
+
+"""Prints out SQL files with psql command execution.
+
+Supported psql commands: \i, \cd, \q
+Others are skipped.
+
+Aditionally does some pre-processing for NDoc.
+NDoc is looks nice but needs some hand-holding.
+
+Bug:
+
+- Function def end detection searches for 'as'/'is' but does not check
+ word boundaries, so it finds them even inside a function name. That means
+ in the main conf, as/is must be disabled and $ ' added. This script can
+ remove the unnecessary AS from the output.
+
+Niceties:
+
+- NDoc includes a function def in the output only if the def comes after
+ the comment. But for SQL functions it is better to have the comment after
+ the def. This script can swap the comment and the def.
+
+- Optionally removes CREATE (OR REPLACE) FUNCTION from the def to
+ keep it shorter in the doc.
+
+Note:
+
+- NDoc compares the real function name with the name in the comment. If they
+ differ, it decides that detection failed.
+
+"""
+
+import sys, os, re, getopt
+
+def usage(x):
+ print "usage: catsql [--ndoc] FILE [FILE ...]"
+ sys.exit(x)
+
+# NDoc specific changes
+cf_ndoc = 0
+
+# compile regexes
+func_re = r"create\s+(or\s+replace\s+)?function\s+"
+func_rc = re.compile(func_re, re.I)
+comm_rc = re.compile(r"^\s*([#]\s*)?(?P<com>--.*)", re.I)
+end_rc = re.compile(r"\b([;]|begin|declare|end)\b", re.I)
+as_rc = re.compile(r"\s+as\s+", re.I)
+cmd_rc = re.compile(r"^\\([a-z]*)(\s+.*)?", re.I)
+
+# conversion func
+def fix_func(ln):
+ # if ndoc, replace AS with ' '
+ if cf_ndoc:
+ return as_rc.sub(' ', ln)
+ else:
+ return ln
+
+# got function def
+def proc_func(f, ln):
+ # remove CREATE OR REPLACE
+ if cf_ndoc:
+ ln = func_rc.sub('', ln)
+
+ ln = fix_func(ln)
+ pre_list = [ln]
+ comm_list = []
+ n_comm = 0
+ while 1:
+ ln = f.readline()
+ if not ln:
+ break
+
+ com = None
+ if cf_ndoc:
+ com = comm_rc.search(ln)
+ if cf_ndoc and com:
+ pos = com.start('com')
+ comm_list.append(ln[pos:])
+ elif end_rc.search(ln):
+ break
+ elif len(comm_list) > 0:
+ break
+ else:
+ pre_list.append(fix_func(ln))
+
+ if len(comm_list) > 2:
+ map(sys.stdout.write, comm_list)
+ map(sys.stdout.write, pre_list)
+ else:
+ map(sys.stdout.write, pre_list)
+ map(sys.stdout.write, comm_list)
+ if ln:
+ sys.stdout.write(fix_func(ln))
+
+def cat_file(fn):
+ sys.stdout.write("\n")
+ f = open(fn)
+ while 1:
+ ln = f.readline()
+ if not ln:
+ break
+ m = cmd_rc.search(ln)
+ if m:
+ cmd = m.group(1)
+ if cmd == "i": # include a file
+ fn2 = m.group(2).strip()
+ cat_file(fn2)
+ elif cmd == "q": # quit
+ sys.exit(0)
+ elif cmd == "cd": # chdir
+ dir = m.group(2).strip()
+ os.chdir(dir)
+ else: # skip all others
+ pass
+ else:
+ if func_rc.search(ln): # function header
+ proc_func(f, ln)
+ else: # normal sql
+ sys.stdout.write(ln)
+ sys.stdout.write("\n")
+
+def main():
+ global cf_ndoc
+
+ try:
+ opts, args = getopt.gnu_getopt(sys.argv[1:], 'h', ['ndoc'])
+ except getopt.error, d:
+ print d
+ usage(1)
+ for o, v in opts:
+ if o == "-h":
+ usage(0)
+ elif o == "--ndoc":
+ cf_ndoc = 1
+ for fn in args:
+ cat_file(fn)
+
+if __name__ == '__main__':
+ main()
+
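The dispatch in cat_file() hinges on two of the regexes defined above. As a quick illustration, the snippet below reuses those exact patterns on a few made-up sample lines (the file names are hypothetical) to show which branch each one would take:

    import re

    func_rc = re.compile(r"create\s+(or\s+replace\s+)?function\s+", re.I)
    cmd_rc = re.compile(r"^\\([a-z]*)(\s+.*)?", re.I)

    samples = [
        "\\i structure/tables.sql",          # psql include -> recursed into
        "\\q",                               # psql quit -> processing stops
        "CREATE OR REPLACE FUNCTION foo()",  # function header -> proc_func()
        "select 1;",                         # plain SQL -> printed as-is
    ]
    for ln in samples:
        m = cmd_rc.search(ln)
        if m:
            print("psql command: %s" % m.group(1))
        elif func_rc.search(ln):
            print("function def: %s" % ln)
        else:
            print("plain SQL:    %s" % ln)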
diff --git a/scripts/cube_dispatcher.ini.templ b/scripts/cube_dispatcher.ini.templ
new file mode 100644
index 00000000..dea70697
--- /dev/null
+++ b/scripts/cube_dispatcher.ini.templ
@@ -0,0 +1,23 @@
+[cube_dispatcher]
+job_name = some_queue_to_cube
+
+src_db = dbname=sourcedb_test
+dst_db = dbname=dataminedb_test
+
+pgq_queue_name = udata.some_queue
+
+logfile = ~/log/%(job_name)s.log
+pidfile = ~/pid/%(job_name)s.pid
+
+# how many rows are kept: keep_latest, keep_all
+mode = keep_latest
+
+# to_char() fmt for table suffix
+#dateformat = YYYY_MM_DD
+# following disables table suffixes:
+#dateformat =
+
+part_template =
+ create table _DEST_TABLE (like _PARENT);
+ alter table only _DEST_TABLE add primary key (_PKEY);
+
diff --git a/scripts/cube_dispatcher.py b/scripts/cube_dispatcher.py
new file mode 100755
index 00000000..d59ac300
--- /dev/null
+++ b/scripts/cube_dispatcher.py
@@ -0,0 +1,175 @@
+#! /usr/bin/env python
+
+# It accepts urlencoded rows for multiple tables from the queue
+# and inserts them into the actual tables, partitioned by tick time.
+
+import sys, os, pgq, skytools
+
+DEF_CREATE = """
+create table _DEST_TABLE (like _PARENT);
+alter table only _DEST_TABLE add primary key (_PKEY);
+"""
+
+class CubeDispatcher(pgq.SerialConsumer):
+ def __init__(self, args):
+ pgq.SerialConsumer.__init__(self, "cube_dispatcher", "src_db", "dst_db", args)
+
+ self.dateformat = self.cf.get('dateformat', 'YYYY_MM_DD')
+
+ self.part_template = self.cf.get('part_template', DEF_CREATE)
+
+ mode = self.cf.get('mode', 'keep_latest')
+ if mode == 'keep_latest':
+ self.keep_latest = 1
+ elif mode == 'keep_all':
+ self.keep_latest = 0
+ else:
+ self.log.fatal('wrong mode setting')
+ sys.exit(1)
+
+ def get_part_date(self, batch_id):
+ if not self.dateformat:
+ return None
+
+ # fetch and format batch date
+ src_db = self.get_database('src_db')
+ curs = src_db.cursor()
+ q = 'select to_char(batch_end, %s) from pgq.get_batch_info(%s)'
+ curs.execute(q, [self.dateformat, batch_id])
+ src_db.commit()
+ return curs.fetchone()[0]
+
+ def process_remote_batch(self, batch_id, ev_list, dst_db):
+
+ # actual processing
+ date_str = self.get_part_date(batch_id)
+ self.dispatch(dst_db, ev_list, date_str)
+
+ # tag as done
+ for ev in ev_list:
+ ev.tag_done()
+
+ def dispatch(self, dst_db, ev_list, date_str):
+ """Actual event processing."""
+
+ # get tables and sql
+ tables = {}
+ sql_list = []
+ for ev in ev_list:
+ if date_str:
+ tbl = "%s_%s" % (ev.extra1, date_str)
+ else:
+ tbl = ev.extra1
+
+ if not tbl in tables:
+ tables[tbl] = self.get_table_info(ev, tbl)
+
+ sql = self.make_sql(tbl, ev)
+ sql_list.append(sql)
+
+ # create tables if needed
+ self.check_tables(dst_db, tables)
+
+ # insert into data tables
+ curs = dst_db.cursor()
+ block = []
+ for sql in sql_list:
+ self.log.debug(sql)
+ block.append(sql)
+ if len(block) > 100:
+ curs.execute("\n".join(block))
+ block = []
+ if len(block) > 0:
+ curs.execute("\n".join(block))
+
+ def get_table_info(self, ev, tbl):
+ inf = {
+ 'parent': ev.extra1,
+ 'table': tbl,
+ 'key_list': ev.type.split(':')[1]
+ }
+ return inf
+
+ def make_sql(self, tbl, ev):
+ """Return SQL statement(s) for that event."""
+
+ # parse data
+ data = skytools.db_urldecode(ev.data)
+
+ # parse tbl info
+ op, keys = ev.type.split(':')
+ key_list = keys.split(',')
+ if self.keep_latest and len(key_list) == 0:
+ raise Exception('No pkey on table %s' % tbl)
+
+ # generate sql
+ if op in ('I', 'U'):
+ if self.keep_latest:
+ sql = "%s %s" % (self.mk_delete_sql(tbl, key_list, data),
+ self.mk_insert_sql(tbl, key_list, data))
+ else:
+ sql = self.mk_insert_sql(tbl, key_list, data)
+ elif op == "D":
+ if not self.keep_latest:
+ raise Exception('Delete op not supported if mode=keep_all')
+
+ sql = self.mk_delete_sql(tbl, key_list, data)
+ else:
+ raise Exception('Unknown row op: %s' % op)
+ return sql
+
+ def mk_delete_sql(self, tbl, key_list, data):
+ # generate delete command
+ whe_list = []
+ for k in key_list:
+ whe_list.append("%s = %s" % (k, skytools.quote_literal(data[k])))
+ whe_str = " and ".join(whe_list)
+ return "delete from %s where %s;" % (tbl, whe_str)
+
+ def mk_insert_sql(self, tbl, key_list, data):
+ # generate insert command
+ col_list = []
+ val_list = []
+ for c, v in data.items():
+ col_list.append(c)
+ val_list.append(skytools.quote_literal(v))
+ col_str = ",".join(col_list)
+ val_str = ",".join(val_list)
+ return "insert into %s (%s) values (%s);" % (
+ tbl, col_str, val_str)
+
+ def check_tables(self, dcon, tables):
+ """Checks that tables needed for copy are there. If not
+ then creates them.
+
+ Used by other procedures to ensure that table is there
+ before they start inserting.
+
+ The commits should not be dangerous, as we haven't done anything
+ with cdr's yet, so they should still be in one TX.
+
+ Although it would be nicer to have a lock for table creation.
+ """
+
+ dcur = dcon.cursor()
+ exist_map = {}
+ for tbl, inf in tables.items():
+ if skytools.exists_table(dcur, tbl):
+ continue
+
+ sql = self.part_template
+ sql = sql.replace('_DEST_TABLE', inf['table'])
+ sql = sql.replace('_PARENT', inf['parent'])
+ sql = sql.replace('_PKEY', inf['key_list'])
+ # be similar to table_dispatcher
+ schema_table = inf['table'].replace(".", "__")
+ sql = sql.replace('_SCHEMA_TABLE', schema_table)
+
+ dcur.execute(sql)
+ dcon.commit()
+ self.log.info('%s: Created table %s' % (self.job_name, tbl))
+
+if __name__ == '__main__':
+ script = CubeDispatcher(sys.argv[1:])
+ script.start()
+
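In keep_latest mode, make_sql() turns every I/U event into a DELETE on the pkey columns (taken from ev.type, e.g. 'U:id') followed by an INSERT of the new row. A minimal stand-alone sketch of that output follows; the simplified quote() stands in for skytools.quote_literal and the row is passed as an already-decoded dict, both assumptions of this sketch rather than the script's API:

    def quote(v):
        # simplified literal quoting, for illustration only
        if v is None:
            return 'null'
        return "'%s'" % str(v).replace("'", "''")

    def keep_latest_sql(tbl, key_list, row):
        whe_str = " and ".join("%s = %s" % (k, quote(row[k])) for k in key_list)
        cols = sorted(row)   # sorted only to make the output deterministic
        col_str = ",".join(cols)
        val_str = ",".join(quote(row[c]) for c in cols)
        return "delete from %s where %s; insert into %s (%s) values (%s);" % (
            tbl, whe_str, tbl, col_str, val_str)

    if __name__ == '__main__':
        print(keep_latest_sql('cube_2007_03_13', ['id'], {'id': '42', 'hits': '7'}))
        # prints one line:
        #   delete from cube_2007_03_13 where id = '42'; insert into cube_2007_03_13 (hits,id) values ('7','42');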
diff --git a/scripts/queue_mover.ini.templ b/scripts/queue_mover.ini.templ
new file mode 100644
index 00000000..8d1ff5f1
--- /dev/null
+++ b/scripts/queue_mover.ini.templ
@@ -0,0 +1,14 @@
+[queue_mover]
+job_name = queue_mover_test
+
+src_db = dbname=sourcedb_test
+dst_db = dbname=dataminedb_test
+
+pgq_queue_name = source_queue
+dst_queue_name = dest_queue
+
+logfile = ~/log/%(job_name)s.log
+pidfile = ~/pid/%(job_name)s.pid
+
+use_skylog = 0
+
diff --git a/scripts/queue_mover.py b/scripts/queue_mover.py
new file mode 100755
index 00000000..129728a3
--- /dev/null
+++ b/scripts/queue_mover.py
@@ -0,0 +1,30 @@
+#! /usr/bin/env python
+
+# This script simply moves events from one queue to another.
+
+import sys, os, pgq, skytools
+
+class QueueMover(pgq.SerialConsumer):
+ def __init__(self, args):
+ pgq.SerialConsumer.__init__(self, "queue_mover", "src_db", "dst_db", args)
+
+ self.dst_queue_name = self.cf.get("dst_queue_name")
+
+ def process_remote_batch(self, db, batch_id, ev_list, dst_db):
+
+ # load data
+ rows = []
+ for ev in ev_list:
+ data = [ev.type, ev.data, ev.extra1, ev.extra2, ev.extra3, ev.extra4, ev.time]
+ rows.append(data)
+ ev.tag_done()
+ fields = ['type', 'data', 'extra1', 'extra2', 'extra3', 'extra4', 'time']
+
+ # insert data
+ curs = dst_db.cursor()
+ pgq.bulk_insert_events(curs, rows, fields, self.dst_queue_name)
+
+if __name__ == '__main__':
+ script = QueueMover(sys.argv[1:])
+ script.start()
+
diff --git a/scripts/queue_splitter.ini.templ b/scripts/queue_splitter.ini.templ
new file mode 100644
index 00000000..68c5ccbb
--- /dev/null
+++ b/scripts/queue_splitter.ini.templ
@@ -0,0 +1,13 @@
+[queue_splitter]
+job_name = queue_splitter_test
+
+src_db = dbname=sourcedb_test
+dst_db = dbname=destdb_test
+
+pgq_queue_name = source_queue
+
+logfile = ~/log/%(job_name)s.log
+pidfile = ~/pid/%(job_name)s.pid
+
+use_skylog = 0
+
diff --git a/scripts/queue_splitter.py b/scripts/queue_splitter.py
new file mode 100755
index 00000000..c6714ca0
--- /dev/null
+++ b/scripts/queue_splitter.py
@@ -0,0 +1,33 @@
+#! /usr/bin/env python
+
+# Puts events into the queue named by the field given in the 'queue_field' config parameter.
+
+import sys, os, pgq, skytools
+
+class QueueSplitter(pgq.SerialConsumer):
+ def __init__(self, args):
+ pgq.SerialConsumer.__init__(self, "queue_splitter", "src_db", "dst_db", args)
+
+ def process_remote_batch(self, db, batch_id, ev_list, dst_db):
+ cache = {}
+ queue_field = self.cf.get('queue_field', 'extra1')
+ for ev in ev_list:
+ row = [ev.type, ev.data, ev.extra1, ev.extra2, ev.extra3, ev.extra4, ev.time]
+ queue = getattr(ev, queue_field)
+ if queue not in cache:
+ cache[queue] = []
+ cache[queue].append(row)
+ ev.tag_done()
+
+ # should match the composed row
+ fields = ['type', 'data', 'extra1', 'extra2', 'extra3', 'extra4', 'time']
+
+ # now send them to right queues
+ curs = dst_db.cursor()
+ for queue, rows in cache.items():
+ pgq.bulk_insert_events(curs, rows, fields, queue)
+
+if __name__ == '__main__':
+ script = QueueSplitter(sys.argv[1:])
+ script.start()
+
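The routing step of QueueSplitter.process_remote_batch() is a plain group-by on one event attribute (extra1 unless queue_field says otherwise). A self-contained sketch of just that step; FakeEvent is a stand-in for pgq events and exists only for this example:

    class FakeEvent(object):
        def __init__(self, data, extra1):
            self.data = data
            self.extra1 = extra1

    def split_by_field(ev_list, queue_field='extra1'):
        cache = {}
        for ev in ev_list:
            queue = getattr(ev, queue_field)
            cache.setdefault(queue, []).append(ev.data)
        return cache

    if __name__ == '__main__':
        evs = [FakeEvent('a', 'queue_eu'), FakeEvent('b', 'queue_us'),
               FakeEvent('c', 'queue_eu')]
        print(split_by_field(evs))
        # roughly: {'queue_eu': ['a', 'c'], 'queue_us': ['b']}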
diff --git a/scripts/scriptmgr.ini.templ b/scripts/scriptmgr.ini.templ
new file mode 100644
index 00000000..7fa1419d
--- /dev/null
+++ b/scripts/scriptmgr.ini.templ
@@ -0,0 +1,43 @@
+
+[scriptmgr]
+job_name = scriptmgr_cphdb5
+config_list = ~/dbscripts/conf/*.ini, ~/random/conf/*.ini
+logfile = ~/log/%(job_name)s.log
+pidfile = ~/pid/%(job_name)s.pid
+#use_skylog = 1
+
+#
+# defaults for services
+#
+[DEFAULT]
+cwd = ~/dbscripts
+args = -v
+
+#
+# service descriptions
+#
+
+[cube_dispatcher]
+script = cube_dispatcher.py
+
+[table_dispatcher]
+script = table_dispatcher.py
+
+[bulk_loader]
+script = bulk_loader.py
+
+[londiste]
+script = londiste.py
+args = replay
+
+[pgqadm]
+script = pgqadm.py
+args = ticker
+
+#
+# services to be ignored
+#
+
+[log_checker]
+disabled = 1
+
diff --git a/scripts/scriptmgr.py b/scripts/scriptmgr.py
new file mode 100755
index 00000000..2ee742b2
--- /dev/null
+++ b/scripts/scriptmgr.py
@@ -0,0 +1,220 @@
+#! /usr/bin/env python
+
+"""Bulk start/stop of scripts.
+
+Reads a bunch of config files and maps them to scripts, then handles those.
+"""
+
+import sys, os, skytools, signal, glob, ConfigParser, time
+
+command_usage = """
+%prog [options] INI CMD [subcmd args]
+
+commands:
+ start [-a | jobname ..] start a job
+ stop [-a | jobname ..] stop a job
+ restart [-a | jobname ..] restart job(s)
+ reload [-a | jobname ..] send reload signal
+ status
+"""
+
+def job_sort_cmp(j1, j2):
+ d1 = j1['service'] + j1['job_name']
+ d2 = j2['service'] + j2['job_name']
+ if d1 < d2: return -1
+ elif d1 > d2: return 1
+ else: return 0
+
+class ScriptMgr(skytools.DBScript):
+ def init_optparse(self, p = None):
+ p = skytools.DBScript.init_optparse(self, p)
+ p.add_option("-a", "--all", action="store_true", help="apply command to all jobs")
+ p.set_usage(command_usage.strip())
+ return p
+
+ def load_jobs(self):
+ self.svc_list = []
+ self.svc_map = {}
+ self.config_list = []
+
+ # load services
+ svc_list = self.cf.sections()
+ svc_list.remove(self.service_name)
+ for svc_name in svc_list:
+ cf = self.cf.clone(svc_name)
+ disabled = cf.getboolean('disabled', 0)
+ defscript = None
+ if disabled:
+ defscript = '/disabled'
+ svc = {
+ 'service': svc_name,
+ 'script': cf.getfile('script', defscript),
+ 'cwd': cf.getfile('cwd'),
+ 'disabled': cf.getboolean('disabled', 0),
+ 'args': cf.get('args', ''),
+ }
+ self.svc_list.append(svc)
+ self.svc_map[svc_name] = svc
+
+ # generate config list
+ for tmp in self.cf.getlist('config_list'):
+ tmp = os.path.expanduser(tmp)
+ tmp = os.path.expandvars(tmp)
+ for fn in glob.glob(tmp):
+ self.config_list.append(fn)
+
+ # read jobs
+ for fn in self.config_list:
+ raw = ConfigParser.SafeConfigParser({'job_name':'?', 'service_name':'?'})
+ raw.read(fn)
+
+ # skip its own config
+ if raw.has_section(self.service_name):
+ continue
+
+ got = 0
+ for sect in raw.sections():
+ if sect in self.svc_map:
+ got = 1
+ self.add_job(fn, sect)
+ if not got:
+ self.log.warning('Cannot find service for %s' % fn)
+
+ def add_job(self, cf_file, service_name):
+ svc = self.svc_map[service_name]
+ cf = skytools.Config(service_name, cf_file)
+ disabled = svc['disabled']
+ if not disabled:
+ disabled = cf.getboolean('disabled', 0)
+ job = {
+ 'disabled': disabled,
+ 'config': cf_file,
+ 'cwd': svc['cwd'],
+ 'script': svc['script'],
+ 'args': svc['args'],
+ 'service': svc['service'],
+ 'job_name': cf.get('job_name'),
+ 'pidfile': cf.getfile('pidfile'),
+ }
+ self.job_list.append(job)
+ self.job_map[job['job_name']] = job
+
+ def cmd_status(self):
+ for job in self.job_list:
+ os.chdir(job['cwd'])
+ cf = skytools.Config(job['service'], job['config'])
+ pidfile = cf.getfile('pidfile')
+ name = job['job_name']
+ svc = job['service']
+ if job['disabled']:
+ name += " (disabled)"
+
+ if os.path.isfile(pidfile):
+ print " OK [%s] %s" % (svc, name)
+ else:
+ print " STOPPED [%s] %s" % (svc, name)
+
+ def cmd_info(self):
+ for job in self.job_list:
+ print job
+
+ def cmd_start(self, job_name):
+ job = self.job_map[job_name]
+ if job['disabled']:
+ self.log.info("Skipping %s" % job_name)
+ return 0
+ self.log.info('Starting %s' % job_name)
+ os.chdir(job['cwd'])
+ pidfile = job['pidfile']
+ if os.path.isfile(pidfile):
+ self.log.warning("Script %s seems running")
+ return 0
+ cmd = "%(script)s %(config)s %(args)s -d" % job
+ res = os.system(cmd)
+ self.log.debug(res)
+ if res != 0:
+ self.log.error('startup failed: %s' % job_name)
+ return 1
+ else:
+ return 0
+
+ def cmd_stop(self, job_name):
+ job = self.job_map[job_name]
+ if job['disabled']:
+ self.log.info("Skipping %s" % job_name)
+ return
+ self.log.info('Stopping %s' % job_name)
+ self.signal_job(job, signal.SIGINT)
+
+ def cmd_reload(self, job_name):
+ job = self.job_map[job_name]
+ if job['disabled']:
+ self.log.info("Skipping %s" % job_name)
+ return
+ self.log.info('Reloading %s' % job_name)
+ self.signal_job(job, signal.SIGHUP)
+
+ def signal_job(self, job, sig):
+ os.chdir(job['cwd'])
+ pidfile = job['pidfile']
+ if os.path.isfile(pidfile):
+ pid = int(open(pidfile).read())
+ os.kill(pid, sig)
+ else:
+ self.log.warning("Job %s not running" % job['job_name'])
+
+ def work(self):
+ self.set_single_loop(1)
+ self.job_list = []
+ self.job_map = {}
+ self.load_jobs()
+
+ if len(self.args) < 2:
+ print "need command"
+ sys.exit(1)
+
+ jobs = self.args[2:]
+ if len(jobs) == 0 and self.options.all:
+ for job in self.job_list:
+ jobs.append(job['job_name'])
+
+ self.job_list.sort(job_sort_cmp)
+
+ cmd = self.args[1]
+ if cmd == "status":
+ self.cmd_status()
+ return
+ elif cmd == "info":
+ self.cmd_info()
+ return
+
+ if len(jobs) == 0:
+ print "no jobs given?"
+ sys.exit(1)
+
+ if cmd == "start":
+ err = 0
+ for n in jobs:
+ err += self.cmd_start(n)
+ if err > 0:
+ self.log.error('some scripts failed')
+ sys.exit(1)
+ elif cmd == "stop":
+ for n in jobs:
+ self.cmd_stop(n)
+ elif cmd == "restart":
+ for n in jobs:
+ self.cmd_stop(n)
+ time.sleep(2)
+ self.cmd_start(n)
+ elif cmd == "reload":
+ for n in jobs:
+ self.cmd_reload(n)
+ else:
+ print "unknown command:", cmd
+ sys.exit(1)
+
+if __name__ == '__main__':
+ script = ScriptMgr('scriptmgr', sys.argv[1:])
+ script.start()
+
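Per the usage string above, a typical invocation is "scriptmgr.py scriptmgr.ini status" or "scriptmgr.py scriptmgr.ini restart -a". The job list itself comes from expanding the config_list globs and matching each found ini file's sections against the configured services; the glob-expansion step in isolation looks like the sketch below (the patterns are the example values from the .ini template, and the expand_config_list() name exists only for this sketch):

    import glob, os

    def expand_config_list(patterns):
        files = []
        for tmp in patterns:
            tmp = os.path.expanduser(tmp)   # honour ~
            tmp = os.path.expandvars(tmp)   # honour $VARS
            files.extend(glob.glob(tmp))
        return files

    if __name__ == '__main__':
        print(expand_config_list(['~/dbscripts/conf/*.ini', '~/random/conf/*.ini']))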
diff --git a/scripts/table_dispatcher.ini.templ b/scripts/table_dispatcher.ini.templ
new file mode 100644
index 00000000..131dd7fe
--- /dev/null
+++ b/scripts/table_dispatcher.ini.templ
@@ -0,0 +1,31 @@
+[udata_dispatcher]
+job_name = test_move
+
+src_db = dbname=sourcedb_test
+dst_db = dbname=dataminedb_test
+
+pgq_queue_name = OrderLog
+
+logfile = ~/log/%(job_name)s.log
+pidfile = ~/pid/%(job_name)s.pid
+
+# where to put the data; when partitioning, used as the base name
+dest_table = orders
+
+# date field which will be used for partitioning
+# special value: _EVTIME - event creation time
+part_field = start_date
+
+#fields = *
+#fields = id, name
+#fields = id:newid, name, bar:baz
+
+
+# template used for creating partition tables
+# available placeholders: _DEST_TABLE, _SCHEMA_TABLE
+part_template =
+ create table _DEST_TABLE () inherits (orders);
+ alter table only _DEST_TABLE add constraint _DEST_TABLE_pkey primary key (id);
+ grant select on _DEST_TABLE to group reporting;
+
+
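For illustration, this is how such a part_template is expanded for a single partition: _DEST_TABLE is replaced with the partition table name, and a _SCHEMA_TABLE variant (dots turned into '__') is available for constraint names on schema-qualified tables. The expand_template() helper below is hypothetical; it simply mirrors the .replace() calls done in table_dispatcher.py:

    PART_TEMPLATE = """
    create table _DEST_TABLE () inherits (orders);
    alter table only _DEST_TABLE add constraint _DEST_TABLE_pkey primary key (id);
    grant select on _DEST_TABLE to group reporting;
    """

    def expand_template(template, tbl):
        sql = template.replace('_DEST_TABLE', tbl)
        sql = sql.replace('_SCHEMA_TABLE', tbl.replace('.', '__'))
        return sql

    if __name__ == '__main__':
        print(expand_template(PART_TEMPLATE, 'orders_2007_03_13'))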
diff --git a/scripts/table_dispatcher.py b/scripts/table_dispatcher.py
new file mode 100755
index 00000000..054ced9a
--- /dev/null
+++ b/scripts/table_dispatcher.py
@@ -0,0 +1,124 @@
+#! /usr/bin/env python
+
+# It loads urlencoded rows for one table from the queue and inserts
+# them into the actual tables, with optional partitioning.
+
+import sys, os, pgq, skytools
+
+DEST_TABLE = "_DEST_TABLE"
+SCHEMA_TABLE = "_SCHEMA_TABLE"
+
+class TableDispatcher(pgq.SerialConsumer):
+ def __init__(self, args):
+ pgq.SerialConsumer.__init__(self, "table_dispatcher", "src_db", "dst_db", args)
+
+ self.part_template = self.cf.get("part_template", '')
+ self.dest_table = self.cf.get("dest_table")
+ self.part_field = self.cf.get("part_field", '')
+ self.part_method = self.cf.get("part_method", 'daily')
+ if self.part_method not in ('daily', 'monthly'):
+ raise Exception('bad part_method')
+
+ if self.cf.get("fields", "*") == "*":
+ self.field_map = None
+ else:
+ self.field_map = {}
+ for fval in self.cf.getlist('fields'):
+ tmp = fval.split(':')
+ if len(tmp) == 1:
+ self.field_map[tmp[0]] = tmp[0]
+ else:
+ self.field_map[tmp[0]] = tmp[1]
+
+ def process_remote_batch(self, batch_id, ev_list, dst_db):
+ if len(ev_list) == 0:
+ return
+
+ # actual processing
+ self.dispatch(dst_db, ev_list)
+
+ # tag as done
+ for ev in ev_list:
+ ev.tag_done()
+
+ def dispatch(self, dst_db, ev_list):
+ """Generic dispatcher."""
+
+ # load data
+ tables = {}
+ for ev in ev_list:
+ row = skytools.db_urldecode(ev.data)
+
+ # guess dest table
+ if self.part_field:
+ if self.part_field == "_EVTIME":
+ partval = str(ev.creation_date)
+ else:
+ partval = str(row[self.part_field])
+ partval = partval.split(' ')[0]
+ date = partval.split('-')
+ if self.part_method == 'monthly':
+ date = date[:2]
+ suffix = '_'.join(date)
+ tbl = "%s_%s" % (self.dest_table, suffix)
+ else:
+ tbl = self.dest_table
+
+ # map fields
+ if self.field_map is None:
+ dstrow = row
+ else:
+ dstrow = {}
+ for k, v in self.field_map.items():
+ dstrow[v] = row[k]
+
+ # add row into table
+ if not tbl in tables:
+ tables[tbl] = [dstrow]
+ else:
+ tables[tbl].append(dstrow)
+
+ # create tables if needed
+ self.check_tables(dst_db, tables)
+
+ # insert into data tables
+ curs = dst_db.cursor()
+ for tbl, tbl_rows in tables.items():
+ skytools.magic_insert(curs, tbl, tbl_rows)
+
+ def check_tables(self, dcon, tables):
+ """Checks that tables needed for copy are there. If not
+ then creates them.
+
+ Used by other procedures to ensure that table is there
+ before they start inserting.
+
+ The commits should not be dangerous, as we haven't done anything
+ with cdr's yet, so they should still be in one TX.
+
+ Although it would be nicer to have a lock for table creation.
+ """
+
+ dcur = dcon.cursor()
+ exist_map = {}
+ for tbl in tables.keys():
+ if not skytools.exists_table(dcur, tbl):
+ if not self.part_template:
+ raise Exception('Dest table does not exist and there is no way to create it.')
+
+ sql = self.part_template
+ sql = sql.replace(DEST_TABLE, tbl)
+
+ # we do this to make sure that constraints for
+ # tables whose names contain a schema will still work
+ schema_table = tbl.replace(".", "__")
+ sql = sql.replace(SCHEMA_TABLE, schema_table)
+
+ dcur.execute(sql)
+ dcon.commit()
+ self.log.info('%s: Created table %s' % (self.job_name, tbl))
+
+if __name__ == '__main__':
+ script = TableDispatcher(sys.argv[1:])
+ script.start()
+
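The partition name in dispatch() is derived from the part_field value (or the event creation time) by keeping only the date part and joining its components, daily or monthly. The same logic in isolation, with an illustrative part_table() helper:

    def part_table(dest_table, partval, part_method='daily'):
        partval = str(partval).split(' ')[0]   # keep only the date part
        date = partval.split('-')
        if part_method == 'monthly':
            date = date[:2]
        return "%s_%s" % (dest_table, '_'.join(date))

    if __name__ == '__main__':
        print(part_table('orders', '2007-03-13 11:52:09'))             # orders_2007_03_13
        print(part_table('orders', '2007-03-13 11:52:09', 'monthly'))  # orders_2007_03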