author     Marko Kreen    2009-06-05 10:28:31 +0000
committer  Marko Kreen    2009-06-07 15:03:25 +0000
commit     e9f032912997443878fdd70634ec644d73fef1c8 (patch)
tree       14c68cafef16551381229486fc757b9044016de4 /scripts
parent     c8b2d68fcd66a94ec748ece7d280d76e5238e752 (diff)
move bulk_loader/cube_dispatcher/table_dispatcher to old/
The scripts are obsolete but are kept as samples. Move them out of the main scripts directory to make clear that they are no longer actively maintained.
Diffstat (limited to 'scripts')
-rwxr-xr-x  scripts/bulk_loader.py             | 423
-rwxr-xr-x  scripts/cube_dispatcher.py         | 203
-rwxr-xr-x  scripts/simple_serial_consumer.py  |  93
-rwxr-xr-x  scripts/table_dispatcher.py        | 149
4 files changed, 0 insertions, 868 deletions
diff --git a/scripts/bulk_loader.py b/scripts/bulk_loader.py
deleted file mode 100755
index c9a67496..00000000
--- a/scripts/bulk_loader.py
+++ /dev/null
@@ -1,423 +0,0 @@
-#! /usr/bin/env python
-
-"""Bulkloader for slow databases (Bizgres).
-
-The idea is the following:
- - The script reads a batch of urlencoded row changes from the queue:
- inserts, updates and deletes, possibly several per row.
- - It creates 3 lists: ins_list, upd_list, del_list.
- If one row is changed several times, only the latest change is kept.
- - The lists are processed in the following way:
- ins_list - COPY into main table
- upd_list - COPY into temp table, UPDATE from there
- del_list - COPY into temp table, DELETE from there
- - One side effect is that the total order in which rows appear
- changes, but per-row changes are kept in order.
-
-The speedup from COPY happens only if the batches are
-large enough, so ticks should happen only every couple
-of minutes.
-
-bl_sourcedb_queue_to_destdb.ini
-
-Config template::
- [bulk_loader]
- # job name is optional; when not given, the ini file name is used
- job_name = bl_sourcedb_queue_to_destdb
-
- src_db = dbname=sourcedb
- dst_db = dbname=destdb
-
- pgq_queue_name = source_queue
-
- use_skylog = 0
-
- logfile = ~/log/%(job_name)s.log
- pidfile = ~/pid/%(job_name)s.pid
-
- # 0 - apply UPDATE as UPDATE
- # 1 - apply UPDATE as DELETE+INSERT
- # 2 - merge INSERT/UPDATE, do DELETE+INSERT
- load_method = 0
-
- # no hurry
- loop_delay = 10
-
- # table renaming
- # remap_tables = skypein_cdr_closed:skypein_cdr, tbl1:tbl2
-"""
-
-import sys, os, pgq, skytools
-from skytools import quote_ident, quote_fqident
-
-
-## several methods for applying data
-
-# update as update
-METH_CORRECT = 0
-# update as delete/copy
-METH_DELETE = 1
-# merge ins_list and upd_list, do delete/copy
-METH_MERGED = 2
-
-# no good method for temp table check before 8.2
-USE_LONGLIVED_TEMP_TABLES = False
-
-AVOID_BIZGRES_BUG = 1
-
-def find_dist_fields(curs, fqtbl):
- if not skytools.exists_table(curs, "pg_catalog.mpp_distribution_policy"):
- return []
- schema, name = fqtbl.split('.')
- q = "select a.attname"\
- " from pg_class t, pg_namespace n, pg_attribute a,"\
- " mpp_distribution_policy p"\
- " where n.oid = t.relnamespace"\
- " and p.localoid = t.oid"\
- " and a.attrelid = t.oid"\
- " and a.attnum = any(p.attrnums)"\
- " and n.nspname = %s and t.relname = %s"
- curs.execute(q, [schema, name])
- res = []
- for row in curs.fetchall():
- res.append(row[0])
- return res
-
-def exists_temp_table(curs, tbl):
- # correct way, works only on 8.2
- q = "select 1 from pg_class where relname = %s and relnamespace = pg_my_temp_schema()"
-
- # does not work with parallel case
- #q = """
- #select 1 from pg_class t, pg_namespace n
- #where n.oid = t.relnamespace
- # and pg_table_is_visible(t.oid)
- # and has_schema_privilege(n.nspname, 'USAGE')
- # and has_table_privilege(n.nspname || '.' || t.relname, 'SELECT')
- # and substr(n.nspname, 1, 8) = 'pg_temp_'
- # and t.relname = %s;
- #"""
- curs.execute(q, [tbl])
- tmp = curs.fetchall()
- return len(tmp) > 0
-
-class TableCache:
- """Per-table data hander."""
-
- def __init__(self, tbl):
- """Init per-batch table data cache."""
- self.name = tbl
- self.ev_list = []
- self.pkey_map = {}
- self.pkey_list = []
- self.pkey_str = None
- self.col_list = None
-
- self.final_ins_list = []
- self.final_upd_list = []
- self.final_del_list = []
-
- def add_event(self, ev):
- """Store new event."""
-
- # op & data
- ev.op = ev.ev_type[0]
- ev.data = skytools.db_urldecode(ev.ev_data)
-
- # get pkey column names
- if self.pkey_str is None:
- if len(ev.ev_type) > 2:
- self.pkey_str = ev.ev_type.split(':')[1]
- else:
- self.pkey_str = ev.ev_extra2
-
- if self.pkey_str:
- self.pkey_list = self.pkey_str.split(',')
-
- # get pkey value
- if self.pkey_str:
- pk_data = []
- for k in self.pkey_list:
- pk_data.append(ev.data[k])
- ev.pk_data = tuple(pk_data)
- elif ev.op == 'I':
- # fake pkey, just to get them spread out
- ev.pk_data = ev.id
- else:
- raise Exception('non-pk tables not supported: %s' % self.name)
-
- # get full column list, detect added columns
- if not self.col_list:
- self.col_list = ev.data.keys()
- elif self.col_list != ev.data.keys():
- # ^ supposedly python guarantees same order in keys()
-
- # find new columns
- for c in ev.data.keys():
- if c not in self.col_list:
- for oldev in self.ev_list:
- oldev.data[c] = None
- self.col_list = ev.data.keys()
-
- # add to list
- self.ev_list.append(ev)
-
- # keep all versions of row data
- if ev.pk_data in self.pkey_map:
- self.pkey_map[ev.pk_data].append(ev)
- else:
- self.pkey_map[ev.pk_data] = [ev]
-
- def finish(self):
- """Got all data, prepare for insertion."""
-
- del_list = []
- ins_list = []
- upd_list = []
- for ev_list in self.pkey_map.values():
- # rewrite list of I/U/D events to
- # optional DELETE and optional INSERT/COPY command
- exists_before = -1
- exists_after = 1
- for ev in ev_list:
- if ev.op == "I":
- if exists_before < 0:
- exists_before = 0
- exists_after = 1
- elif ev.op == "U":
- if exists_before < 0:
- exists_before = 1
- #exists_after = 1 # this shouldn't be needed
- elif ev.op == "D":
- if exists_before < 0:
- exists_before = 1
- exists_after = 0
- else:
- raise Exception('unknown event type: %s' % ev.op)
-
- # skip short-lived rows
- if exists_before == 0 and exists_after == 0:
- continue
-
- # take last event
- ev = ev_list[-1]
-
- # generate needed commands
- if exists_before and exists_after:
- upd_list.append(ev.data)
- elif exists_before:
- del_list.append(ev.data)
- elif exists_after:
- ins_list.append(ev.data)
-
- # reorder cols
- new_list = self.pkey_list[:]
- for k in self.col_list:
- if k not in self.pkey_list:
- new_list.append(k)
-
- self.col_list = new_list
- self.final_ins_list = ins_list
- self.final_upd_list = upd_list
- self.final_del_list = del_list
-
-class BulkLoader(pgq.SerialConsumer):
- __doc__ = __doc__
- load_method = METH_CORRECT
- remap_tables = {}
- def __init__(self, args):
- pgq.SerialConsumer.__init__(self, "bulk_loader", "src_db", "dst_db", args)
-
- def reload(self):
- pgq.SerialConsumer.reload(self)
-
- self.load_method = self.cf.getint("load_method", METH_CORRECT)
- if self.load_method not in (METH_CORRECT,METH_DELETE,METH_MERGED):
- raise Exception("bad load_method")
-
- self.remap_tables = {}
- for mapelem in self.cf.getlist("remap_tables", ''):
- tmp = mapelem.split(':')
- tbl = tmp[0].strip()
- new = tmp[1].strip()
- self.remap_tables[tbl] = new
-
- def process_remote_batch(self, src_db, batch_id, ev_list, dst_db):
- """Content dispatcher."""
-
- # add events to per-table caches
- tables = {}
- for ev in ev_list:
- tbl = ev.extra1
- if not tbl in tables:
- tables[tbl] = TableCache(tbl)
- cache = tables[tbl]
- cache.add_event(ev)
-
- # then process them
- for tbl, cache in tables.items():
- cache.finish()
- self.process_one_table(dst_db, tbl, cache)
-
- def process_one_table(self, dst_db, tbl, cache):
-
- del_list = cache.final_del_list
- ins_list = cache.final_ins_list
- upd_list = cache.final_upd_list
- col_list = cache.col_list
- real_update_count = len(upd_list)
-
- self.log.debug("process_one_table: %s (I/U/D = %d/%d/%d)" % (
- tbl, len(ins_list), len(upd_list), len(del_list)))
-
- if tbl in self.remap_tables:
- old = tbl
- tbl = self.remap_tables[tbl]
- self.log.debug("Redirect %s to %s" % (old, tbl))
-
- # METH_MERGED: fold inserts into upd_list so they go through the delete+copy path
- if self.load_method == METH_MERGED:
- upd_list += ins_list
- ins_list = []
-
- # check if interesting table
- curs = dst_db.cursor()
- if not skytools.exists_table(curs, tbl):
- self.log.warning("Ignoring events for table: %s" % tbl)
- return
-
- # fetch distribution fields
- dist_fields = find_dist_fields(curs, tbl)
- extra_fields = []
- for fld in dist_fields:
- if fld not in cache.pkey_list:
- extra_fields.append(fld)
- self.log.debug("PKey fields: %s Extra fields: %s" % (
- ",".join(cache.pkey_list), ",".join(extra_fields)))
-
- # create temp table
- temp = self.create_temp_table(curs, tbl)
-
- # where expr must have pkey and dist fields
- klist = []
- for pk in cache.pkey_list + extra_fields:
- exp = "%s.%s = %s.%s" % (quote_fqident(tbl), quote_ident(pk),
- quote_fqident(temp), quote_ident(pk))
- klist.append(exp)
- whe_expr = " and ".join(klist)
-
- # create del sql
- del_sql = "delete from only %s using %s where %s" % (
- quote_fqident(tbl), quote_fqident(temp), whe_expr)
-
- # create update sql
- slist = []
- key_fields = cache.pkey_list + extra_fields
- for col in cache.col_list:
- if col not in key_fields:
- exp = "%s = %s.%s" % (quote_ident(col), quote_fqident(temp), quote_ident(col))
- slist.append(exp)
- upd_sql = "update only %s set %s from %s where %s" % (
- quote_fqident(tbl), ", ".join(slist), quote_fqident(temp), whe_expr)
-
- # insert sql
- colstr = ",".join([quote_ident(c) for c in cache.col_list])
- ins_sql = "insert into %s (%s) select %s from %s" % (
- quote_fqident(tbl), colstr, colstr, quote_fqident(temp))
-
- # process deleted rows
- if len(del_list) > 0:
- self.log.info("Deleting %d rows from %s" % (len(del_list), tbl))
- # delete old rows
- q = "truncate %s" % quote_fqident(temp)
- self.log.debug(q)
- curs.execute(q)
- # copy rows
- self.log.debug("COPY %d rows into %s" % (len(del_list), temp))
- skytools.magic_insert(curs, temp, del_list, col_list)
- # delete rows
- self.log.debug(del_sql)
- curs.execute(del_sql)
- self.log.debug("%s - %d" % (curs.statusmessage, curs.rowcount))
- self.log.debug(curs.statusmessage)
- if len(del_list) != curs.rowcount:
- self.log.warning("Delete mismatch: expected=%s updated=%d"
- % (len(del_list), curs.rowcount))
-
- # process updated rows
- if len(upd_list) > 0:
- self.log.info("Updating %d rows in %s" % (len(upd_list), tbl))
- # delete old rows
- q = "truncate %s" % quote_fqident(temp)
- self.log.debug(q)
- curs.execute(q)
- # copy rows
- self.log.debug("COPY %d rows into %s" % (len(upd_list), temp))
- skytools.magic_insert(curs, temp, upd_list, col_list)
- if self.load_method == METH_CORRECT:
- # update main table
- self.log.debug(upd_sql)
- curs.execute(upd_sql)
- self.log.debug(curs.statusmessage)
- # check count
- if len(upd_list) != curs.rowcount:
- self.log.warning("Update mismatch: expected=%s updated=%d"
- % (len(upd_list), curs.rowcount))
- else:
- # delete from main table
- self.log.debug(del_sql)
- curs.execute(del_sql)
- self.log.debug(curs.statusmessage)
- # check count
- if real_update_count != curs.rowcount:
- self.log.warning("Update mismatch: expected=%s deleted=%d"
- % (real_update_count, curs.rowcount))
- # insert into main table
- if AVOID_BIZGRES_BUG:
- # copy again, into main table
- self.log.debug("COPY %d rows into %s" % (len(upd_list), tbl))
- skytools.magic_insert(curs, tbl, upd_list, col_list)
- else:
- # better way, but does not work due to a bizgres bug
- self.log.debug(ins_sql)
- curs.execute(ins_sql)
- self.log.debug(curs.statusmessage)
-
- # process new rows
- if len(ins_list) > 0:
- self.log.info("Inserting %d rows into %s" % (len(ins_list), tbl))
- skytools.magic_insert(curs, tbl, ins_list, col_list)
-
- # delete remaining rows
- if USE_LONGLIVED_TEMP_TABLES:
- q = "truncate %s" % quote_fqident(temp)
- else:
- # long-lived temp tables are problematic, so drop the table instead
- q = "drop table %s" % quote_fqident(temp)
- self.log.debug(q)
- curs.execute(q)
-
- def create_temp_table(self, curs, tbl):
- # create temp table for loading
- tempname = tbl.replace('.', '_') + "_loadertmp"
-
- # check if exists
- if USE_LONGLIVED_TEMP_TABLES:
- if exists_temp_table(curs, tempname):
- self.log.debug("Using existing temp table %s" % tempname)
- return tempname
-
- # bizgres crashes on delete rows
- arg = "on commit delete rows"
- arg = "on commit preserve rows"
- # create temp table for loading
- q = "create temp table %s (like %s) %s" % (
- quote_fqident(tempname), quote_fqident(tbl), arg)
- self.log.debug("Creating temp table: %s" % q)
- curs.execute(q)
- return tempname
-
-if __name__ == '__main__':
- script = BulkLoader(sys.argv[1:])
- script.start()
-
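
For anyone keeping the removed bulk_loader.py around as a sample, the per-row collapsing described in its docstring (several I/U/D events per row reduced to one net insert, update or delete) can be illustrated as a small standalone function. This is a hypothetical sketch mirroring TableCache.finish() above, not part of the removed file::

    def collapse_row_events(ops):
        """Reduce a row's ordered 'I'/'U'/'D' ops to their net effect.

        Returns 'insert', 'update', 'delete' or None for a short-lived row,
        following the exists_before/exists_after bookkeeping above.
        """
        exists_before = -1          # unknown until the first op tells us
        exists_after = 1
        for op in ops:
            if op == 'I':
                if exists_before < 0:
                    exists_before = 0
                exists_after = 1
            elif op == 'U':
                if exists_before < 0:
                    exists_before = 1
                # exists_after is left untouched, as in the original code
            elif op == 'D':
                if exists_before < 0:
                    exists_before = 1
                exists_after = 0
            else:
                raise ValueError('unknown op: %r' % op)
        if exists_before == 0 and exists_after == 0:
            return None             # row appeared and vanished inside the batch
        if exists_before and exists_after:
            return 'update'
        if exists_before:
            return 'delete'
        return 'insert'

    # collapse_row_events(['I', 'U', 'U']) -> 'insert'  (goes into ins_list, COPY to main table)
    # collapse_row_events(['U', 'D'])      -> 'delete'  (goes into del_list)
    # collapse_row_events(['I', 'D'])      -> None      (short-lived row, skipped)
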
diff --git a/scripts/cube_dispatcher.py b/scripts/cube_dispatcher.py
deleted file mode 100755
index 76a3ab3f..00000000
--- a/scripts/cube_dispatcher.py
+++ /dev/null
@@ -1,203 +0,0 @@
-#! /usr/bin/env python
-
-"""It accepts urlencoded rows for multiple tables from queue
-and insert them into actual tables, with partitioning on tick time.
-
-Config template::
-
- [cube_dispatcher]
- job_name = cd_srcdb_queue_to_dstdb_dstcolo.ini
-
- src_db = dbname=sourcedb_test
- dst_db = dbname=dataminedb_test
-
- pgq_queue_name = udata.some_queue
-
- logfile = ~/log/%(job_name)s.log
- pidfile = ~/pid/%(job_name)s.pid
-
- # how many rows are kept: keep_latest, keep_all
- mode = keep_latest
-
- # to_char() fmt for table suffix
- #dateformat = YYYY_MM_DD
- # following disables table suffixes:
- #dateformat =
-
- part_template =
- create table _DEST_TABLE (like _PARENT);
- alter table only _DEST_TABLE add primary key (_PKEY);
- grant select on _DEST_TABLE to postgres;
-"""
-
-import sys, os, pgq, skytools
-
-DEF_CREATE = """
-create table _DEST_TABLE (like _PARENT);
-alter table only _DEST_TABLE add primary key (_PKEY);
-"""
-
-class CubeDispatcher(pgq.SerialConsumer):
- __doc__ = __doc__
-
- def __init__(self, args):
- pgq.SerialConsumer.__init__(self, "cube_dispatcher", "src_db", "dst_db", args)
-
- self.dateformat = self.cf.get('dateformat', 'YYYY_MM_DD')
-
- self.part_template = self.cf.get('part_template', DEF_CREATE)
-
- mode = self.cf.get('mode', 'keep_latest')
- if mode == 'keep_latest':
- self.keep_latest = 1
- elif mode == 'keep_all':
- self.keep_latest = 0
- else:
- self.log.fatal('wrong mode setting')
- sys.exit(1)
-
- def get_part_date(self, batch_id):
- if not self.dateformat:
- return None
-
- # fetch and format batch date
- src_db = self.get_database('src_db')
- curs = src_db.cursor()
- q = 'select to_char(batch_end, %s) from pgq.get_batch_info(%s)'
- curs.execute(q, [self.dateformat, batch_id])
- src_db.commit()
- return curs.fetchone()[0]
-
- def process_remote_batch(self, src_db, batch_id, ev_list, dst_db):
- # actual processing
- self.dispatch(dst_db, ev_list, self.get_part_date(batch_id))
-
- def dispatch(self, dst_db, ev_list, date_str):
- """Actual event processing."""
-
- # get tables and sql
- tables = {}
- sql_list = []
- for ev in ev_list:
- if date_str:
- tbl = "%s_%s" % (ev.extra1, date_str)
- else:
- tbl = ev.extra1
-
- sql = self.make_sql(tbl, ev)
- sql_list.append(sql)
-
- if not tbl in tables:
- tables[tbl] = self.get_table_info(ev, tbl)
-
- # create tables if needed
- self.check_tables(dst_db, tables)
-
- # insert into data tables
- curs = dst_db.cursor()
- block = []
- for sql in sql_list:
- self.log.debug(sql)
- block.append(sql)
- if len(block) > 100:
- curs.execute("\n".join(block))
- block = []
- if len(block) > 0:
- curs.execute("\n".join(block))
-
- def get_table_info(self, ev, tbl):
- klist = [skytools.quote_ident(k) for k in ev.key_list.split(',')]
- inf = {
- 'parent': ev.extra1,
- 'table': tbl,
- 'key_list': ",".join(klist),
- }
- return inf
-
- def make_sql(self, tbl, ev):
- """Return SQL statement(s) for that event."""
-
- # parse data
- data = skytools.db_urldecode(ev.data)
-
- # parse tbl info
- if ev.type.find(':') > 0:
- op, keys = ev.type.split(':')
- else:
- op = ev.type
- keys = ev.extra2
- ev.key_list = keys
- key_list = keys.split(',')
- if self.keep_latest and len(key_list) == 0:
- raise Exception('No pkey on table %s' % tbl)
-
- # generate sql
- if op in ('I', 'U'):
- if self.keep_latest:
- sql = "%s %s" % (self.mk_delete_sql(tbl, key_list, data),
- self.mk_insert_sql(tbl, key_list, data))
- else:
- sql = self.mk_insert_sql(tbl, key_list, data)
- elif op == "D":
- if not self.keep_latest:
- raise Exception('Delete op not supported if mode=keep_all')
-
- sql = self.mk_delete_sql(tbl, key_list, data)
- else:
- raise Exception('Unknown row op: %s' % op)
- return sql
-
- def mk_delete_sql(self, tbl, key_list, data):
- # generate delete command
- whe_list = []
- for k in key_list:
- whe_list.append("%s = %s" % (skytools.quote_ident(k), skytools.quote_literal(data[k])))
- whe_str = " and ".join(whe_list)
- return "delete from %s where %s;" % (skytools.quote_fqident(tbl), whe_str)
-
- def mk_insert_sql(self, tbl, key_list, data):
- # generate insert command
- col_list = []
- val_list = []
- for c, v in data.items():
- col_list.append(skytools.quote_ident(c))
- val_list.append(skytools.quote_literal(v))
- col_str = ",".join(col_list)
- val_str = ",".join(val_list)
- return "insert into %s (%s) values (%s);" % (
- skytools.quote_fqident(tbl), col_str, val_str)
-
- def check_tables(self, dcon, tables):
- """Checks that tables needed for copy are there. If not
- then creates them.
-
- Used by other procedures to ensure that table is there
- before they start inserting.
-
- The commits should not be dangerous, as we haven't done anything
- with cdr's yet, so they should still be in one TX.
-
- Although it would be nicer to have a lock for table creation.
- """
-
- dcur = dcon.cursor()
- for tbl, inf in tables.items():
- if skytools.exists_table(dcur, tbl):
- continue
-
- sql = self.part_template
- sql = sql.replace('_DEST_TABLE', skytools.quote_fqident(inf['table']))
- sql = sql.replace('_PARENT', skytools.quote_fqident(inf['parent']))
- sql = sql.replace('_PKEY', inf['key_list'])
- # be similar to table_dispatcher
- schema_table = inf['table'].replace(".", "__")
- sql = sql.replace('_SCHEMA_TABLE', skytools.quote_ident(schema_table))
-
- dcur.execute(sql)
- dcon.commit()
- self.log.info('%s: Created table %s' % (self.job_name, tbl))
-
-if __name__ == '__main__':
- script = CubeDispatcher(sys.argv[1:])
- script.start()
-
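
The part_template mechanism in the removed cube_dispatcher (and its near twin in table_dispatcher below) is plain string substitution on placeholders such as _DEST_TABLE, _PARENT, _PKEY and _SCHEMA_TABLE. A minimal standalone illustration with hypothetical table and key names, leaving out the quote_fqident() calls that check_tables() applies::

    DEF_CREATE = """
    create table _DEST_TABLE (like _PARENT);
    alter table only _DEST_TABLE add primary key (_PKEY);
    """

    # hypothetical daily partition of table "cdr", keyed on "call_id"
    sql = (DEF_CREATE
           .replace('_DEST_TABLE', 'cdr_2009_06_05')
           .replace('_PARENT', 'cdr')
           .replace('_PKEY', 'call_id'))

    # sql now reads:
    #   create table cdr_2009_06_05 (like cdr);
    #   alter table only cdr_2009_06_05 add primary key (call_id);
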
diff --git a/scripts/simple_serial_consumer.py b/scripts/simple_serial_consumer.py
deleted file mode 100755
index 2bd06be7..00000000
--- a/scripts/simple_serial_consumer.py
+++ /dev/null
@@ -1,93 +0,0 @@
-#! /usr/bin/env python
-
-"""simple serial consumer for skytools3
-
-it consumes events from a predefined queue and feeds them to a sql statement
-
-Config template::
-
-[simple_serial_consumer]
-job_name = descriptive_name_for_job
-
-src_db = dbname=sourcedb_test
-dst_db = dbname=destdb port=1234 host=dbhost.com user=guest password=secret
-
-pgq_queue_name = source_queue
-
-logfile = ~/log/%(job_name)s.log
-pidfile = ~/pid/%(job_name)s.pid
-
-dst_query = select 1
-
-use_skylog = 0
-"""
-
-"""Config example::
-
-Create a queue named "echo_queue" in a database (like "testdb")
-
-Register consumer "echo" to this queue
-
-Start the echo consumer with the config file shown below
-(You may want to use -v to see what will happen)
-
-From some other window, insert something into the queue:
- select pgq.insert_event('echo_queue','type','hello=world');
-
-Enjoy the ride :)
-
-If dst_query is set to "select 1", the echo consumer becomes a sink consumer
-
-[simple_serial_consumer]
-
-job_name = echo
-
-src_db = dbname=testdb
-dst_db = dbname=testdb
-
-pgq_queue_name = echo_queue
-
-logfile = ~/log/%(job_name)s.log
-pidfile = ~/pid/%(job_name)s.pid
-
-dst_query =
- select *
- from pgq.insert_event('echo_queue', %%(pgq.ev_type)s, %%(pgq.ev_data)s)
-"""
-
-import sys, pgq, skytools
-skytools.sane_config = 1
-
-class SimpleSerialConsumer(pgq.SerialConsumer):
- doc_string = __doc__
-
- def __init__(self, args):
- pgq.SerialConsumer.__init__(self,"simple_serial_consumer","src_db","dst_db", args)
- self.dst_query = self.cf.get("dst_query")
-
- def process_remote_batch(self, db, batch_id, event_list, dst_db):
- curs = dst_db.cursor()
- for ev in event_list:
- payload = skytools.db_urldecode(ev.data)
- if payload is None:
- payload = {}
- payload['pgq.ev_type'] = ev.type
- payload['pgq.ev_data'] = ev.data
- payload['pgq.ev_id'] = ev.id
- payload['pgq.ev_time'] = ev.time
- payload['pgq.ev_extra1'] = ev.extra1
- payload['pgq.ev_extra2'] = ev.extra2
- payload['pgq.ev_extra3'] = ev.extra3
- payload['pgq.ev_extra4'] = ev.extra4
-
- self.log.debug(self.dst_query % payload)
- curs.execute(self.dst_query, payload)
- try:
- res = curs.dictfetchone()
- self.log.debug(res)
- except:
- pass
-
-if __name__ == '__main__':
- script = SimpleSerialConsumer(sys.argv[1:])
- script.start()
diff --git a/scripts/table_dispatcher.py b/scripts/table_dispatcher.py
deleted file mode 100755
index 52cd2b7b..00000000
--- a/scripts/table_dispatcher.py
+++ /dev/null
@@ -1,149 +0,0 @@
-#! /usr/bin/env python
-
-"""It loads urlencoded rows for one trable from queue and inserts
-them into actual tables, with optional partitioning.
-
---ini
-[table_dispatcher]
-job_name = test_move
-
-src_db = dbname=sourcedb_test
-dst_db = dbname=dataminedb_test
-
-pgq_queue_name = OrderLog
-
-logfile = ~/log/%(job_name)s.log
-pidfile = ~/pid/%(job_name)s.pid
-
-# where to put the data; when partitioning, this is used as the base name
-dest_table = orders
-
-# date field which will be used for partitioning
-# special value: _EVTIME - event creation time
-part_field = start_date
-
-#fields = *
-#fields = id, name
-#fields = id:newid, name, bar:baz
-
-
-# template used for creating partition tables
-# placeholders: _DEST_TABLE, _SCHEMA_TABLE
-part_template =
- create table _DEST_TABLE () inherits (orders);
- alter table only _DEST_TABLE add constraint _DEST_TABLE_pkey primary key (id);
- grant select on _DEST_TABLE to group reporting;
-"""
-
-import sys, os, pgq, skytools
-
-DEST_TABLE = "_DEST_TABLE"
-SCHEMA_TABLE = "_SCHEMA_TABLE"
-
-class TableDispatcher(pgq.SerialConsumer):
- """Single-table partitioner."""
- def __init__(self, args):
- pgq.SerialConsumer.__init__(self, "table_dispatcher", "src_db", "dst_db", args)
-
- self.part_template = self.cf.get("part_template", '')
- self.dest_table = self.cf.get("dest_table")
- self.part_field = self.cf.get("part_field", '')
- self.part_method = self.cf.get("part_method", 'daily')
- if self.part_method not in ('daily', 'monthly'):
- raise Exception('bad part_method')
-
- if self.cf.get("fields", "*") == "*":
- self.field_map = None
- else:
- self.field_map = {}
- for fval in self.cf.getlist('fields'):
- tmp = fval.split(':')
- if len(tmp) == 1:
- self.field_map[tmp[0]] = tmp[0]
- else:
- self.field_map[tmp[0]] = tmp[1]
-
- def process_remote_batch(self, src_db, batch_id, ev_list, dst_db):
- # actual processing
- self.dispatch(dst_db, ev_list)
-
- def dispatch(self, dst_db, ev_list):
- """Generic dispatcher."""
-
- # load data
- tables = {}
- for ev in ev_list:
- row = skytools.db_urldecode(ev.data)
-
- # guess dest table
- if self.part_field:
- if self.part_field == "_EVTIME":
- partval = str(ev.creation_date)
- else:
- partval = str(row[self.part_field])
- partval = partval.split(' ')[0]
- date = partval.split('-')
- if self.part_method == 'monthly':
- date = date[:2]
- suffix = '_'.join(date)
- tbl = "%s_%s" % (self.dest_table, suffix)
- else:
- tbl = self.dest_table
-
- # map fields
- if self.field_map is None:
- dstrow = row
- else:
- dstrow = {}
- for k, v in self.field_map.items():
- dstrow[v] = row[k]
-
- # add row into table
- if not tbl in tables:
- tables[tbl] = [dstrow]
- else:
- tables[tbl].append(dstrow)
-
- # create tables if needed
- self.check_tables(dst_db, tables)
-
- # insert into data tables
- curs = dst_db.cursor()
- for tbl, tbl_rows in tables.items():
- skytools.magic_insert(curs, tbl, tbl_rows)
-
- def check_tables(self, dcon, tables):
- """Checks that tables needed for copy are there. If not
- then creates them.
-
- Used by other procedures to ensure that table is there
- before they start inserting.
-
- The commits should not be dangerous, as we haven't done anything
- with cdr's yet, so they should still be in one TX.
-
- Although it would be nicer to have a lock for table creation.
- """
-
- dcur = dcon.cursor()
- for tbl in tables.keys():
- if not skytools.exists_table(dcur, tbl):
- if not self.part_template:
- raise Exception('Dest table does not exist and there is no way to create it.')
-
- sql = self.part_template
- sql = sql.replace(DEST_TABLE, skytools.quote_fqident(tbl))
-
- # we do this to make sure that constraints for
- # tables whose names contain a schema will still work
- schema_table = tbl.replace(".", "__")
- sql = sql.replace(SCHEMA_TABLE, skytools.quote_ident(schema_table))
-
- dcur.execute(sql)
- dcon.commit()
- self.log.info('%s: Created table %s' % (self.job_name, tbl))
-
-if __name__ == '__main__':
- script = TableDispatcher(sys.argv[1:])
- script.start()
-
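
Similarly, the partition naming done in the removed table_dispatcher's dispatch() can be shown as a tiny standalone helper; a hypothetical sketch of the same suffix logic, not part of the removed file::

    def partition_name(dest_table, part_value, part_method='daily'):
        # part_value is e.g. '2009-06-05 10:28:31'; keep only the date part
        date = part_value.split(' ')[0].split('-')
        if part_method == 'monthly':
            date = date[:2]
        return '%s_%s' % (dest_table, '_'.join(date))

    # partition_name('orders', '2009-06-05 10:28:31')               -> 'orders_2009_06_05'
    # partition_name('orders', '2009-06-07', part_method='monthly') -> 'orders_2009_06'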