| author | Marko Kreen | 2009-06-05 10:28:31 +0000 |
|---|---|---|
| committer | Marko Kreen | 2009-06-07 15:03:25 +0000 |
| commit | e9f032912997443878fdd70634ec644d73fef1c8 | |
| tree | 14c68cafef16551381229486fc757b9044016de4 /scripts | |
| parent | c8b2d68fcd66a94ec748ece7d280d76e5238e752 | |
move bulk_loader/cube_dispatcher/table_dispatcher to old/
The scripts are obsolete but are kept as samples. To make it clear that they
are no longer actively maintained, move them to old/.
Diffstat (limited to 'scripts')
| Mode | File | Lines deleted |
|---|---|---|
| -rwxr-xr-x | scripts/bulk_loader.py | 423 |
| -rwxr-xr-x | scripts/cube_dispatcher.py | 203 |
| -rwxr-xr-x | scripts/simple_serial_consumer.py | 93 |
| -rwxr-xr-x | scripts/table_dispatcher.py | 149 |
4 files changed, 0 insertions, 868 deletions
diff --git a/scripts/bulk_loader.py b/scripts/bulk_loader.py
deleted file mode 100755
index c9a67496..00000000
--- a/scripts/bulk_loader.py
+++ /dev/null
@@ -1,423 +0,0 @@
-#! /usr/bin/env python
-
-"""Bulkloader for slow databases (Bizgres).
-
-Idea is following:
-    - Script reads from queue a batch of urlencoded row changes.
-      Inserts/updates/deletes, maybe many per one row.
-    - It creates 3 lists: ins_list, upd_list, del_list.
-      If one row is changed several times, it keeps the latest.
-    - Lists are processed in followin way:
-      ins_list - COPY into main table
-      upd_list - COPY into temp table, UPDATE from there
-      del_list - COPY into temp table, DELETE from there
-    - One side-effect is that total order of how rows appear
-      changes, but per-row changes will be kept in order.
-
-The speedup from the COPY will happen only if the batches are
-large enough.  So the ticks should happen only after couple
-of minutes.
-
-bl_sourcedb_queue_to_destdb.ini
-
-Config template::
-    [bulk_loader]
-    # job name is optional when not given ini file name is used
-    job_name = bl_sourcedb_queue_to_destdb
-
-    src_db = dbname=sourcedb
-    dst_db = dbname=destdb
-
-    pgq_queue_name = source_queue
-
-    use_skylog = 0
-
-    logfile = ~/log/%(job_name)s.log
-    pidfile = ~/pid/%(job_name)s.pid
-
-    # 0 - apply UPDATE as UPDATE
-    # 1 - apply UPDATE as DELETE+INSERT
-    # 2 - merge INSERT/UPDATE, do DELETE+INSERT
-    load_method = 0
-
-    # no hurry
-    loop_delay = 10
-
-    # table renaming
-    # remap_tables = skypein_cdr_closed:skypein_cdr, tbl1:tbl2
-"""
-
-import sys, os, pgq, skytools
-from skytools import quote_ident, quote_fqident
-
-
-## several methods for applying data
-
-# update as update
-METH_CORRECT = 0
-# update as delete/copy
-METH_DELETE = 1
-# merge ins_list and upd_list, do delete/copy
-METH_MERGED = 2
-
-# no good method for temp table check before 8.2
-USE_LONGLIVED_TEMP_TABLES = False
-
-AVOID_BIZGRES_BUG = 1
-
-def find_dist_fields(curs, fqtbl):
-    if not skytools.exists_table(curs, "pg_catalog.mpp_distribution_policy"):
-        return []
-    schema, name = fqtbl.split('.')
-    q = "select a.attname"\
-        "  from pg_class t, pg_namespace n, pg_attribute a,"\
-        "       mpp_distribution_policy p"\
-        " where n.oid = t.relnamespace"\
-        "   and p.localoid = t.oid"\
-        "   and a.attrelid = t.oid"\
-        "   and a.attnum = any(p.attrnums)"\
-        "   and n.nspname = %s and t.relname = %s"
-    curs.execute(q, [schema, name])
-    res = []
-    for row in curs.fetchall():
-        res.append(row[0])
-    return res
-
-def exists_temp_table(curs, tbl):
-    # correct way, works only on 8.2
-    q = "select 1 from pg_class where relname = %s and relnamespace = pg_my_temp_schema()"
-
-    # does not work with parallel case
-    #q = """
-    #select 1 from pg_class t, pg_namespace n
-    #where n.oid = t.relnamespace
-    #  and pg_table_is_visible(t.oid)
-    #  and has_schema_privilege(n.nspname, 'USAGE')
-    #  and has_table_privilege(n.nspname || '.' || t.relname, 'SELECT')
-    #  and substr(n.nspname, 1, 8) = 'pg_temp_'
-    #  and t.relname = %s;
-    #"""
-    curs.execute(q, [tbl])
-    tmp = curs.fetchall()
-    return len(tmp) > 0
-
-class TableCache:
-    """Per-table data hander."""
-
-    def __init__(self, tbl):
-        """Init per-batch table data cache."""
-        self.name = tbl
-        self.ev_list = []
-        self.pkey_map = {}
-        self.pkey_list = []
-        self.pkey_str = None
-        self.col_list = None
-
-        self.final_ins_list = []
-        self.final_upd_list = []
-        self.final_del_list = []
-
-    def add_event(self, ev):
-        """Store new event."""
-
-        # op & data
-        ev.op = ev.ev_type[0]
-        ev.data = skytools.db_urldecode(ev.ev_data)
-
-        # get pkey column names
-        if self.pkey_str is None:
-            if len(ev.ev_type) > 2:
-                self.pkey_str = ev.ev_type.split(':')[1]
-            else:
-                self.pkey_str = ev.ev_extra2
-
-            if self.pkey_str:
-                self.pkey_list = self.pkey_str.split(',')
-
-        # get pkey value
-        if self.pkey_str:
-            pk_data = []
-            for k in self.pkey_list:
-                pk_data.append(ev.data[k])
-            ev.pk_data = tuple(pk_data)
-        elif ev.op == 'I':
-            # fake pkey, just to get them spread out
-            ev.pk_data = ev.id
-        else:
-            raise Exception('non-pk tables not supported: %s' % self.name)
-
-        # get full column list, detect added columns
-        if not self.col_list:
-            self.col_list = ev.data.keys()
-        elif self.col_list != ev.data.keys():
-            # ^ supposedly python guarantees same order in keys()
-
-            # find new columns
-            for c in ev.data.keys():
-                if c not in self.col_list:
-                    for oldev in self.ev_list:
-                        oldev.data[c] = None
-            self.col_list = ev.data.keys()
-
-        # add to list
-        self.ev_list.append(ev)
-
-        # keep all versions of row data
-        if ev.pk_data in self.pkey_map:
-            self.pkey_map[ev.pk_data].append(ev)
-        else:
-            self.pkey_map[ev.pk_data] = [ev]
-
-    def finish(self):
-        """Got all data, prepare for insertion."""
-
-        del_list = []
-        ins_list = []
-        upd_list = []
-        for ev_list in self.pkey_map.values():
-            # rewrite list of I/U/D events to
-            # optional DELETE and optional INSERT/COPY command
-            exists_before = -1
-            exists_after = 1
-            for ev in ev_list:
-                if ev.op == "I":
-                    if exists_before < 0:
-                        exists_before = 0
-                    exists_after = 1
-                elif ev.op == "U":
-                    if exists_before < 0:
-                        exists_before = 1
-                    #exists_after = 1 # this shouldnt be needed
-                elif ev.op == "D":
-                    if exists_before < 0:
-                        exists_before = 1
-                    exists_after = 0
-                else:
-                    raise Exception('unknown event type: %s' % ev.op)
-
-            # skip short-lived rows
-            if exists_before == 0 and exists_after == 0:
-                continue
-
-            # take last event
-            ev = ev_list[-1]
-
-            # generate needed commands
-            if exists_before and exists_after:
-                upd_list.append(ev.data)
-            elif exists_before:
-                del_list.append(ev.data)
-            elif exists_after:
-                ins_list.append(ev.data)
-
-        # reorder cols
-        new_list = self.pkey_list[:]
-        for k in self.col_list:
-            if k not in self.pkey_list:
-                new_list.append(k)
-
-        self.col_list = new_list
-        self.final_ins_list = ins_list
-        self.final_upd_list = upd_list
-        self.final_del_list = del_list
-
-class BulkLoader(pgq.SerialConsumer):
-    __doc__ = __doc__
-    load_method = METH_CORRECT
-    remap_tables = {}
-    def __init__(self, args):
-        pgq.SerialConsumer.__init__(self, "bulk_loader", "src_db", "dst_db", args)
-
-    def reload(self):
-        pgq.SerialConsumer.reload(self)
-
-        self.load_method = self.cf.getint("load_method", METH_CORRECT)
-        if self.load_method not in (METH_CORRECT,METH_DELETE,METH_MERGED):
-            raise Exception("bad load_method")
-
-        self.remap_tables = {}
-        for mapelem in self.cf.getlist("remap_tables", ''):
-            tmp = mapelem.split(':')
-            tbl = tmp[0].strip()
-            new = tmp[1].strip()
-            self.remap_tables[tbl] = new
-
-    def process_remote_batch(self, src_db, batch_id, ev_list, dst_db):
-        """Content dispatcher."""
-
-        # add events to per-table caches
-        tables = {}
-        for ev in ev_list:
-            tbl = ev.extra1
-            if not tbl in tables:
-                tables[tbl] = TableCache(tbl)
-            cache = tables[tbl]
-            cache.add_event(ev)
-
-        # then process them
-        for tbl, cache in tables.items():
-            cache.finish()
-            self.process_one_table(dst_db, tbl, cache)
-
-    def process_one_table(self, dst_db, tbl, cache):
-
-        del_list = cache.final_del_list
-        ins_list = cache.final_ins_list
-        upd_list = cache.final_upd_list
-        col_list = cache.col_list
-        real_update_count = len(upd_list)
-
-        self.log.debug("process_one_table: %s (I/U/D = %d/%d/%d)" % (
-            tbl, len(ins_list), len(upd_list), len(del_list)))
-
-        if tbl in self.remap_tables:
-            old = tbl
-            tbl = self.remap_tables[tbl]
-            self.log.debug("Redirect %s to %s" % (old, tbl))
-
-        # hack to unbroke stuff
-        if self.load_method == METH_MERGED:
-            upd_list += ins_list
-            ins_list = []
-
-        # check if interesting table
-        curs = dst_db.cursor()
-        if not skytools.exists_table(curs, tbl):
-            self.log.warning("Ignoring events for table: %s" % tbl)
-            return
-
-        # fetch distribution fields
-        dist_fields = find_dist_fields(curs, tbl)
-        extra_fields = []
-        for fld in dist_fields:
-            if fld not in cache.pkey_list:
-                extra_fields.append(fld)
-        self.log.debug("PKey fields: %s Extra fields: %s" % (
-            ",".join(cache.pkey_list), ",".join(extra_fields)))
-
-        # create temp table
-        temp = self.create_temp_table(curs, tbl)
-
-        # where expr must have pkey and dist fields
-        klist = []
-        for pk in cache.pkey_list + extra_fields:
-            exp = "%s.%s = %s.%s" % (quote_fqident(tbl), quote_ident(pk),
-                                     quote_fqident(temp), quote_ident(pk))
-            klist.append(exp)
-        whe_expr = " and ".join(klist)
-
-        # create del sql
-        del_sql = "delete from only %s using %s where %s" % (
-            quote_fqident(tbl), quote_fqident(temp), whe_expr)
-
-        # create update sql
-        slist = []
-        key_fields = cache.pkey_list + extra_fields
-        for col in cache.col_list:
-            if col not in key_fields:
-                exp = "%s = %s.%s" % (quote_ident(col), quote_fqident(temp), quote_ident(col))
-                slist.append(exp)
-        upd_sql = "update only %s set %s from %s where %s" % (
-            quote_fqident(tbl), ", ".join(slist), quote_fqident(temp), whe_expr)
-
-        # insert sql
-        colstr = ",".join([quote_ident(c) for c in cache.col_list])
-        ins_sql = "insert into %s (%s) select %s from %s" % (
-            quote_fqident(tbl), colstr, colstr, quote_fqident(temp))
-
-        # process deleted rows
-        if len(del_list) > 0:
-            self.log.info("Deleting %d rows from %s" % (len(del_list), tbl))
-            # delete old rows
-            q = "truncate %s" % quote_fqident(temp)
-            self.log.debug(q)
-            curs.execute(q)
-            # copy rows
-            self.log.debug("COPY %d rows into %s" % (len(del_list), temp))
-            skytools.magic_insert(curs, temp, del_list, col_list)
-            # delete rows
-            self.log.debug(del_sql)
-            curs.execute(del_sql)
-            self.log.debug("%s - %d" % (curs.statusmessage, curs.rowcount))
-            self.log.debug(curs.statusmessage)
-            if len(del_list) != curs.rowcount:
-                self.log.warning("Delete mismatch: expected=%s updated=%d"
-                        % (len(del_list), curs.rowcount))
-
-        # process updated rows
-        if len(upd_list) > 0:
-            self.log.info("Updating %d rows in %s" % (len(upd_list), tbl))
-            # delete old rows
-            q = "truncate %s" % quote_fqident(temp)
-            self.log.debug(q)
-            curs.execute(q)
-            # copy rows
-            self.log.debug("COPY %d rows into %s" % (len(upd_list), temp))
-            skytools.magic_insert(curs, temp, upd_list, col_list)
-            if self.load_method == METH_CORRECT:
-                # update main table
-                self.log.debug(upd_sql)
-                curs.execute(upd_sql)
-                self.log.debug(curs.statusmessage)
-                # check count
-                if len(upd_list) != curs.rowcount:
-                    self.log.warning("Update mismatch: expected=%s updated=%d"
-                            % (len(upd_list), curs.rowcount))
-            else:
-                # delete from main table
-                self.log.debug(del_sql)
-                curs.execute(del_sql)
-                self.log.debug(curs.statusmessage)
-                # check count
-                if real_update_count != curs.rowcount:
-                    self.log.warning("Update mismatch: expected=%s deleted=%d"
-                            % (real_update_count, curs.rowcount))
-                # insert into main table
-                if AVOID_BIZGRES_BUG:
-                    # copy again, into main table
-                    self.log.debug("COPY %d rows into %s" % (len(upd_list), tbl))
-                    skytools.magic_insert(curs, tbl, upd_list, col_list)
-                else:
-                    # better way, but does not work due bizgres bug
-                    self.log.debug(ins_sql)
-                    curs.execute(ins_sql)
-                    self.log.debug(curs.statusmessage)
-
-        # process new rows
-        if len(ins_list) > 0:
-            self.log.info("Inserting %d rows into %s" % (len(ins_list), tbl))
-            skytools.magic_insert(curs, tbl, ins_list, col_list)
-
-        # delete remaining rows
-        if USE_LONGLIVED_TEMP_TABLES:
-            q = "truncate %s" % quote_fqident(temp)
-        else:
-            # fscking problems with long-lived temp tables
-            q = "drop table %s" % quote_fqident(temp)
-        self.log.debug(q)
-        curs.execute(q)
-
-    def create_temp_table(self, curs, tbl):
-        # create temp table for loading
-        tempname = tbl.replace('.', '_') + "_loadertmp"
-
-        # check if exists
-        if USE_LONGLIVED_TEMP_TABLES:
-            if exists_temp_table(curs, tempname):
-                self.log.debug("Using existing temp table %s" % tempname)
-                return tempname
-
-        # bizgres crashes on delete rows
-        arg = "on commit delete rows"
-        arg = "on commit preserve rows"
-        # create temp table for loading
-        q = "create temp table %s (like %s) %s" % (
-                quote_fqident(tempname), quote_fqident(tbl), arg)
-        self.log.debug("Creating temp table: %s" % q)
-        curs.execute(q)
-        return tempname
-
-if __name__ == '__main__':
-    script = BulkLoader(sys.argv[1:])
-    script.start()
-
diff --git a/scripts/cube_dispatcher.py b/scripts/cube_dispatcher.py
deleted file mode 100755
index 76a3ab3f..00000000
--- a/scripts/cube_dispatcher.py
+++ /dev/null
@@ -1,203 +0,0 @@
-#! /usr/bin/env python
-
-"""It accepts urlencoded rows for multiple tables from queue
-and insert them into actual tables, with partitioning on tick time.
-
-Config template::
-
-    [cube_dispatcher]
-    job_name = cd_srcdb_queue_to_dstdb_dstcolo.ini
-
-    src_db = dbname=sourcedb_test
-    dst_db = dbname=dataminedb_test
-
-    pgq_queue_name = udata.some_queue
-
-    logfile = ~/log/%(job_name)s.log
-    pidfile = ~/pid/%(job_name)s.pid
-
-    # how many rows are kept: keep_latest, keep_all
-    mode = keep_latest
-
-    # to_char() fmt for table suffix
-    #dateformat = YYYY_MM_DD
-    # following disables table suffixes:
-    #dateformat =
-
-    part_template =
-        create table _DEST_TABLE (like _PARENT);
-        alter table only _DEST_TABLE add primary key (_PKEY);
-        grant select on _DEST_TABLE to postgres;
-"""
-
-import sys, os, pgq, skytools
-
-DEF_CREATE = """
-create table _DEST_TABLE (like _PARENT);
-alter table only _DEST_TABLE add primary key (_PKEY);
-"""
-
-class CubeDispatcher(pgq.SerialConsumer):
-    __doc__ = __doc__
-
-    def __init__(self, args):
-        pgq.SerialConsumer.__init__(self, "cube_dispatcher", "src_db", "dst_db", args)
-
-        self.dateformat = self.cf.get('dateformat', 'YYYY_MM_DD')
-
-        self.part_template = self.cf.get('part_template', DEF_CREATE)
-
-        mode = self.cf.get('mode', 'keep_latest')
-        if mode == 'keep_latest':
-            self.keep_latest = 1
-        elif mode == 'keep_all':
-            self.keep_latest = 0
-        else:
-            self.log.fatal('wrong mode setting')
-            sys.exit(1)
-
-    def get_part_date(self, batch_id):
-        if not self.dateformat:
-            return None
-
-        # fetch and format batch date
-        src_db = self.get_database('src_db')
-        curs = src_db.cursor()
-        q = 'select to_char(batch_end, %s) from pgq.get_batch_info(%s)'
-        curs.execute(q, [self.dateformat, batch_id])
-        src_db.commit()
-        return curs.fetchone()[0]
-
-    def process_remote_batch(self, src_db, batch_id, ev_list, dst_db):
-        # actual processing
-        self.dispatch(dst_db, ev_list, self.get_part_date(batch_id))
-
-    def dispatch(self, dst_db, ev_list, date_str):
-        """Actual event processing."""
-
-        # get tables and sql
-        tables = {}
-        sql_list = []
-        for ev in ev_list:
-            if date_str:
-                tbl = "%s_%s" % (ev.extra1, date_str)
-            else:
-                tbl = ev.extra1
-
-            sql = self.make_sql(tbl, ev)
-            sql_list.append(sql)
-
-            if not tbl in tables:
-                tables[tbl] = self.get_table_info(ev, tbl)
-
-        # create tables if needed
-        self.check_tables(dst_db, tables)
-
-        # insert into data tables
-        curs = dst_db.cursor()
-        block = []
-        for sql in sql_list:
-            self.log.debug(sql)
-            block.append(sql)
-            if len(block) > 100:
-                curs.execute("\n".join(block))
-                block = []
-        if len(block) > 0:
-            curs.execute("\n".join(block))
-
-    def get_table_info(self, ev, tbl):
-        klist = [skytools.quote_ident(k) for k in ev.key_list.split(',')]
-        inf = {
-            'parent': ev.extra1,
-            'table': tbl,
-            'key_list': ",".join(klist),
-        }
-        return inf
-
-    def make_sql(self, tbl, ev):
-        """Return SQL statement(s) for that event."""
-
-        # parse data
-        data = skytools.db_urldecode(ev.data)
-
-        # parse tbl info
-        if ev.type.find(':') > 0:
-            op, keys = ev.type.split(':')
-        else:
-            op = ev.type
-            keys = ev.extra2
-        ev.key_list = keys
-        key_list = keys.split(',')
-        if self.keep_latest and len(key_list) == 0:
-            raise Exception('No pkey on table %s' % tbl)
-
-        # generate sql
-        if op in ('I', 'U'):
-            if self.keep_latest:
-                sql = "%s %s" % (self.mk_delete_sql(tbl, key_list, data),
-                                 self.mk_insert_sql(tbl, key_list, data))
-            else:
-                sql = self.mk_insert_sql(tbl, key_list, data)
-        elif op == "D":
-            if not self.keep_latest:
-                raise Exception('Delete op not supported if mode=keep_all')
-
-            sql = self.mk_delete_sql(tbl, key_list, data)
-        else:
-            raise Exception('Unknown row op: %s' % op)
-        return sql
-
-    def mk_delete_sql(self, tbl, key_list, data):
-        # generate delete command
-        whe_list = []
-        for k in key_list:
-            whe_list.append("%s = %s" % (skytools.quote_ident(k), skytools.quote_literal(data[k])))
-        whe_str = " and ".join(whe_list)
-        return "delete from %s where %s;" % (skytools.quote_fqident(tbl), whe_str)
-
-    def mk_insert_sql(self, tbl, key_list, data):
-        # generate insert command
-        col_list = []
-        val_list = []
-        for c, v in data.items():
-            col_list.append(skytools.quote_ident(c))
-            val_list.append(skytools.quote_literal(v))
-        col_str = ",".join(col_list)
-        val_str = ",".join(val_list)
-        return "insert into %s (%s) values (%s);" % (
-                skytools.quote_fqident(tbl), col_str, val_str)
-
-    def check_tables(self, dcon, tables):
-        """Checks that tables needed for copy are there. If not
-        then creates them.
-
-        Used by other procedures to ensure that table is there
-        before they start inserting.
-
-        The commits should not be dangerous, as we haven't done anything
-        with cdr's yet, so they should still be in one TX.
-
-        Although it would be nicer to have a lock for table creation.
-        """
-
-        dcur = dcon.cursor()
-        for tbl, inf in tables.items():
-            if skytools.exists_table(dcur, tbl):
-                continue
-
-            sql = self.part_template
-            sql = sql.replace('_DEST_TABLE', skytools.quote_fqident(inf['table']))
-            sql = sql.replace('_PARENT', skytools.quote_fqident(inf['parent']))
-            sql = sql.replace('_PKEY', inf['key_list'])
-            # be similar to table_dispatcher
-            schema_table = inf['table'].replace(".", "__")
-            sql = sql.replace('_SCHEMA_TABLE', skytools.quote_ident(schema_table))
-
-            dcur.execute(sql)
-            dcon.commit()
-            self.log.info('%s: Created table %s' % (self.job_name, tbl))
-
-if __name__ == '__main__':
-    script = CubeDispatcher(sys.argv[1:])
-    script.start()
-
diff --git a/scripts/simple_serial_consumer.py b/scripts/simple_serial_consumer.py
deleted file mode 100755
index 2bd06be7..00000000
--- a/scripts/simple_serial_consumer.py
+++ /dev/null
@@ -1,93 +0,0 @@
-#! /usr/bin/env python
-
-"""simple serial consumer for skytools3
-
-it consumes events from a predefined queue and feeds them to a sql statement
-
-Config template::
-
-[simple_serial_consumer]
-job_name = descriptive_name_for_job
-
-src_db = dbname=sourcedb_test
-dst_db = dbname=destdb port=1234 host=dbhost.com username=guest password=secret
-
-pgq_queue_name = source_queue
-
-logfile = ~/log/%(job_name)s.log
-pidfile = ~/pid/%(job_name)s.pid
-
-dst_query = select 1
-
-use_skylog = 0
-"""
-
-"""Config example::
-
-Create a queue named "echo_queue" in a database (like "testdb")
-
-Register consumer "echo" to this queue
-
-Start the echo consumer with config file shown below
-(You may want to use -v to see, what will happen)
-
-From some other window, insert something into the queue:
-    select pgq.insert_event('echo_queue','type','hello=world');
-
-Enjoy the ride :)
-
-If dst_query is set to "select 1" then echo consumer becomes a sink consumer
-
-[simple_serial_consumer]
-
-job_name = echo
-
-src_db = dbname=testdb
-dst_db = dbname=testdb
-
-pgq_queue_name = echo_queue
-
-logfile = ~/log/%(job_name)s.log
-pidfile = ~/pid/%(job_name)s.pid
-
-dst_query =
-    select *
-    from pgq.insert_event('echo_queue', %%(pgq.ev_type)s, %%(pgq.ev_data)s)
-"""
-
-import sys, pgq, skytools
-skytools.sane_config = 1
-
-class SimpleSerialConsumer(pgq.SerialConsumer):
-    doc_string = __doc__
-
-    def __init__(self, args):
-        pgq.SerialConsumer.__init__(self,"simple_serial_consumer","src_db","dst_db", args)
-        self.dst_query = self.cf.get("dst_query")
-
-    def process_remote_batch(self, db, batch_id, event_list, dst_db):
-        curs = dst_db.cursor()
-        for ev in event_list:
-            payload = skytools.db_urldecode(ev.data)
-            if payload is None:
-                payload = {}
-            payload['pgq.ev_type'] = ev.type
-            payload['pgq.ev_data'] = ev.data
-            payload['pgq.ev_id'] = ev.id
-            payload['pgq.ev_time'] = ev.time
-            payload['pgq.ev_extra1'] = ev.extra1
-            payload['pgq.ev_extra2'] = ev.extra2
-            payload['pgq.ev_extra3'] = ev.extra3
-            payload['pgq.ev_extra4'] = ev.extra4
-
-            self.log.debug(self.dst_query % payload)
-            curs.execute(self.dst_query, payload)
-            try:
-                res = curs.dictfetchone()
-                self.log.debug(res)
-            except:
-                pass
-
-if __name__ == '__main__':
-    script = SimpleSerialConsumer(sys.argv[1:])
-    script.start()
diff --git a/scripts/table_dispatcher.py b/scripts/table_dispatcher.py
deleted file mode 100755
index 52cd2b7b..00000000
--- a/scripts/table_dispatcher.py
+++ /dev/null
@@ -1,149 +0,0 @@
-#! /usr/bin/env python
-
-"""It loads urlencoded rows for one trable from queue and inserts
-them into actual tables, with optional partitioning.
-
---ini
-[table_dispatcher]
-job_name = test_move
-
-src_db = dbname=sourcedb_test
-dst_db = dbname=dataminedb_test
-
-pgq_queue_name = OrderLog
-
-logfile = ~/log/%(job_name)s.log
-pidfile = ~/pid/%(job_name)s.pid
-
-# where to put data. when partitioning, will be used as base name
-dest_table = orders
-
-# date field with will be used for partitioning
-# special value: _EVTIME - event creation time
-part_column = start_date
-
-#fields = *
-#fields = id, name
-#fields = id:newid, name, bar:baz
-
-
-# template used for creating partition tables
-# _DEST_TABLE
-part_template =
-    create table _DEST_TABLE () inherits (orders);
-    alter table only _DEST_TABLE add constraint _DEST_TABLE_pkey primary key (id);
-    grant select on _DEST_TABLE to group reporting;
-"""
-
-import sys, os, pgq, skytools
-
-DEST_TABLE = "_DEST_TABLE"
-SCHEMA_TABLE = "_SCHEMA_TABLE"
-
-class TableDispatcher(pgq.SerialConsumer):
-    """Single-table partitioner."""
-    def __init__(self, args):
-        pgq.SerialConsumer.__init__(self, "table_dispatcher", "src_db", "dst_db", args)
-
-        self.part_template = self.cf.get("part_template", '')
-        self.dest_table = self.cf.get("dest_table")
-        self.part_field = self.cf.get("part_field", '')
-        self.part_method = self.cf.get("part_method", 'daily')
-        if self.part_method not in ('daily', 'monthly'):
-            raise Exception('bad part_method')
-
-        if self.cf.get("fields", "*") == "*":
-            self.field_map = None
-        else:
-            self.field_map = {}
-            for fval in self.cf.getlist('fields'):
-                tmp = fval.split(':')
-                if len(tmp) == 1:
-                    self.field_map[tmp[0]] = tmp[0]
-                else:
-                    self.field_map[tmp[0]] = tmp[1]
-
-    def process_remote_batch(self, src_db, batch_id, ev_list, dst_db):
-        # actual processing
-        self.dispatch(dst_db, ev_list)
-
-    def dispatch(self, dst_db, ev_list):
-        """Generic dispatcher."""
-
-        # load data
-        tables = {}
-        for ev in ev_list:
-            row = skytools.db_urldecode(ev.data)
-
-            # guess dest table
-            if self.part_field:
-                if self.part_field == "_EVTIME":
-                    partval = str(ev.creation_date)
-                else:
-                    partval = str(row[self.part_field])
-                partval = partval.split(' ')[0]
-                date = partval.split('-')
-                if self.part_method == 'monthly':
-                    date = date[:2]
-                suffix = '_'.join(date)
-                tbl = "%s_%s" % (self.dest_table, suffix)
-            else:
-                tbl = self.dest_table
-
-            # map fields
-            if self.field_map is None:
-                dstrow = row
-            else:
-                dstrow = {}
-                for k, v in self.field_map.items():
-                    dstrow[v] = row[k]
-
-            # add row into table
-            if not tbl in tables:
-                tables[tbl] = [dstrow]
-            else:
-                tables[tbl].append(dstrow)
-
-        # create tables if needed
-        self.check_tables(dst_db, tables)
-
-        # insert into data tables
-        curs = dst_db.cursor()
-        for tbl, tbl_rows in tables.items():
-            skytools.magic_insert(curs, tbl, tbl_rows)
-
-    def check_tables(self, dcon, tables):
-        """Checks that tables needed for copy are there. If not
-        then creates them.
-
-        Used by other procedures to ensure that table is there
-        before they start inserting.
-
-        The commits should not be dangerous, as we haven't done anything
-        with cdr's yet, so they should still be in one TX.
-
-        Although it would be nicer to have a lock for table creation.
-        """
-
-        dcur = dcon.cursor()
-        for tbl in tables.keys():
-            if not skytools.exists_table(dcur, tbl):
-                if not self.part_template:
-                    raise Exception('Dest table does not exists and no way to create it.')
-
-                sql = self.part_template
-                sql = sql.replace(DEST_TABLE, skytools.quote_fqident(tbl))
-
-                # we do this to make sure that constraints for
-                # tables who contain a schema will still work
-                schema_table = tbl.replace(".", "__")
-                sql = sql.replace(SCHEMA_TABLE, skytools.quote_ident(schema_table))
-
-                dcur.execute(sql)
-                dcon.commit()
-                self.log.info('%s: Created table %s' % (self.job_name, tbl))
-
-if __name__ == '__main__':
-    script = TableDispatcher(sys.argv[1:])
-    script.start()
-
