author    Marko Kreen    2009-06-05 10:28:31 +0000
committer Marko Kreen    2009-06-07 15:03:25 +0000
commit    e9f032912997443878fdd70634ec644d73fef1c8 (patch)
tree      14c68cafef16551381229486fc757b9044016de4 /scripts/bulk_loader.py
parent    c8b2d68fcd66a94ec748ece7d280d76e5238e752 (diff)
move bulk_loader/cube_dispatcher/table_dispatcher to old/
The scripts are obsolete but kept as samples. To make clear that they are no longer actively maintained, move them out of the main tree.
Diffstat (limited to 'scripts/bulk_loader.py')
-rwxr-xr-x  scripts/bulk_loader.py  423
1 file changed, 0 insertions, 423 deletions
diff --git a/scripts/bulk_loader.py b/scripts/bulk_loader.py
deleted file mode 100755
index c9a67496..00000000
--- a/scripts/bulk_loader.py
+++ /dev/null
@@ -1,423 +0,0 @@
-#! /usr/bin/env python
-
-"""Bulkloader for slow databases (Bizgres).
-
-The idea is the following:
- - The script reads a batch of urlencoded row changes from the queue:
- inserts/updates/deletes, possibly several per row.
- - It builds 3 lists: ins_list, upd_list, del_list.
- If one row is changed several times, only the latest change is kept.
- - The lists are processed in the following way:
- ins_list - COPY into main table
- upd_list - COPY into temp table, UPDATE from there
- del_list - COPY into temp table, DELETE from there
- - One side effect is that the total order in which rows appear
- changes, but per-row changes are kept in order.
-
-The speedup from COPY happens only if the batches are large
-enough, so the ticks should happen only every couple
-of minutes.
-
-Example ini file name: bl_sourcedb_queue_to_destdb.ini
-
-Config template::
- [bulk_loader]
- # job_name is optional; when not given, the ini file name is used
- job_name = bl_sourcedb_queue_to_destdb
-
- src_db = dbname=sourcedb
- dst_db = dbname=destdb
-
- pgq_queue_name = source_queue
-
- use_skylog = 0
-
- logfile = ~/log/%(job_name)s.log
- pidfile = ~/pid/%(job_name)s.pid
-
- # 0 - apply UPDATE as UPDATE
- # 1 - apply UPDATE as DELETE+INSERT
- # 2 - merge INSERT/UPDATE, do DELETE+INSERT
- load_method = 0
-
- # no hurry
- loop_delay = 10
-
- # table renaming
- # remap_tables = skypein_cdr_closed:skypein_cdr, tbl1:tbl2
-"""
-
-import sys, os, pgq, skytools
-from skytools import quote_ident, quote_fqident
-
-
-## several methods for applying data
-
-# update as update
-METH_CORRECT = 0
-# update as delete/copy
-METH_DELETE = 1
-# merge ins_list and upd_list, do delete/copy
-METH_MERGED = 2
-
-# no good method for temp table check before 8.2
-USE_LONGLIVED_TEMP_TABLES = False
-
-AVOID_BIZGRES_BUG = 1
-
-def find_dist_fields(curs, fqtbl):
- if not skytools.exists_table(curs, "pg_catalog.mpp_distribution_policy"):
- return []
- schema, name = fqtbl.split('.')
- q = "select a.attname"\
- " from pg_class t, pg_namespace n, pg_attribute a,"\
- " mpp_distribution_policy p"\
- " where n.oid = t.relnamespace"\
- " and p.localoid = t.oid"\
- " and a.attrelid = t.oid"\
- " and a.attnum = any(p.attrnums)"\
- " and n.nspname = %s and t.relname = %s"
- curs.execute(q, [schema, name])
- res = []
- for row in curs.fetchall():
- res.append(row[0])
- return res
-
-def exists_temp_table(curs, tbl):
- # correct way, works only on 8.2
- q = "select 1 from pg_class where relname = %s and relnamespace = pg_my_temp_schema()"
-
- # does not work with parallel case
- #q = """
- #select 1 from pg_class t, pg_namespace n
- #where n.oid = t.relnamespace
- # and pg_table_is_visible(t.oid)
- # and has_schema_privilege(n.nspname, 'USAGE')
- # and has_table_privilege(n.nspname || '.' || t.relname, 'SELECT')
- # and substr(n.nspname, 1, 8) = 'pg_temp_'
- # and t.relname = %s;
- #"""
- curs.execute(q, [tbl])
- tmp = curs.fetchall()
- return len(tmp) > 0
-
-class TableCache:
- """Per-table data hander."""
-
- def __init__(self, tbl):
- """Init per-batch table data cache."""
- self.name = tbl
- self.ev_list = []
- self.pkey_map = {}
- self.pkey_list = []
- self.pkey_str = None
- self.col_list = None
-
- self.final_ins_list = []
- self.final_upd_list = []
- self.final_del_list = []
-
- def add_event(self, ev):
- """Store new event."""
-
- # op & data
- ev.op = ev.ev_type[0]
- ev.data = skytools.db_urldecode(ev.ev_data)
-
- # get pkey column names
- if self.pkey_str is None:
- if len(ev.ev_type) > 2:
- self.pkey_str = ev.ev_type.split(':')[1]
- else:
- self.pkey_str = ev.ev_extra2
-
- if self.pkey_str:
- self.pkey_list = self.pkey_str.split(',')
-
- # get pkey value
- if self.pkey_str:
- pk_data = []
- for k in self.pkey_list:
- pk_data.append(ev.data[k])
- ev.pk_data = tuple(pk_data)
- elif ev.op == 'I':
- # fake pkey, just to get them spread out
- ev.pk_data = ev.id
- else:
- raise Exception('non-pk tables not supported: %s' % self.name)
-
- # get full column list, detect added columns
- if not self.col_list:
- self.col_list = ev.data.keys()
- elif self.col_list != ev.data.keys():
- # ^ supposedly python guarantees same order in keys()
-
- # find new columns
- for c in ev.data.keys():
- if c not in self.col_list:
- for oldev in self.ev_list:
- oldev.data[c] = None
- self.col_list = ev.data.keys()
-
- # add to list
- self.ev_list.append(ev)
-
- # keep all versions of row data
- if ev.pk_data in self.pkey_map:
- self.pkey_map[ev.pk_data].append(ev)
- else:
- self.pkey_map[ev.pk_data] = [ev]
-
- def finish(self):
- """Got all data, prepare for insertion."""
-
- del_list = []
- ins_list = []
- upd_list = []
- for ev_list in self.pkey_map.values():
- # rewrite list of I/U/D events to
- # optional DELETE and optional INSERT/COPY command
- exists_before = -1
- exists_after = 1
- for ev in ev_list:
- if ev.op == "I":
- if exists_before < 0:
- exists_before = 0
- exists_after = 1
- elif ev.op == "U":
- if exists_before < 0:
- exists_before = 1
- #exists_after = 1 # this shouldn't be needed
- elif ev.op == "D":
- if exists_before < 0:
- exists_before = 1
- exists_after = 0
- else:
- raise Exception('unknown event type: %s' % ev.op)
-
- # skip short-lived rows
- if exists_before == 0 and exists_after == 0:
- continue
-
- # take last event
- ev = ev_list[-1]
-
- # generate needed commands
- if exists_before and exists_after:
- upd_list.append(ev.data)
- elif exists_before:
- del_list.append(ev.data)
- elif exists_after:
- ins_list.append(ev.data)
-
- # reorder cols
- new_list = self.pkey_list[:]
- for k in self.col_list:
- if k not in self.pkey_list:
- new_list.append(k)
-
- self.col_list = new_list
- self.final_ins_list = ins_list
- self.final_upd_list = upd_list
- self.final_del_list = del_list
-
-class BulkLoader(pgq.SerialConsumer):
- __doc__ = __doc__
- load_method = METH_CORRECT
- remap_tables = {}
- def __init__(self, args):
- pgq.SerialConsumer.__init__(self, "bulk_loader", "src_db", "dst_db", args)
-
- def reload(self):
- pgq.SerialConsumer.reload(self)
-
- self.load_method = self.cf.getint("load_method", METH_CORRECT)
- if self.load_method not in (METH_CORRECT,METH_DELETE,METH_MERGED):
- raise Exception("bad load_method")
-
- self.remap_tables = {}
- for mapelem in self.cf.getlist("remap_tables", ''):
- tmp = mapelem.split(':')
- tbl = tmp[0].strip()
- new = tmp[1].strip()
- self.remap_tables[tbl] = new
-
- def process_remote_batch(self, src_db, batch_id, ev_list, dst_db):
- """Content dispatcher."""
-
- # add events to per-table caches
- tables = {}
- for ev in ev_list:
- tbl = ev.extra1
- if not tbl in tables:
- tables[tbl] = TableCache(tbl)
- cache = tables[tbl]
- cache.add_event(ev)
-
- # then process them
- for tbl, cache in tables.items():
- cache.finish()
- self.process_one_table(dst_db, tbl, cache)
-
- def process_one_table(self, dst_db, tbl, cache):
-
- del_list = cache.final_del_list
- ins_list = cache.final_ins_list
- upd_list = cache.final_upd_list
- col_list = cache.col_list
- real_update_count = len(upd_list)
-
- self.log.debug("process_one_table: %s (I/U/D = %d/%d/%d)" % (
- tbl, len(ins_list), len(upd_list), len(del_list)))
-
- if tbl in self.remap_tables:
- old = tbl
- tbl = self.remap_tables[tbl]
- self.log.debug("Redirect %s to %s" % (old, tbl))
-
- # METH_MERGED: fold ins_list into upd_list so everything is applied as DELETE+INSERT
- if self.load_method == METH_MERGED:
- upd_list += ins_list
- ins_list = []
-
- # check if interesting table
- curs = dst_db.cursor()
- if not skytools.exists_table(curs, tbl):
- self.log.warning("Ignoring events for table: %s" % tbl)
- return
-
- # fetch distribution fields
- dist_fields = find_dist_fields(curs, tbl)
- extra_fields = []
- for fld in dist_fields:
- if fld not in cache.pkey_list:
- extra_fields.append(fld)
- self.log.debug("PKey fields: %s Extra fields: %s" % (
- ",".join(cache.pkey_list), ",".join(extra_fields)))
-
- # create temp table
- temp = self.create_temp_table(curs, tbl)
-
- # where expr must have pkey and dist fields
- klist = []
- for pk in cache.pkey_list + extra_fields:
- exp = "%s.%s = %s.%s" % (quote_fqident(tbl), quote_ident(pk),
- quote_fqident(temp), quote_ident(pk))
- klist.append(exp)
- whe_expr = " and ".join(klist)
-
- # create del sql
- del_sql = "delete from only %s using %s where %s" % (
- quote_fqident(tbl), quote_fqident(temp), whe_expr)
-
- # create update sql
- slist = []
- key_fields = cache.pkey_list + extra_fields
- for col in cache.col_list:
- if col not in key_fields:
- exp = "%s = %s.%s" % (quote_ident(col), quote_fqident(temp), quote_ident(col))
- slist.append(exp)
- upd_sql = "update only %s set %s from %s where %s" % (
- quote_fqident(tbl), ", ".join(slist), quote_fqident(temp), whe_expr)
-
- # insert sql
- colstr = ",".join([quote_ident(c) for c in cache.col_list])
- ins_sql = "insert into %s (%s) select %s from %s" % (
- quote_fqident(tbl), colstr, colstr, quote_fqident(temp))
-
- # process deleted rows
- if len(del_list) > 0:
- self.log.info("Deleting %d rows from %s" % (len(del_list), tbl))
- # delete old rows
- q = "truncate %s" % quote_fqident(temp)
- self.log.debug(q)
- curs.execute(q)
- # copy rows
- self.log.debug("COPY %d rows into %s" % (len(del_list), temp))
- skytools.magic_insert(curs, temp, del_list, col_list)
- # delete rows
- self.log.debug(del_sql)
- curs.execute(del_sql)
- self.log.debug("%s - %d" % (curs.statusmessage, curs.rowcount))
- self.log.debug(curs.statusmessage)
- if len(del_list) != curs.rowcount:
- self.log.warning("Delete mismatch: expected=%s updated=%d"
- % (len(del_list), curs.rowcount))
-
- # process updated rows
- if len(upd_list) > 0:
- self.log.info("Updating %d rows in %s" % (len(upd_list), tbl))
- # delete old rows
- q = "truncate %s" % quote_fqident(temp)
- self.log.debug(q)
- curs.execute(q)
- # copy rows
- self.log.debug("COPY %d rows into %s" % (len(upd_list), temp))
- skytools.magic_insert(curs, temp, upd_list, col_list)
- if self.load_method == METH_CORRECT:
- # update main table
- self.log.debug(upd_sql)
- curs.execute(upd_sql)
- self.log.debug(curs.statusmessage)
- # check count
- if len(upd_list) != curs.rowcount:
- self.log.warning("Update mismatch: expected=%s updated=%d"
- % (len(upd_list), curs.rowcount))
- else:
- # delete from main table
- self.log.debug(del_sql)
- curs.execute(del_sql)
- self.log.debug(curs.statusmessage)
- # check count
- if real_update_count != curs.rowcount:
- self.log.warning("Update mismatch: expected=%s deleted=%d"
- % (real_update_count, curs.rowcount))
- # insert into main table
- if AVOID_BIZGRES_BUG:
- # copy again, into main table
- self.log.debug("COPY %d rows into %s" % (len(upd_list), tbl))
- skytools.magic_insert(curs, tbl, upd_list, col_list)
- else:
- # better way, but does not work due to a Bizgres bug
- self.log.debug(ins_sql)
- curs.execute(ins_sql)
- self.log.debug(curs.statusmessage)
-
- # process new rows
- if len(ins_list) > 0:
- self.log.info("Inserting %d rows into %s" % (len(ins_list), tbl))
- skytools.magic_insert(curs, tbl, ins_list, col_list)
-
- # clean up the temp table
- if USE_LONGLIVED_TEMP_TABLES:
- q = "truncate %s" % quote_fqident(temp)
- else:
- # work around problems with long-lived temp tables
- q = "drop table %s" % quote_fqident(temp)
- self.log.debug(q)
- curs.execute(q)
-
- def create_temp_table(self, curs, tbl):
- # create temp table for loading
- tempname = tbl.replace('.', '_') + "_loadertmp"
-
- # check if exists
- if USE_LONGLIVED_TEMP_TABLES:
- if exists_temp_table(curs, tempname):
- self.log.debug("Using existing temp table %s" % tempname)
- return tempname
-
- # Bizgres crashes with "on commit delete rows", so preserve rows instead
- #arg = "on commit delete rows"
- arg = "on commit preserve rows"
- # create temp table for loading
- q = "create temp table %s (like %s) %s" % (
- quote_fqident(tempname), quote_fqident(tbl), arg)
- self.log.debug("Creating temp table: %s" % q)
- curs.execute(q)
- return tempname
-
-if __name__ == '__main__':
- script = BulkLoader(sys.argv[1:])
- script.start()
-
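
For reference, the per-row merge step that the removed script's docstring describes (keep only the latest change per primary key, then split the rows into insert/update/delete lists, as done in TableCache.finish()) can be illustrated with a minimal standalone sketch. This is illustrative only; the function name merge_row_events and the input shape are hypothetical and were not part of the script::

    def merge_row_events(events_by_pkey):
        """events_by_pkey maps a primary-key tuple to a list of (op, row_dict)
        pairs in arrival order, where op is 'I', 'U' or 'D'.
        Returns (ins_list, upd_list, del_list) of row dicts."""
        ins_list, upd_list, del_list = [], [], []
        for ev_list in events_by_pkey.values():
            exists_before = -1   # -1 means "unknown until the first event is seen"
            exists_after = 1
            for op, _data in ev_list:
                if op == 'I':
                    if exists_before < 0:
                        exists_before = 0
                    exists_after = 1
                elif op == 'U':
                    if exists_before < 0:
                        exists_before = 1
                    exists_after = 1
                elif op == 'D':
                    if exists_before < 0:
                        exists_before = 1
                    exists_after = 0
                else:
                    raise ValueError('unknown event type: %s' % op)

            # row created and deleted inside the same batch: nothing to load
            if exists_before == 0 and exists_after == 0:
                continue

            _op, data = ev_list[-1]   # keep only the latest row image
            if exists_before and exists_after:
                upd_list.append(data)
            elif exists_before:
                del_list.append(data)
            else:
                ins_list.append(data)
        return ins_list, upd_list, del_list

    # Example: a row inserted and then updated in the same batch collapses
    # into a single insert carrying the latest values.
    ins, upd, dele = merge_row_events({
        (42,): [('I', {'id': 42, 'val': 'a'}), ('U', {'id': 42, 'val': 'b'})],
    })
    assert ins == [{'id': 42, 'val': 'b'}] and upd == [] and dele == []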