diff options
author | martinko | 2013-02-28 14:14:43 +0000 |
---|---|---|
committer | martinko | 2013-02-28 14:14:43 +0000 |
commit | 7beec385710edb11751267cc71fa40547670ced8 (patch) | |
tree | 3c8e449e300567a355b660e18251582314b79794 /scripts/queue_loader.py | |
parent | abe43dce235b409f51a2d10474b1c6967a19b7c5 (diff) |
scripts: sweeping change to postpone log string formatting
Diffstat (limited to 'scripts/queue_loader.py')
-rwxr-xr-x | scripts/queue_loader.py | 64 |
1 files changed, 34 insertions, 30 deletions
diff --git a/scripts/queue_loader.py b/scripts/queue_loader.py index 97742b82..062970d1 100755 --- a/scripts/queue_loader.py +++ b/scripts/queue_loader.py @@ -9,9 +9,9 @@ Config template:: logfile = pidfile = - db = + db = - #rename_tables = + #rename_tables = [DEFAULT] @@ -51,7 +51,7 @@ Config template:: # alter table only %%(part)s add primary key (%%(pkey)s); # ### Inherited partitions - #split_part_template = + #split_part_template = # create table %%(part)s () inherits (%%(parent)s); # alter table only %%(part)s add primary key (%%(pkey)s); @@ -75,7 +75,7 @@ Config template:: #bulk_mode=correct [table public.foo] - mode = + mode = create_sql = """ @@ -88,7 +88,7 @@ import skytools from pgq.cascade.worker import CascadedWorker from skytools import quote_ident, quote_fqident, UsageError -# todo: auto table detect +# TODO: auto table detect # BulkLoader load method METH_CORRECT = 0 @@ -99,6 +99,7 @@ LOAD_METHOD = METH_CORRECT AVOID_BIZGRES_BUG = 0 USE_LONGLIVED_TEMP_TABLES = True + class BasicLoader: """Apply events as-is.""" def __init__(self, table_name, parent_name, log): @@ -123,6 +124,7 @@ class BasicLoader: curs.execute("\n".join(self.sql_list)) self.sql_list = [] + class KeepLatestLoader(BasicLoader): """Keep latest row version. @@ -161,6 +163,7 @@ class BulkEvent(object): self.data = data self.pk_data = pk_data + class BulkLoader(BasicLoader): """Instead of statement-per event, load all data with one big COPY, UPDATE or DELETE statement. @@ -246,7 +249,7 @@ class BulkLoader(BasicLoader): # take last event ev = ev_list[-1] - + # generate needed commands if exists_before and exists_after: upd_list.append(ev.data) @@ -268,8 +271,8 @@ class BulkLoader(BasicLoader): real_update_count = len(upd_list) - #self.log.debug("process_one_table: %s (I/U/D = %d/%d/%d)" % ( - # tbl, len(ins_list), len(upd_list), len(del_list))) + #self.log.debug("process_one_table: %s (I/U/D = %d/%d/%d)", + # tbl, len(ins_list), len(upd_list), len(del_list)) # hack to unbroke stuff if LOAD_METHOD == METH_MERGED: @@ -284,13 +287,13 @@ class BulkLoader(BasicLoader): for fld in self.dist_fields: if fld not in key_fields: key_fields.append(fld) - #self.log.debug("PKey fields: %s Extra fields: %s" % ( - # ",".join(cache.pkey_list), ",".join(extra_fields))) + #self.log.debug("PKey fields: %s Extra fields: %s", + # ",".join(cache.pkey_list), ",".join(extra_fields)) # create temp table temp = self.create_temp_table(curs) tbl = self.table_name - + # where expr must have pkey and dist fields klist = [] for pk in key_fields: @@ -321,43 +324,43 @@ class BulkLoader(BasicLoader): # process deleted rows if len(del_list) > 0: - #self.log.info("Deleting %d rows from %s" % (len(del_list), tbl)) + #self.log.info("Deleting %d rows from %s", len(del_list), tbl) # delete old rows q = "truncate %s" % quote_fqident(temp) self.log.debug(q) curs.execute(q) # copy rows - self.log.debug("COPY %d rows into %s" % (len(del_list), temp)) + self.log.debug("COPY %d rows into %s", len(del_list), temp) skytools.magic_insert(curs, temp, del_list, col_list) # delete rows self.log.debug(del_sql) curs.execute(del_sql) - self.log.debug("%s - %d" % (curs.statusmessage, curs.rowcount)) + self.log.debug("%s - %d", curs.statusmessage, curs.rowcount) if len(del_list) != curs.rowcount: - self.log.warning("Delete mismatch: expected=%s deleted=%d" - % (len(del_list), curs.rowcount)) + self.log.warning("Delete mismatch: expected=%d deleted=%d", + len(del_list), curs.rowcount) temp_used = True # process updated rows if len(upd_list) > 0: - #self.log.info("Updating %d rows in %s" % (len(upd_list), tbl)) + #self.log.info("Updating %d rows in %s", len(upd_list), tbl) # delete old rows q = "truncate %s" % quote_fqident(temp) self.log.debug(q) curs.execute(q) # copy rows - self.log.debug("COPY %d rows into %s" % (len(upd_list), temp)) + self.log.debug("COPY %d rows into %s", len(upd_list), temp) skytools.magic_insert(curs, temp, upd_list, col_list) temp_used = True if LOAD_METHOD == METH_CORRECT: # update main table self.log.debug(upd_sql) curs.execute(upd_sql) - self.log.debug("%s - %d" % (curs.statusmessage, curs.rowcount)) + self.log.debug("%s - %d", curs.statusmessage, curs.rowcount) # check count if len(upd_list) != curs.rowcount: - self.log.warning("Update mismatch: expected=%s updated=%d" - % (len(upd_list), curs.rowcount)) + self.log.warning("Update mismatch: expected=%d updated=%d", + len(upd_list), curs.rowcount) else: # delete from main table self.log.debug(del_sql) @@ -365,12 +368,12 @@ class BulkLoader(BasicLoader): self.log.debug(curs.statusmessage) # check count if real_update_count != curs.rowcount: - self.log.warning("Update mismatch: expected=%s deleted=%d" - % (real_update_count, curs.rowcount)) + self.log.warning("Update mismatch: expected=%d deleted=%d", + real_update_count, curs.rowcount) # insert into main table if AVOID_BIZGRES_BUG: # copy again, into main table - self.log.debug("COPY %d rows into %s" % (len(upd_list), tbl)) + self.log.debug("COPY %d rows into %s", len(upd_list), tbl) skytools.magic_insert(curs, tbl, upd_list, col_list) else: # better way, but does not work due bizgres bug @@ -380,7 +383,7 @@ class BulkLoader(BasicLoader): # process new rows if len(ins_list) > 0: - self.log.info("Inserting %d rows into %s" % (len(ins_list), tbl)) + self.log.info("Inserting %d rows into %s", len(ins_list), tbl) skytools.magic_insert(curs, tbl, ins_list, col_list) # delete remaining rows @@ -402,16 +405,16 @@ class BulkLoader(BasicLoader): # check if exists if USE_LONGLIVED_TEMP_TABLES: if skytools.exists_temp_table(curs, tempname): - self.log.debug("Using existing temp table %s" % tempname) + self.log.debug("Using existing temp table %s", tempname) return tempname - + # bizgres crashes on delete rows arg = "on commit delete rows" arg = "on commit preserve rows" # create temp table for loading q = "create temp table %s (like %s) %s" % ( quote_fqident(tempname), quote_fqident(self.table_name), arg) - self.log.debug("Creating temp table: %s" % q) + self.log.debug("Creating temp table: %s", q) curs.execute(q) return tempname @@ -464,8 +467,8 @@ class TableHandler: self.split_format = self.split_date_from_field else: raise UsageError('Bad value for split_mode: '+smode) - self.log.debug("%s: split_mode=%s, split_field=%s, split_part=%s" % ( - self.table_name, smode, self.split_field, self.split_part)) + self.log.debug("%s: split_mode=%s, split_field=%s, split_part=%s", + self.table_name, smode, self.split_field, self.split_part) elif table_mode == 'ignore': pass else: @@ -603,6 +606,7 @@ class QueueLoader(CascadedWorker): st.flush(curs) CascadedWorker.finish_remote_batch(self, src_db, dst_db, tick_id) + if __name__ == '__main__': script = QueueLoader('queue_loader', 'db', sys.argv[1:]) script.start() |