summary | refs | log | tree | commit | diff
path: root/scripts/queue_loader.py
diff options
context:
space:
mode:
author martinko 2013-02-28 14:14:43 +0000
committer martinko 2013-02-28 14:14:43 +0000
commit 7beec385710edb11751267cc71fa40547670ced8 (patch)
tree 3c8e449e300567a355b660e18251582314b79794 /scripts/queue_loader.py
parent abe43dce235b409f51a2d10474b1c6967a19b7c5 (diff)
scripts: sweeping change to postpone log string formatting
Diffstat (limited to 'scripts/queue_loader.py')
-rwxr-xr-x scripts/queue_loader.py 64
1 files changed, 34 insertions, 30 deletions
diff --git a/scripts/queue_loader.py b/scripts/queue_loader.py
index 97742b82..062970d1 100755
--- a/scripts/queue_loader.py
+++ b/scripts/queue_loader.py
@@ -9,9 +9,9 @@ Config template::
logfile =
pidfile =
- db =
+ db =
- #rename_tables =
+ #rename_tables =
[DEFAULT]
@@ -51,7 +51,7 @@ Config template::
# alter table only %%(part)s add primary key (%%(pkey)s);
#
### Inherited partitions
- #split_part_template =
+ #split_part_template =
# create table %%(part)s () inherits (%%(parent)s);
# alter table only %%(part)s add primary key (%%(pkey)s);
@@ -75,7 +75,7 @@ Config template::
#bulk_mode=correct
[table public.foo]
- mode =
+ mode =
create_sql =
"""
@@ -88,7 +88,7 @@ import skytools
from pgq.cascade.worker import CascadedWorker
from skytools import quote_ident, quote_fqident, UsageError
-# todo: auto table detect
+# TODO: auto table detect
# BulkLoader load method
METH_CORRECT = 0
@@ -99,6 +99,7 @@ LOAD_METHOD = METH_CORRECT
AVOID_BIZGRES_BUG = 0
USE_LONGLIVED_TEMP_TABLES = True
+
class BasicLoader:
"""Apply events as-is."""
def __init__(self, table_name, parent_name, log):
@@ -123,6 +124,7 @@ class BasicLoader:
curs.execute("\n".join(self.sql_list))
self.sql_list = []
+
class KeepLatestLoader(BasicLoader):
"""Keep latest row version.
@@ -161,6 +163,7 @@ class BulkEvent(object):
self.data = data
self.pk_data = pk_data
+
class BulkLoader(BasicLoader):
"""Instead of statement-per event, load all data with one
big COPY, UPDATE or DELETE statement.
@@ -246,7 +249,7 @@ class BulkLoader(BasicLoader):
# take last event
ev = ev_list[-1]
-
+
# generate needed commands
if exists_before and exists_after:
upd_list.append(ev.data)
@@ -268,8 +271,8 @@ class BulkLoader(BasicLoader):
real_update_count = len(upd_list)
- #self.log.debug("process_one_table: %s (I/U/D = %d/%d/%d)" % (
- # tbl, len(ins_list), len(upd_list), len(del_list)))
+ #self.log.debug("process_one_table: %s (I/U/D = %d/%d/%d)",
+ # tbl, len(ins_list), len(upd_list), len(del_list))
# hack to unbroke stuff
if LOAD_METHOD == METH_MERGED:
@@ -284,13 +287,13 @@ class BulkLoader(BasicLoader):
for fld in self.dist_fields:
if fld not in key_fields:
key_fields.append(fld)
- #self.log.debug("PKey fields: %s Extra fields: %s" % (
- # ",".join(cache.pkey_list), ",".join(extra_fields)))
+ #self.log.debug("PKey fields: %s Extra fields: %s",
+ # ",".join(cache.pkey_list), ",".join(extra_fields))
# create temp table
temp = self.create_temp_table(curs)
tbl = self.table_name
-
+
# where expr must have pkey and dist fields
klist = []
for pk in key_fields:
@@ -321,43 +324,43 @@ class BulkLoader(BasicLoader):
# process deleted rows
if len(del_list) > 0:
- #self.log.info("Deleting %d rows from %s" % (len(del_list), tbl))
+ #self.log.info("Deleting %d rows from %s", len(del_list), tbl)
# delete old rows
q = "truncate %s" % quote_fqident(temp)
self.log.debug(q)
curs.execute(q)
# copy rows
- self.log.debug("COPY %d rows into %s" % (len(del_list), temp))
+ self.log.debug("COPY %d rows into %s", len(del_list), temp)
skytools.magic_insert(curs, temp, del_list, col_list)
# delete rows
self.log.debug(del_sql)
curs.execute(del_sql)
- self.log.debug("%s - %d" % (curs.statusmessage, curs.rowcount))
+ self.log.debug("%s - %d", curs.statusmessage, curs.rowcount)
if len(del_list) != curs.rowcount:
- self.log.warning("Delete mismatch: expected=%s deleted=%d"
- % (len(del_list), curs.rowcount))
+ self.log.warning("Delete mismatch: expected=%d deleted=%d",
+ len(del_list), curs.rowcount)
temp_used = True
# process updated rows
if len(upd_list) > 0:
- #self.log.info("Updating %d rows in %s" % (len(upd_list), tbl))
+ #self.log.info("Updating %d rows in %s", len(upd_list), tbl)
# delete old rows
q = "truncate %s" % quote_fqident(temp)
self.log.debug(q)
curs.execute(q)
# copy rows
- self.log.debug("COPY %d rows into %s" % (len(upd_list), temp))
+ self.log.debug("COPY %d rows into %s", len(upd_list), temp)
skytools.magic_insert(curs, temp, upd_list, col_list)
temp_used = True
if LOAD_METHOD == METH_CORRECT:
# update main table
self.log.debug(upd_sql)
curs.execute(upd_sql)
- self.log.debug("%s - %d" % (curs.statusmessage, curs.rowcount))
+ self.log.debug("%s - %d", curs.statusmessage, curs.rowcount)
# check count
if len(upd_list) != curs.rowcount:
- self.log.warning("Update mismatch: expected=%s updated=%d"
- % (len(upd_list), curs.rowcount))
+ self.log.warning("Update mismatch: expected=%d updated=%d",
+ len(upd_list), curs.rowcount)
else:
# delete from main table
self.log.debug(del_sql)
@@ -365,12 +368,12 @@ class BulkLoader(BasicLoader):
self.log.debug(curs.statusmessage)
# check count
if real_update_count != curs.rowcount:
- self.log.warning("Update mismatch: expected=%s deleted=%d"
- % (real_update_count, curs.rowcount))
+ self.log.warning("Update mismatch: expected=%d deleted=%d",
+ real_update_count, curs.rowcount)
# insert into main table
if AVOID_BIZGRES_BUG:
# copy again, into main table
- self.log.debug("COPY %d rows into %s" % (len(upd_list), tbl))
+ self.log.debug("COPY %d rows into %s", len(upd_list), tbl)
skytools.magic_insert(curs, tbl, upd_list, col_list)
else:
# better way, but does not work due bizgres bug
@@ -380,7 +383,7 @@ class BulkLoader(BasicLoader):
# process new rows
if len(ins_list) > 0:
- self.log.info("Inserting %d rows into %s" % (len(ins_list), tbl))
+ self.log.info("Inserting %d rows into %s", len(ins_list), tbl)
skytools.magic_insert(curs, tbl, ins_list, col_list)
# delete remaining rows
@@ -402,16 +405,16 @@ class BulkLoader(BasicLoader):
# check if exists
if USE_LONGLIVED_TEMP_TABLES:
if skytools.exists_temp_table(curs, tempname):
- self.log.debug("Using existing temp table %s" % tempname)
+ self.log.debug("Using existing temp table %s", tempname)
return tempname
-
+
# bizgres crashes on delete rows
arg = "on commit delete rows"
arg = "on commit preserve rows"
# create temp table for loading
q = "create temp table %s (like %s) %s" % (
quote_fqident(tempname), quote_fqident(self.table_name), arg)
- self.log.debug("Creating temp table: %s" % q)
+ self.log.debug("Creating temp table: %s", q)
curs.execute(q)
return tempname
@@ -464,8 +467,8 @@ class TableHandler:
self.split_format = self.split_date_from_field
else:
raise UsageError('Bad value for split_mode: '+smode)
- self.log.debug("%s: split_mode=%s, split_field=%s, split_part=%s" % (
- self.table_name, smode, self.split_field, self.split_part))
+ self.log.debug("%s: split_mode=%s, split_field=%s, split_part=%s",
+ self.table_name, smode, self.split_field, self.split_part)
elif table_mode == 'ignore':
pass
else:
@@ -603,6 +606,7 @@ class QueueLoader(CascadedWorker):
st.flush(curs)
CascadedWorker.finish_remote_batch(self, src_db, dst_db, tick_id)
+
if __name__ == '__main__':
script = QueueLoader('queue_loader', 'db', sys.argv[1:])
script.start()