diff options
author | Marko Kreen | 2009-06-05 10:28:31 +0000 |
---|---|---|
committer | Marko Kreen | 2009-06-07 15:03:25 +0000 |
commit | e9f032912997443878fdd70634ec644d73fef1c8 (patch) | |
tree | 14c68cafef16551381229486fc757b9044016de4 /scripts/cube_dispatcher.py | |
parent | c8b2d68fcd66a94ec748ece7d280d76e5238e752 (diff) |
move bulk_loader/cube_dispatcher/table_dispatcher to old/
The scripts are obsolete but kept as samples. To notify
that they are not maintained actively, move them away.
Diffstat (limited to 'scripts/cube_dispatcher.py')
-rwxr-xr-x | scripts/cube_dispatcher.py | 203 |
1 files changed, 0 insertions, 203 deletions
diff --git a/scripts/cube_dispatcher.py b/scripts/cube_dispatcher.py deleted file mode 100755 index 76a3ab3f..00000000 --- a/scripts/cube_dispatcher.py +++ /dev/null @@ -1,203 +0,0 @@ -#! /usr/bin/env python - -"""It accepts urlencoded rows for multiple tables from queue -and insert them into actual tables, with partitioning on tick time. - -Config template:: - - [cube_dispatcher] - job_name = cd_srcdb_queue_to_dstdb_dstcolo.ini - - src_db = dbname=sourcedb_test - dst_db = dbname=dataminedb_test - - pgq_queue_name = udata.some_queue - - logfile = ~/log/%(job_name)s.log - pidfile = ~/pid/%(job_name)s.pid - - # how many rows are kept: keep_latest, keep_all - mode = keep_latest - - # to_char() fmt for table suffix - #dateformat = YYYY_MM_DD - # following disables table suffixes: - #dateformat = - - part_template = - create table _DEST_TABLE (like _PARENT); - alter table only _DEST_TABLE add primary key (_PKEY); - grant select on _DEST_TABLE to postgres; -""" - -import sys, os, pgq, skytools - -DEF_CREATE = """ -create table _DEST_TABLE (like _PARENT); -alter table only _DEST_TABLE add primary key (_PKEY); -""" - -class CubeDispatcher(pgq.SerialConsumer): - __doc__ = __doc__ - - def __init__(self, args): - pgq.SerialConsumer.__init__(self, "cube_dispatcher", "src_db", "dst_db", args) - - self.dateformat = self.cf.get('dateformat', 'YYYY_MM_DD') - - self.part_template = self.cf.get('part_template', DEF_CREATE) - - mode = self.cf.get('mode', 'keep_latest') - if mode == 'keep_latest': - self.keep_latest = 1 - elif mode == 'keep_all': - self.keep_latest = 0 - else: - self.log.fatal('wrong mode setting') - sys.exit(1) - - def get_part_date(self, batch_id): - if not self.dateformat: - return None - - # fetch and format batch date - src_db = self.get_database('src_db') - curs = src_db.cursor() - q = 'select to_char(batch_end, %s) from pgq.get_batch_info(%s)' - curs.execute(q, [self.dateformat, batch_id]) - src_db.commit() - return curs.fetchone()[0] - - def process_remote_batch(self, src_db, batch_id, ev_list, dst_db): - # actual processing - self.dispatch(dst_db, ev_list, self.get_part_date(batch_id)) - - def dispatch(self, dst_db, ev_list, date_str): - """Actual event processing.""" - - # get tables and sql - tables = {} - sql_list = [] - for ev in ev_list: - if date_str: - tbl = "%s_%s" % (ev.extra1, date_str) - else: - tbl = ev.extra1 - - sql = self.make_sql(tbl, ev) - sql_list.append(sql) - - if not tbl in tables: - tables[tbl] = self.get_table_info(ev, tbl) - - # create tables if needed - self.check_tables(dst_db, tables) - - # insert into data tables - curs = dst_db.cursor() - block = [] - for sql in sql_list: - self.log.debug(sql) - block.append(sql) - if len(block) > 100: - curs.execute("\n".join(block)) - block = [] - if len(block) > 0: - curs.execute("\n".join(block)) - - def get_table_info(self, ev, tbl): - klist = [skytools.quote_ident(k) for k in ev.key_list.split(',')] - inf = { - 'parent': ev.extra1, - 'table': tbl, - 'key_list': ",".join(klist), - } - return inf - - def make_sql(self, tbl, ev): - """Return SQL statement(s) for that event.""" - - # parse data - data = skytools.db_urldecode(ev.data) - - # parse tbl info - if ev.type.find(':') > 0: - op, keys = ev.type.split(':') - else: - op = ev.type - keys = ev.extra2 - ev.key_list = keys - key_list = keys.split(',') - if self.keep_latest and len(key_list) == 0: - raise Exception('No pkey on table %s' % tbl) - - # generate sql - if op in ('I', 'U'): - if self.keep_latest: - sql = "%s %s" % (self.mk_delete_sql(tbl, key_list, data), - self.mk_insert_sql(tbl, key_list, data)) - else: - sql = self.mk_insert_sql(tbl, key_list, data) - elif op == "D": - if not self.keep_latest: - raise Exception('Delete op not supported if mode=keep_all') - - sql = self.mk_delete_sql(tbl, key_list, data) - else: - raise Exception('Unknown row op: %s' % op) - return sql - - def mk_delete_sql(self, tbl, key_list, data): - # generate delete command - whe_list = [] - for k in key_list: - whe_list.append("%s = %s" % (skytools.quote_ident(k), skytools.quote_literal(data[k]))) - whe_str = " and ".join(whe_list) - return "delete from %s where %s;" % (skytools.quote_fqident(tbl), whe_str) - - def mk_insert_sql(self, tbl, key_list, data): - # generate insert command - col_list = [] - val_list = [] - for c, v in data.items(): - col_list.append(skytools.quote_ident(c)) - val_list.append(skytools.quote_literal(v)) - col_str = ",".join(col_list) - val_str = ",".join(val_list) - return "insert into %s (%s) values (%s);" % ( - skytools.quote_fqident(tbl), col_str, val_str) - - def check_tables(self, dcon, tables): - """Checks that tables needed for copy are there. If not - then creates them. - - Used by other procedures to ensure that table is there - before they start inserting. - - The commits should not be dangerous, as we haven't done anything - with cdr's yet, so they should still be in one TX. - - Although it would be nicer to have a lock for table creation. - """ - - dcur = dcon.cursor() - for tbl, inf in tables.items(): - if skytools.exists_table(dcur, tbl): - continue - - sql = self.part_template - sql = sql.replace('_DEST_TABLE', skytools.quote_fqident(inf['table'])) - sql = sql.replace('_PARENT', skytools.quote_fqident(inf['parent'])) - sql = sql.replace('_PKEY', inf['key_list']) - # be similar to table_dispatcher - schema_table = inf['table'].replace(".", "__") - sql = sql.replace('_SCHEMA_TABLE', skytools.quote_ident(schema_table)) - - dcur.execute(sql) - dcon.commit() - self.log.info('%s: Created table %s' % (self.job_name, tbl)) - -if __name__ == '__main__': - script = CubeDispatcher(sys.argv[1:]) - script.start() - |