| author | Marko Kreen | 2007-03-13 11:52:09 +0000 |
|---|---|---|
| committer | Marko Kreen | 2007-03-13 11:52:09 +0000 |
| commit | 50abdba44a031ad40b1886f941479f203ca92039 (patch) | |
| tree | 873e72d78cd48917b2907c4c63abf185649ebb54 | /scripts |
final public release (tag: skytools_2_1)
Diffstat (limited to 'scripts')
| mode | file | insertions |
|---|---|---|
| -rw-r--r-- | scripts/bulk_loader.ini.templ | 13 |
| -rwxr-xr-x | scripts/bulk_loader.py | 181 |
| -rwxr-xr-x | scripts/catsql.py | 141 |
| -rw-r--r-- | scripts/cube_dispatcher.ini.templ | 23 |
| -rwxr-xr-x | scripts/cube_dispatcher.py | 175 |
| -rw-r--r-- | scripts/queue_mover.ini.templ | 14 |
| -rwxr-xr-x | scripts/queue_mover.py | 30 |
| -rw-r--r-- | scripts/queue_splitter.ini.templ | 13 |
| -rwxr-xr-x | scripts/queue_splitter.py | 33 |
| -rw-r--r-- | scripts/scriptmgr.ini.templ | 43 |
| -rwxr-xr-x | scripts/scriptmgr.py | 220 |
| -rw-r--r-- | scripts/table_dispatcher.ini.templ | 31 |
| -rwxr-xr-x | scripts/table_dispatcher.py | 124 |
13 files changed, 1041 insertions, 0 deletions
```diff
diff --git a/scripts/bulk_loader.ini.templ b/scripts/bulk_loader.ini.templ
new file mode 100644
index 00000000..187c1be2
--- /dev/null
+++ b/scripts/bulk_loader.ini.templ
@@ -0,0 +1,13 @@
+[bulk_loader]
+job_name = bizgres_loader
+
+src_db = dbname=bulksrc
+dst_db = dbname=bulkdst
+
+pgq_queue_name = xx
+
+use_skylog = 1
+
+logfile = ~/log/%(job_name)s.log
+pidfile = ~/pid/%(job_name)s.pid
+
```

```diff
diff --git a/scripts/bulk_loader.py b/scripts/bulk_loader.py
new file mode 100755
index 00000000..a098787e
--- /dev/null
+++ b/scripts/bulk_loader.py
@@ -0,0 +1,181 @@
+#! /usr/bin/env python
+
+"""Bulkloader for slow databases (Bizgres).
+
+Idea is following:
+  - Script reads from queue a batch of urlencoded row changes.
+    Inserts/updates/deletes, maybe many per one row.
+  - It changes them to minimal amount of DELETE commands
+    followed by big COPY of new data.
+  - One side-effect is that total order of how rows appear
+    changes, but per-row changes will be kept in order.
+
+The speedup from the COPY will happen only if the batches are
+large enough.  So the ticks should happen only after couple
+of minutes.
+
+"""
+
+import sys, os, pgq, skytools
+
+def mk_delete_sql(tbl, key_list, data):
+    """ generate delete command """
+    whe_list = []
+    for k in key_list:
+        whe_list.append("%s = %s" % (k, skytools.quote_literal(data[k])))
+    whe_str = " and ".join(whe_list)
+    return "delete from %s where %s;" % (tbl, whe_str)
+
+class TableCache(object):
+    """Per-table data hander."""
+
+    def __init__(self, tbl):
+        """Init per-batch table data cache."""
+        self.name = tbl
+        self.ev_list = []
+        self.pkey_map = {}
+        self.pkey_list = []
+        self.pkey_str = None
+        self.col_list = None
+
+    def add_event(self, ev):
+        """Store new event."""
+
+        # op & data
+        ev.op = ev.type[0]
+        ev.row = skytools.db_urldecode(ev.data)
+
+        # get pkey column names
+        if self.pkey_str is None:
+            self.pkey_str = ev.type.split(':')[1]
+            if self.pkey_str:
+                self.pkey_list = self.pkey_str.split(',')
+
+        # get pkey value
+        if self.pkey_str:
+            pk_data = []
+            for k in self.pkey_list:
+                pk_data.append(ev.row[k])
+            ev.pk_data = tuple(pk_data)
+        elif ev.op == 'I':
+            # fake pkey, just to get them spread out
+            ev.pk_data = ev.id
+        else:
+            raise Exception('non-pk tables not supported: %s' % ev.extra1)
+
+        # get full column list, detect added columns
+        if not self.col_list:
+            self.col_list = ev.row.keys()
+        elif self.col_list != ev.row.keys():
+            # ^ supposedly python guarantees same order in keys()
+
+            # find new columns
+            for c in ev.row.keys():
+                if c not in self.col_list:
+                    for oldev in self.ev_list:
+                        oldev.row[c] = None
+            self.col_list = ev.row.keys()
+
+        # add to list
+        self.ev_list.append(ev)
+
+        # keep all versions of row data
+        if ev.pk_data in self.pkey_map:
+            self.pkey_map[ev.pk_data].append(ev)
+        else:
+            self.pkey_map[ev.pk_data] = [ev]
+
+    def finish(self):
+        """Got all data, prepare for insertion."""
+
+        del_list = []
+        copy_list = []
+        for ev_list in self.pkey_map.values():
+            # rewrite list of I/U/D events to
+            # optional DELETE and optional INSERT/COPY command
+            exists_before = -1
+            exists_after = 1
+            for ev in ev_list:
+                if ev.op == "I":
+                    if exists_before < 0:
+                        exists_before = 0
+                    exists_after = 1
+                elif ev.op == "U":
+                    if exists_before < 0:
+                        exists_before = 1
+                    #exists_after = 1 # this shouldnt be needed
+                elif ev.op == "D":
+                    if exists_before < 0:
+                        exists_before = 1
+                    exists_after = 0
+                else:
+                    raise Exception('unknown event type: %s' % ev.op)
+
+            # skip short-lived rows
+            if exists_before == 0 and exists_after == 0:
+                continue
+
+            # take last event
+            ev = ev_list[-1]
+
+            # generate needed commands
+            if exists_before:
+                del_list.append(mk_delete_sql(self.name, self.pkey_list, ev.row))
+            if exists_after:
+                copy_list.append(ev.row)
+
+        # reorder cols
+        new_list = self.pkey_list[:]
+        for k in self.col_list:
+            if k not in self.pkey_list:
+                new_list.append(k)
+
+        return del_list, new_list, copy_list
+
+class BulkLoader(pgq.SerialConsumer):
+    def __init__(self, args):
+        pgq.SerialConsumer.__init__(self, "bulk_loader", "src_db", "dst_db", args)
+
+    def process_remote_batch(self, batch_id, ev_list, dst_db):
+        """Content dispatcher."""
+
+        # add events to per-table caches
+        tables = {}
+        for ev in ev_list:
+            tbl = ev.extra1
+
+            if not tbl in tables:
+                tables[tbl] = TableCache(tbl)
+            cache = tables[tbl]
+            cache.add_event(ev)
+            ev.tag_done()
+
+        # then process them
+        for tbl, cache in tables.items():
+            self.process_one_table(dst_db, tbl, cache)
+
+    def process_one_table(self, dst_db, tbl, cache):
+        self.log.debug("process_one_table: %s" % tbl)
+        del_list, col_list, copy_list = cache.finish()
+        curs = dst_db.cursor()
+
+        if not skytools.exists_table(curs, tbl):
+            self.log.warning("Ignoring events for table: %s" % tbl)
+            return
+
+        if len(del_list) > 0:
+            self.log.info("Deleting %d rows from %s" % (len(del_list), tbl))
+
+            q = " ".join(del_list)
+            self.log.debug(q)
+            curs.execute(q)
+
+        if len(copy_list) > 0:
+            self.log.info("Copying %d rows into %s" % (len(copy_list), tbl))
+            self.log.debug("COPY %s (%s)" % (tbl, ','.join(col_list)))
+            skytools.magic_insert(curs, tbl, copy_list, col_list)
+
+if __name__ == '__main__':
+    script = BulkLoader(sys.argv[1:])
+    script.start()
+
```
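The interesting part of bulk_loader is the event-collapsing rule described in the module docstring. As a reading aid, here is a standalone sketch of that rule (not part of the commit; the sample events and column names are invented), showing how a per-pkey list of I/U/D events becomes at most one DELETE plus at most one row for the final COPY:

```python
# Standalone sketch of the collapse rule in TableCache.finish():
# per primary key, a batch of I/U/D events yields at most one DELETE
# and at most one row for the COPY.  Sample events are invented.
def collapse(events):
    """events: list of (op, row) for a single pkey, in arrival order."""
    exists_before, exists_after = -1, 1
    for op, row in events:
        if op == 'I':
            if exists_before < 0:
                exists_before = 0
            exists_after = 1
        elif op == 'U':
            if exists_before < 0:
                exists_before = 1
        elif op == 'D':
            if exists_before < 0:
                exists_before = 1
            exists_after = 0
    if exists_before == 0 and exists_after == 0:
        return None, None              # row appeared and vanished inside the batch
    last_row = events[-1][1]           # only the last version matters
    delete = last_row if exists_before else None
    copy = last_row if exists_after else None
    return delete, copy

# INSERT followed by two UPDATEs: no DELETE needed, only the final row is copied.
print(collapse([('I', {'id': 1, 'qty': 5}),
                ('U', {'id': 1, 'qty': 7}),
                ('U', {'id': 1, 'qty': 9})]))
# UPDATE then DELETE of a pre-existing row: DELETE only, nothing to copy.
print(collapse([('U', {'id': 2, 'qty': 1}),
                ('D', {'id': 2, 'qty': 1})]))
```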
```diff
diff --git a/scripts/catsql.py b/scripts/catsql.py
new file mode 100755
index 00000000..94fbacd8
--- /dev/null
+++ b/scripts/catsql.py
@@ -0,0 +1,141 @@
+#! /usr/bin/env python
+
+"""Prints out SQL files with psql command execution.
+
+Supported psql commands: \i, \cd, \q
+Others are skipped.
+
+Aditionally does some pre-processing for NDoc.
+NDoc is looks nice but needs some hand-holding.
+
+Bug:
+
+- function def end detection searches for 'as'/'is' but does not check
+  word boundaries - finds them even in function name.  That means in
+  main conf, as/is must be disabled and $ ' added.  This script can
+  remove the unnecessary AS from output.
+
+Niceties:
+
+- Ndoc includes function def in output only if def is after comment.
+  But for SQL functions its better to have it after def.
+  This script can swap comment and def.
+
+- Optionally remove CREATE FUNCTION (OR REPLACE) from def to
+  keep it shorter in doc.
+
+Note:
+
+- NDoc compares real function name and name in comment.  if differ,
+  it decides detection failed.
+
+"""
+
+import sys, os, re, getopt
+
+def usage(x):
+    print "usage: catsql [--ndoc] FILE [FILE ...]"
+    sys.exit(x)
+
+# NDoc specific changes
+cf_ndoc = 0
+
+# compile regexes
+func_re = r"create\s+(or\s+replace\s+)?function\s+"
+func_rc = re.compile(func_re, re.I)
+comm_rc = re.compile(r"^\s*([#]\s*)?(?P<com>--.*)", re.I)
+end_rc = re.compile(r"\b([;]|begin|declare|end)\b", re.I)
+as_rc = re.compile(r"\s+as\s+", re.I)
+cmd_rc = re.compile(r"^\\([a-z]*)(\s+.*)?", re.I)
+
+# conversion func
+def fix_func(ln):
+    # if ndoc, replace AS with ' '
+    if cf_ndoc:
+        return as_rc.sub(' ', ln)
+    else:
+        return ln
+
+# got function def
+def proc_func(f, ln):
+    # remove CREATE OR REPLACE
+    if cf_ndoc:
+        ln = func_rc.sub('', ln)
+
+    ln = fix_func(ln)
+    pre_list = [ln]
+    comm_list = []
+    n_comm = 0
+    while 1:
+        ln = f.readline()
+        if not ln:
+            break
+
+        com = None
+        if cf_ndoc:
+            com = comm_rc.search(ln)
+        if cf_ndoc and com:
+            pos = com.start('com')
+            comm_list.append(ln[pos:])
+        elif end_rc.search(ln):
+            break
+        elif len(comm_list) > 0:
+            break
+        else:
+            pre_list.append(fix_func(ln))
+
+    if len(comm_list) > 2:
+        map(sys.stdout.write, comm_list)
+        map(sys.stdout.write, pre_list)
+    else:
+        map(sys.stdout.write, pre_list)
+        map(sys.stdout.write, comm_list)
+    if ln:
+        sys.stdout.write(fix_func(ln))
+
+def cat_file(fn):
+    sys.stdout.write("\n")
+    f = open(fn)
+    while 1:
+        ln = f.readline()
+        if not ln:
+            break
+        m = cmd_rc.search(ln)
+        if m:
+            cmd = m.group(1)
+            if cmd == "i":          # include a file
+                fn2 = m.group(2).strip()
+                cat_file(fn2)
+            elif cmd == "q":        # quit
+                sys.exit(0)
+            elif cmd == "cd":       # chdir
+                dir = m.group(2).strip()
+                os.chdir(dir)
+            else:                   # skip all others
+                pass
+        else:
+            if func_rc.search(ln):  # function header
+                proc_func(f, ln)
+            else:                   # normal sql
+                sys.stdout.write(ln)
+    sys.stdout.write("\n")
+
+def main():
+    global cf_ndoc
+
+    try:
+        opts, args = getopt.gnu_getopt(sys.argv[1:], 'h', ['ndoc'])
+    except getopt.error, d:
+        print d
+        usage(1)
+    for o, v in opts:
+        if o == "-h":
+            usage(0)
+        elif o == "--ndoc":
+            cf_ndoc = 1
+    for fn in args:
+        cat_file(fn)
+
+if __name__ == '__main__':
+    main()
+
```

```diff
diff --git a/scripts/cube_dispatcher.ini.templ b/scripts/cube_dispatcher.ini.templ
new file mode 100644
index 00000000..dea70697
--- /dev/null
+++ b/scripts/cube_dispatcher.ini.templ
@@ -0,0 +1,23 @@
+[cube_dispatcher]
+job_name = some_queue_to_cube
+
+src_db = dbname=sourcedb_test
+dst_db = dbname=dataminedb_test
+
+pgq_queue_name = udata.some_queue
+
+logfile = ~/log/%(job_name)s.log
+pidfile = ~/pid/%(job_name)s.pid
+
+# how many rows are kept: keep_latest, keep_all
+mode = keep_latest
+
+# to_char() fmt for table suffix
+#dateformat = YYYY_MM_DD
+# following disables table suffixes:
+#dateformat =
+
+part_template =
+    create table _DEST_TABLE (like _PARENT);
+    alter table only _DEST_TABLE add primary key (_PKEY);
+
```
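catsql only acts on a few psql backslash commands; everything else falls through to plain output. A small illustration (not from the commit; the sample input lines are invented) of how the cmd_rc pattern used above classifies them:

```python
import re

# same pattern as cmd_rc in catsql.py
cmd_rc = re.compile(r"^\\([a-z]*)(\s+.*)?", re.I)

for line in ['\\i functions/pgq.sql', '\\cd ../sql', '\\q', '\\set ON_ERROR_STOP 1']:
    m = cmd_rc.search(line)
    # 'i' includes the file, 'cd' changes dir, 'q' quits, anything else is skipped
    print(line, '->', m.group(1))
```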
```diff
diff --git a/scripts/cube_dispatcher.py b/scripts/cube_dispatcher.py
new file mode 100755
index 00000000..d59ac300
--- /dev/null
+++ b/scripts/cube_dispatcher.py
@@ -0,0 +1,175 @@
+#! /usr/bin/env python
+
+# it accepts urlencoded rows for multiple tables from queue
+# and insert them into actual tables, with partitioning on tick time
+
+import sys, os, pgq, skytools
+
+DEF_CREATE = """
+create table _DEST_TABLE (like _PARENT);
+alter table only _DEST_TABLE add primary key (_PKEY);
+"""
+
+class CubeDispatcher(pgq.SerialConsumer):
+    def __init__(self, args):
+        pgq.SerialConsumer.__init__(self, "cube_dispatcher", "src_db", "dst_db", args)
+
+        self.dateformat = self.cf.get('dateformat', 'YYYY_MM_DD')
+
+        self.part_template = self.cf.get('part_template', DEF_CREATE)
+
+        mode = self.cf.get('mode', 'keep_latest')
+        if mode == 'keep_latest':
+            self.keep_latest = 1
+        elif mode == 'keep_all':
+            self.keep_latest = 0
+        else:
+            self.log.fatal('wrong mode setting')
+            sys.exit(1)
+
+    def get_part_date(self, batch_id):
+        if not self.dateformat:
+            return None
+
+        # fetch and format batch date
+        src_db = self.get_database('src_db')
+        curs = src_db.cursor()
+        q = 'select to_char(batch_end, %s) from pgq.get_batch_info(%s)'
+        curs.execute(q, [self.dateformat, batch_id])
+        src_db.commit()
+        return curs.fetchone()[0]
+
+    def process_remote_batch(self, batch_id, ev_list, dst_db):
+
+        # actual processing
+        date_str = self.get_part_date(batch_id)
+        self.dispatch(dst_db, ev_list, self.get_part_date(batch_id))
+
+        # tag as done
+        for ev in ev_list:
+            ev.tag_done()
+
+    def dispatch(self, dst_db, ev_list, date_str):
+        """Actual event processing."""
+
+        # get tables and sql
+        tables = {}
+        sql_list = []
+        for ev in ev_list:
+            if date_str:
+                tbl = "%s_%s" % (ev.extra1, date_str)
+            else:
+                tbl = ev.extra1
+
+            if not tbl in tables:
+                tables[tbl] = self.get_table_info(ev, tbl)
+
+            sql = self.make_sql(tbl, ev)
+            sql_list.append(sql)
+
+        # create tables if needed
+        self.check_tables(dst_db, tables)
+
+        # insert into data tables
+        curs = dst_db.cursor()
+        block = []
+        for sql in sql_list:
+            self.log.debug(sql)
+            block.append(sql)
+            if len(block) > 100:
+                curs.execute("\n".join(block))
+                block = []
+        if len(block) > 0:
+            curs.execute("\n".join(block))
+
+    def get_table_info(self, ev, tbl):
+        inf = {
+            'parent': ev.extra1,
+            'table': tbl,
+            'key_list': ev.type.split(':')[1]
+        }
+        return inf
+
+    def make_sql(self, tbl, ev):
+        """Return SQL statement(s) for that event."""
+
+        # parse data
+        data = skytools.db_urldecode(ev.data)
+
+        # parse tbl info
+        op, keys = ev.type.split(':')
+        key_list = keys.split(',')
+        if self.keep_latest and len(key_list) == 0:
+            raise Exception('No pkey on table %s' % tbl)
+
+        # generate sql
+        if op in ('I', 'U'):
+            if self.keep_latest:
+                sql = "%s %s" % (self.mk_delete_sql(tbl, key_list, data),
+                                 self.mk_insert_sql(tbl, key_list, data))
+            else:
+                sql = self.mk_insert_sql(tbl, key_list, data)
+        elif op == "D":
+            if not self.keep_latest:
+                raise Exception('Delete op not supported if mode=keep_all')
+
+            sql = self.mk_delete_sql(tbl, key_list, data)
+        else:
+            raise Exception('Unknown row op: %s' % op)
+        return sql
+
+    def mk_delete_sql(self, tbl, key_list, data):
+        # generate delete command
+        whe_list = []
+        for k in key_list:
+            whe_list.append("%s = %s" % (k, skytools.quote_literal(data[k])))
+        whe_str = " and ".join(whe_list)
+        return "delete from %s where %s;" % (tbl, whe_str)
+
+    def mk_insert_sql(self, tbl, key_list, data):
+        # generate insert command
+        col_list = []
+        val_list = []
+        for c, v in data.items():
+            col_list.append(c)
+            val_list.append(skytools.quote_literal(v))
+        col_str = ",".join(col_list)
+        val_str = ",".join(val_list)
+        return "insert into %s (%s) values (%s);" % (
+                        tbl, col_str, val_str)
+
+    def check_tables(self, dcon, tables):
+        """Checks that tables needed for copy are there. If not
+        then creates them.
+
+        Used by other procedures to ensure that table is there
+        before they start inserting.
+
+        The commits should not be dangerous, as we haven't done anything
+        with cdr's yet, so they should still be in one TX.
+
+        Although it would be nicer to have a lock for table creation.
+        """
+
+        dcur = dcon.cursor()
+        exist_map = {}
+        for tbl, inf in tables.items():
+            if skytools.exists_table(dcur, tbl):
+                continue
+
+            sql = self.part_template
+            sql = sql.replace('_DEST_TABLE', inf['table'])
+            sql = sql.replace('_PARENT', inf['parent'])
+            sql = sql.replace('_PKEY', inf['key_list'])
+            # be similar to table_dispatcher
+            schema_table = inf['table'].replace(".", "__")
+            sql = sql.replace('_SCHEMA_TABLE', schema_table)
+
+            dcur.execute(sql)
+            dcon.commit()
+            self.log.info('%s: Created table %s' % (self.job_name, tbl))
+
+if __name__ == '__main__':
+    script = CubeDispatcher(sys.argv[1:])
+    script.start()
+
```
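To make the keep_latest mode concrete, this is roughly the SQL that make_sql() produces for a single insert/update event (a sketch with an invented event; the quoting helper is a simplified stand-in for skytools.quote_literal):

```python
# Invented event: type 'U:id', routed to partition 'user_stats_2007_03_13',
# with the urlencoded data already decoded into a dict.
data = {'id': '42', 'clicks': '17'}
key_list = ['id']
tbl = 'user_stats_2007_03_13'

def quote(v):                      # stand-in for skytools.quote_literal()
    return "'%s'" % str(v).replace("'", "''")

delete = "delete from %s where %s;" % (
    tbl, " and ".join("%s = %s" % (k, quote(data[k])) for k in key_list))
insert = "insert into %s (%s) values (%s);" % (
    tbl, ",".join(data), ",".join(quote(v) for v in data.values()))

# mode = keep_latest  -> both statements, the old version of the row goes away
# mode = keep_all     -> only the insert, every version of the row is kept
print(delete)
print(insert)
```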
```diff
diff --git a/scripts/queue_mover.ini.templ b/scripts/queue_mover.ini.templ
new file mode 100644
index 00000000..8d1ff5f1
--- /dev/null
+++ b/scripts/queue_mover.ini.templ
@@ -0,0 +1,14 @@
+[queue_mover]
+job_name = queue_mover_test
+
+src_db = dbname=sourcedb_test
+dst_db = dbname=dataminedb_test
+
+pgq_queue_name = source_queue
+dst_queue_name = dest_queue
+
+logfile = ~/log/%(job_name)s.log
+pidfile = ~/pid/%(job_name)s.pid
+
+use_skylog = 0
+
```

```diff
diff --git a/scripts/queue_mover.py b/scripts/queue_mover.py
new file mode 100755
index 00000000..129728a3
--- /dev/null
+++ b/scripts/queue_mover.py
@@ -0,0 +1,30 @@
+#! /usr/bin/env python
+
+# this script simply mover events from one queue to another
+
+import sys, os, pgq, skytools
+
+class QueueMover(pgq.SerialConsumer):
+    def __init__(self, args):
+        pgq.SerialConsumer.__init__(self, "queue_mover", "src_db", "dst_db", args)
+
+        self.dst_queue_name = self.cf.get("dst_queue_name")
+
+    def process_remote_batch(self, db, batch_id, ev_list, dst_db):
+
+        # load data
+        rows = []
+        for ev in ev_list:
+            data = [ev.type, ev.data, ev.extra1, ev.extra2, ev.extra3, ev.extra4, ev.time]
+            rows.append(data)
+            ev.tag_done()
+        fields = ['type', 'data', 'extra1', 'extra2', 'extra3', 'extra4', 'time']
+
+        # insert data
+        curs = dst_db.cursor()
+        pgq.bulk_insert_events(curs, rows, fields, self.dst_queue_name)
+
+if __name__ == '__main__':
+    script = QueueMover(sys.argv[1:])
+    script.start()
+
```

```diff
diff --git a/scripts/queue_splitter.ini.templ b/scripts/queue_splitter.ini.templ
new file mode 100644
index 00000000..68c5ccbb
--- /dev/null
+++ b/scripts/queue_splitter.ini.templ
@@ -0,0 +1,13 @@
+[queue_splitter]
+job_name = queue_splitter_test
+
+src_db = dbname=sourcedb_test
+dst_db = dbname=destdb_test
+
+pgq_queue_name = source_queue
+
+logfile = ~/log/%(job_name)s.log
+pidfile = ~/pid/%(job_name)s.pid
+
+use_skylog = 0
+
```
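Both queue_mover above and queue_splitter below hand pgq.bulk_insert_events() a list of positional rows plus a matching field list. A sketch of that shape, with invented event values (the call itself is quoted from the scripts):

```python
# Each row is positional and must line up with `fields`; sample values invented.
fields = ['type', 'data', 'extra1', 'extra2', 'extra3', 'extra4', 'time']
rows = [
    ['U:id', 'id=42&qty=7', 'shop.orders', None, None, None, '2007-03-13 11:52:09'],
    ['D:id', 'id=43',       'shop.orders', None, None, None, '2007-03-13 11:52:10'],
]
# queue_mover:    pgq.bulk_insert_events(curs, rows, fields, self.dst_queue_name)
# queue_splitter: same call, but rows are grouped per target queue first
```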
```diff
diff --git a/scripts/queue_splitter.py b/scripts/queue_splitter.py
new file mode 100755
index 00000000..c6714ca0
--- /dev/null
+++ b/scripts/queue_splitter.py
@@ -0,0 +1,33 @@
+#! /usr/bin/env python
+
+# puts events into queue specified by field from 'queue_field' config parameter
+
+import sys, os, pgq, skytools
+
+class QueueSplitter(pgq.SerialConsumer):
+    def __init__(self, args):
+        pgq.SerialConsumer.__init__(self, "queue_splitter", "src_db", "dst_db", args)
+
+    def process_remote_batch(self, db, batch_id, ev_list, dst_db):
+        cache = {}
+        queue_field = self.cf.get('queue_field', 'extra1')
+        for ev in ev_list:
+            row = [ev.type, ev.data, ev.extra1, ev.extra2, ev.extra3, ev.extra4, ev.time]
+            queue = ev.__getattr__(queue_field)
+            if queue not in cache:
+                cache[queue] = []
+            cache[queue].append(row)
+            ev.tag_done()
+
+        # should match the composed row
+        fields = ['type', 'data', 'extra1', 'extra2', 'extra3', 'extra4', 'time']
+
+        # now send them to right queues
+        curs = dst_db.cursor()
+        for queue, rows in cache.items():
+            pgq.bulk_insert_events(curs, rows, fields, queue)
+
+if __name__ == '__main__':
+    script = QueueSplitter(sys.argv[1:])
+    script.start()
+
```

```diff
diff --git a/scripts/scriptmgr.ini.templ b/scripts/scriptmgr.ini.templ
new file mode 100644
index 00000000..7fa1419d
--- /dev/null
+++ b/scripts/scriptmgr.ini.templ
@@ -0,0 +1,43 @@
+
+[scriptmgr]
+job_name = scriptmgr_cphdb5
+config_list = ~/dbscripts/conf/*.ini, ~/random/conf/*.ini
+logfile = ~/log/%(job_name)s.log
+pidfile = ~/pid/%(job_name)s.pid
+#use_skylog = 1
+
+#
+# defaults for services
+#
+[DEFAULT]
+cwd = ~/dbscripts
+args = -v
+
+#
+# service descriptions
+#
+
+[cube_dispatcher]
+script = cube_dispatcher.py
+
+[table_dispatcher]
+script = table_dispatcher.py
+
+[bulk_loader]
+script = bulk_loader.py
+
+[londiste]
+script = londiste.py
+args = replay
+
+[pgqadm]
+script = pgqadm.py
+args = ticker
+
+#
+# services to be ignored
+#
+
+[log_checker]
+disabled = 1
+
```
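The routing step in QueueSplitter boils down to grouping rows by one event attribute (extra1 unless queue_field says otherwise); a standalone sketch with invented events:

```python
# Group rows by the attribute named in `queue_field`; events are invented.
events = [
    {'extra1': 'billing_events', 'data': 'id=1'},
    {'extra1': 'mail_events',    'data': 'id=2'},
    {'extra1': 'billing_events', 'data': 'id=3'},
]
queue_field = 'extra1'

cache = {}
for ev in events:
    cache.setdefault(ev[queue_field], []).append(ev['data'])

print(cache)
# {'billing_events': ['id=1', 'id=3'], 'mail_events': ['id=2']}
# each key would then get its own pgq.bulk_insert_events() call
```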
```diff
diff --git a/scripts/scriptmgr.py b/scripts/scriptmgr.py
new file mode 100755
index 00000000..2ee742b2
--- /dev/null
+++ b/scripts/scriptmgr.py
@@ -0,0 +1,220 @@
+#! /usr/bin/env python
+
+"""Bulk start/stop of scripts.
+
+Reads a bunch of config files and maps them to scripts, then handles those.
+"""
+
+import sys, os, skytools, signal, glob, ConfigParser, time
+
+command_usage = """
+%prog [options] INI CMD [subcmd args]
+
+commands:
+  start [-a | jobname ..]    start a job
+  stop [-a | jobname ..]     stop a job
+  restart [-a | jobname ..]  restart job(s)
+  reload [-a | jobname ..]   send reload signal
+  status
+"""
+
+def job_sort_cmp(j1, j2):
+    d1 = j1['service'] + j1['job_name']
+    d2 = j2['service'] + j2['job_name']
+    if d1 < d2: return -1
+    elif d1 > d2: return 1
+    else: return 0
+
+class ScriptMgr(skytools.DBScript):
+    def init_optparse(self, p = None):
+        p = skytools.DBScript.init_optparse(self, p)
+        p.add_option("-a", "--all", action="store_true", help="apply command to all jobs")
+        p.set_usage(command_usage.strip())
+        return p
+
+    def load_jobs(self):
+        self.svc_list = []
+        self.svc_map = {}
+        self.config_list = []
+
+        # load services
+        svc_list = self.cf.sections()
+        svc_list.remove(self.service_name)
+        for svc_name in svc_list:
+            cf = self.cf.clone(svc_name)
+            disabled = cf.getboolean('disabled', 0)
+            defscript = None
+            if disabled:
+                defscript = '/disabled'
+            svc = {
+                'service': svc_name,
+                'script': cf.getfile('script', defscript),
+                'cwd': cf.getfile('cwd'),
+                'disabled': cf.getboolean('disabled', 0),
+                'args': cf.get('args', ''),
+            }
+            self.svc_list.append(svc)
+            self.svc_map[svc_name] = svc
+
+        # generate config list
+        for tmp in self.cf.getlist('config_list'):
+            tmp = os.path.expanduser(tmp)
+            tmp = os.path.expandvars(tmp)
+            for fn in glob.glob(tmp):
+                self.config_list.append(fn)
+
+        # read jobs
+        for fn in self.config_list:
+            raw = ConfigParser.SafeConfigParser({'job_name':'?', 'service_name':'?'})
+            raw.read(fn)
+
+            # skip its own config
+            if raw.has_section(self.service_name):
+                continue
+
+            got = 0
+            for sect in raw.sections():
+                if sect in self.svc_map:
+                    got = 1
+                    self.add_job(fn, sect)
+            if not got:
+                self.log.warning('Cannot find service for %s' % fn)
+
+    def add_job(self, cf_file, service_name):
+        svc = self.svc_map[service_name]
+        cf = skytools.Config(service_name, cf_file)
+        disabled = svc['disabled']
+        if not disabled:
+            disabled = cf.getboolean('disabled', 0)
+        job = {
+            'disabled': disabled,
+            'config': cf_file,
+            'cwd': svc['cwd'],
+            'script': svc['script'],
+            'args': svc['args'],
+            'service': svc['service'],
+            'job_name': cf.get('job_name'),
+            'pidfile': cf.getfile('pidfile'),
+        }
+        self.job_list.append(job)
+        self.job_map[job['job_name']] = job
+
+    def cmd_status(self):
+        for job in self.job_list:
+            os.chdir(job['cwd'])
+            cf = skytools.Config(job['service'], job['config'])
+            pidfile = cf.getfile('pidfile')
+            name = job['job_name']
+            svc = job['service']
+            if job['disabled']:
+                name += "  (disabled)"
+
+            if os.path.isfile(pidfile):
+                print " OK       [%s] %s" % (svc, name)
+            else:
+                print " STOPPED  [%s] %s" % (svc, name)
+
+    def cmd_info(self):
+        for job in self.job_list:
+            print job
+
+    def cmd_start(self, job_name):
+        job = self.job_map[job_name]
+        if job['disabled']:
+            self.log.info("Skipping %s" % job_name)
+            return 0
+        self.log.info('Starting %s' % job_name)
+        os.chdir(job['cwd'])
+        pidfile = job['pidfile']
+        if os.path.isfile(pidfile):
+            self.log.warning("Script %s seems running")
+            return 0
+        cmd = "%(script)s %(config)s %(args)s -d" % job
+        res = os.system(cmd)
+        self.log.debug(res)
+        if res != 0:
+            self.log.error('startup failed: %s' % job_name)
+            return 1
+        else:
+            return 0
+
+    def cmd_stop(self, job_name):
+        job = self.job_map[job_name]
+        if job['disabled']:
+            self.log.info("Skipping %s" % job_name)
+            return
+        self.log.info('Stopping %s' % job_name)
+        self.signal_job(job, signal.SIGINT)
+
+    def cmd_reload(self, job_name):
+        job = self.job_map[job_name]
+        if job['disabled']:
+            self.log.info("Skipping %s" % job_name)
+            return
+        self.log.info('Reloading %s' % job_name)
+        self.signal_job(job, signal.SIGHUP)
+
+    def signal_job(self, job, sig):
+        os.chdir(job['cwd'])
+        pidfile = job['pidfile']
+        if os.path.isfile(pidfile):
+            pid = int(open(pidfile).read())
+            os.kill(pid, sig)
+        else:
+            self.log.warning("Job %s not running" % job['job_name'])
+
+    def work(self):
+        self.set_single_loop(1)
+        self.job_list = []
+        self.job_map = {}
+        self.load_jobs()
+
+        if len(self.args) < 2:
+            print "need command"
+            sys.exit(1)
+
+        jobs = self.args[2:]
+        if len(jobs) == 0 and self.options.all:
+            for job in self.job_list:
+                jobs.append(job['job_name'])
+
+        self.job_list.sort(job_sort_cmp)
+
+        cmd = self.args[1]
+        if cmd == "status":
+            self.cmd_status()
+            return
+        elif cmd == "info":
+            self.cmd_info()
+            return
+
+        if len(jobs) == 0:
+            print "no jobs given?"
+            sys.exit(1)
+
+        if cmd == "start":
+            err = 0
+            for n in jobs:
+                err += self.cmd_start(n)
+            if err > 0:
+                self.log.error('some scripts failed')
+                sys.exit(1)
+        elif cmd == "stop":
+            for n in jobs:
+                self.cmd_stop(n)
+        elif cmd == "restart":
+            for n in jobs:
+                self.cmd_stop(n)
+                time.sleep(2)
+                self.cmd_start(n)
+        elif cmd == "reload":
+            for n in self.jobs:
+                self.cmd_reload(n)
+        else:
+            print "unknown command:", cmd
+            sys.exit(1)
+
+if __name__ == '__main__':
+    script = ScriptMgr('scriptmgr', sys.argv[1:])
+    script.start()
+
```
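The per-job command line that cmd_start() ends up running is easy to see in isolation (a sketch; the script name, config path and args below are invented):

```python
# cmd_start() builds the shell command from the job dict; invented job values.
job = {
    'script': 'cube_dispatcher.py',
    'config': '/home/marko/dbscripts/conf/orders_cube.ini',
    'args':   '-v',
}
cmd = "%(script)s %(config)s %(args)s -d" % job
print(cmd)
# cube_dispatcher.py /home/marko/dbscripts/conf/orders_cube.ini -v -d
# i.e. every managed script is started as a daemon (-d) from its service's cwd
```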
```diff
diff --git a/scripts/table_dispatcher.ini.templ b/scripts/table_dispatcher.ini.templ
new file mode 100644
index 00000000..131dd7fe
--- /dev/null
+++ b/scripts/table_dispatcher.ini.templ
@@ -0,0 +1,31 @@
+[udata_dispatcher]
+job_name = test_move
+
+src_db = dbname=sourcedb_test
+dst_db = dbname=dataminedb_test
+
+pgq_queue_name = OrderLog
+
+logfile = ~/log/%(job_name)s.log
+pidfile = ~/pid/%(job_name)s.pid
+
+# where to put data.  when partitioning, will be used as base name
+dest_table = orders
+
+# date field with will be used for partitioning
+# special value: _EVTIME - event creation time
+part_column = start_date
+
+#fields = *
+#fields = id, name
+#fields = id:newid, name, bar:baz
+
+
+# template used for creating partition tables
+# _DEST_TABLE
+part_template =
+    create table _DEST_TABLE () inherits (orders);
+    alter table only _DEST_TABLE add constraint _DEST_TABLE_pkey primary key (id);
+    grant select on _DEST_TABLE to group reporting;
+
+
```
```diff
diff --git a/scripts/table_dispatcher.py b/scripts/table_dispatcher.py
new file mode 100755
index 00000000..054ced9a
--- /dev/null
+++ b/scripts/table_dispatcher.py
@@ -0,0 +1,124 @@
+#! /usr/bin/env python
+
+# it loads urlencoded rows for one trable from queue and inserts
+# them into actual tables, with optional partitioning
+
+import sys, os, pgq, skytools
+
+DEST_TABLE = "_DEST_TABLE"
+SCHEMA_TABLE = "_SCHEMA_TABLE"
+
+class TableDispatcher(pgq.SerialConsumer):
+    def __init__(self, args):
+        pgq.SerialConsumer.__init__(self, "table_dispatcher", "src_db", "dst_db", args)
+
+        self.part_template = self.cf.get("part_template", '')
+        self.dest_table = self.cf.get("dest_table")
+        self.part_field = self.cf.get("part_field", '')
+        self.part_method = self.cf.get("part_method", 'daily')
+        if self.part_method not in ('daily', 'monthly'):
+            raise Exception('bad part_method')
+
+        if self.cf.get("fields", "*") == "*":
+            self.field_map = None
+        else:
+            self.field_map = {}
+            for fval in self.cf.getlist('fields'):
+                tmp = fval.split(':')
+                if len(tmp) == 1:
+                    self.field_map[tmp[0]] = tmp[0]
+                else:
+                    self.field_map[tmp[0]] = tmp[1]
+
+    def process_remote_batch(self, batch_id, ev_list, dst_db):
+        if len(ev_list) == 0:
+            return
+
+        # actual processing
+        self.dispatch(dst_db, ev_list)
+
+        # tag as done
+        for ev in ev_list:
+            ev.tag_done()
+
+    def dispatch(self, dst_db, ev_list):
+        """Generic dispatcher."""
+
+        # load data
+        tables = {}
+        for ev in ev_list:
+            row = skytools.db_urldecode(ev.data)
+
+            # guess dest table
+            if self.part_field:
+                if self.part_field == "_EVTIME":
+                    partval = str(ev.creation_date)
+                else:
+                    partval = str(row[self.part_field])
+                partval = partval.split(' ')[0]
+                date = partval.split('-')
+                if self.part_method == 'monthly':
+                    date = date[:2]
+                suffix = '_'.join(date)
+                tbl = "%s_%s" % (self.dest_table, suffix)
+            else:
+                tbl = self.dest_table
+
+            # map fields
+            if self.field_map is None:
+                dstrow = row
+            else:
+                dstrow = {}
+                for k, v in self.field_map.items():
+                    dstrow[v] = row[k]
+
+            # add row into table
+            if not tbl in tables:
+                tables[tbl] = [dstrow]
+            else:
+                tables[tbl].append(dstrow)
+
+        # create tables if needed
+        self.check_tables(dst_db, tables)
+
+        # insert into data tables
+        curs = dst_db.cursor()
+        for tbl, tbl_rows in tables.items():
+            skytools.magic_insert(curs, tbl, tbl_rows)
+
+    def check_tables(self, dcon, tables):
+        """Checks that tables needed for copy are there. If not
+        then creates them.
+
+        Used by other procedures to ensure that table is there
+        before they start inserting.
+
+        The commits should not be dangerous, as we haven't done anything
+        with cdr's yet, so they should still be in one TX.
+
+        Although it would be nicer to have a lock for table creation.
+        """
+
+        dcur = dcon.cursor()
+        exist_map = {}
+        for tbl in tables.keys():
+            if not skytools.exists_table(dcur, tbl):
+                if not self.part_template:
+                    raise Exception('Dest table does not exists and no way to create it.')
+
+                sql = self.part_template
+                sql = sql.replace(DEST_TABLE, tbl)
+
+                # we do this to make sure that constraints for
+                # tables who contain a schema will still work
+                schema_table = tbl.replace(".", "__")
+                sql = sql.replace(SCHEMA_TABLE, schema_table)
+
+                dcur.execute(sql)
+                dcon.commit()
+                self.log.info('%s: Created table %s' % (self.job_name, tbl))
+
+if __name__ == '__main__':
+    script = TableDispatcher(sys.argv[1:])
+    script.start()
+
```
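The partition-name logic in dispatch() deserves a standalone look: the value of the configured partition field (or the event creation time for _EVTIME) is cut at the first space, split on '-', and joined with '_', with monthly mode keeping only year and month. A sketch with invented values:

```python
def part_table(dest_table, partval, part_method='daily'):
    """Mirror of the suffix logic in TableDispatcher.dispatch(); sample values invented."""
    partval = str(partval).split(' ')[0]     # drop the time-of-day part
    date = partval.split('-')
    if part_method == 'monthly':
        date = date[:2]                      # keep only year and month
    return "%s_%s" % (dest_table, '_'.join(date))

print(part_table('orders', '2007-03-13 11:52:09'))             # orders_2007_03_13
print(part_table('orders', '2007-03-13 11:52:09', 'monthly'))  # orders_2007_03
```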
