| author | martinko | 2013-02-21 15:09:47 +0000 |
|---|---|---|
| committer | martinko | 2013-02-21 15:09:47 +0000 |
| commit | 5ead1f8ac981527d416718bfe4822962ef9b3252 | |
| tree | ac108aec1196021e4743530a946ee18fdf775665 | /python/londiste |
| parent | c147690b1f4f1b903185d7a949f01408694fb081 | |
londiste: sweeping change to postpone log string formatting
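For readers skimming the patch, the technique being applied throughout is the standard lazy-formatting idiom of Python's `logging` module: instead of building the message eagerly with `%` and handing the finished string to the logger, the format string and its arguments are passed separately, and `logging` interpolates them only if the record is actually emitted. A minimal sketch of the before/after (the table name is hypothetical, for illustration only, not taken from the patch):

```python
import logging

logging.basicConfig(level=logging.INFO)   # DEBUG records will be discarded
log = logging.getLogger('londiste')

dst_tbl = 'public.mytable'  # hypothetical value, for illustration only

# Old style: '%' runs before log.debug() is even called, so the string
# is built even though the DEBUG record is thrown away.
log.debug('Counting %s' % dst_tbl)

# New style: the logger stores the template and args on the LogRecord
# and formats them only when a handler actually emits the record.
log.debug('Counting %s', dst_tbl)
```

On hot replication paths this avoids paying string-formatting costs for suppressed debug messages; as a side benefit, log aggregation tools can group records by their unformatted template.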
Diffstat (limited to 'python/londiste')
| -rw-r--r-- | python/londiste/compare.py | 12 |
| -rw-r--r-- | python/londiste/handlers/applyfn.py | 2 |
| -rw-r--r-- | python/londiste/handlers/bulk.py | 68 |
| -rw-r--r-- | python/londiste/handlers/dispatch.py | 51 |
| -rw-r--r-- | python/londiste/handlers/part.py | 6 |
| -rw-r--r-- | python/londiste/playback.py | 38 |
| -rw-r--r-- | python/londiste/repair.py | 31 |
| -rw-r--r-- | python/londiste/setup.py | 26 |
| -rw-r--r-- | python/londiste/syncer.py | 19 |
| -rw-r--r-- | python/londiste/table_copy.py | 25 |
10 files changed, 135 insertions, 143 deletions
```diff
diff --git a/python/londiste/compare.py b/python/londiste/compare.py
index 83dac2e4..3f74862f 100644
--- a/python/londiste/compare.py
+++ b/python/londiste/compare.py
@@ -27,7 +27,7 @@ class Comparator(Syncer):
             dst_where = t2.plugin.get_copy_condition(src_curs, dst_curs)
             src_where = dst_where
 
-        self.log.info('Counting %s' % dst_tbl)
+        self.log.info('Counting %s', dst_tbl)
 
         # get common cols
         cols = self.calc_cols(src_curs, src_tbl, dst_curs, dst_tbl)
@@ -61,22 +61,22 @@ class Comparator(Syncer):
             f += ", checksum=%(chksum)s"
         f = self.cf.get('compare_fmt', f)
 
-        self.log.debug("srcdb: " + src_q)
+        self.log.debug("srcdb: %s", src_q)
         src_curs.execute(src_q)
         src_row = src_curs.fetchone()
         src_str = f % src_row
-        self.log.info("srcdb: %s" % src_str)
+        self.log.info("srcdb: %s", src_str)
         src_db.commit()
 
-        self.log.debug("dstdb: " + dst_q)
+        self.log.debug("dstdb: %s", dst_q)
         dst_curs.execute(dst_q)
         dst_row = dst_curs.fetchone()
         dst_str = f % dst_row
-        self.log.info("dstdb: %s" % dst_str)
+        self.log.info("dstdb: %s", dst_str)
         dst_db.commit()
 
         if src_str != dst_str:
-            self.log.warning("%s: Results do not match!" % dst_tbl)
+            self.log.warning("%s: Results do not match!", dst_tbl)
             return 1
         return 0
diff --git a/python/londiste/handlers/applyfn.py b/python/londiste/handlers/applyfn.py
index b7b1173c..cbbf603b 100644
--- a/python/londiste/handlers/applyfn.py
+++ b/python/londiste/handlers/applyfn.py
@@ -34,7 +34,7 @@ class ApplyFuncHandler(BaseHandler):
         qfn = skytools.quote_fqident(fn)
         qargs = [skytools.quote_literal(a) for a in args]
         sql = "select %s(%s);" % (qfn, ', '.join(qargs))
-        self.log.debug('applyfn.sql: %s' % sql)
+        self.log.debug('applyfn.sql: %s', sql)
         sql_queue_func(sql, qfunc_arg)
 
 #------------------------------------------------------------------------------
diff --git a/python/londiste/handlers/bulk.py b/python/londiste/handlers/bulk.py
index 0c0167ac..e8b9104b 100644
--- a/python/londiste/handlers/bulk.py
+++ b/python/londiste/handlers/bulk.py
@@ -82,7 +82,7 @@ class BulkLoader(BaseHandler):
         if not self.method in (0,1,2):
             raise Exception('unknown method: %s' % self.method)
-        self.log.debug('bulk_init(%s), method=%d' % (repr(args), self.method))
+        self.log.debug('bulk_init(%r), method=%d', args, self.method)
 
     def reset(self):
         self.pkey_ev_map = {}
@@ -98,7 +98,7 @@ class BulkLoader(BaseHandler):
         op = ev.ev_type[0]
         if op not in 'IUD':
             raise Exception('Unknown event type: '+ev.ev_type)
-        self.log.debug('bulk.process_event: %s/%s' % (ev.ev_type, ev.ev_data))
+        self.log.debug('bulk.process_event: %s/%s', ev.ev_type, ev.ev_data)
         # pkey_list = ev.ev_type[2:].split(',')
         data = skytools.db_urldecode(ev.ev_data)
@@ -184,8 +184,8 @@ class BulkLoader(BaseHandler):
 
         real_update_count = len(upd_list)
 
-        self.log.debug("bulk_flush: %s (I/U/D = %d/%d/%d)" % (
-            self.table_name, len(ins_list), len(upd_list), len(del_list)))
+        self.log.debug("bulk_flush: %s (I/U/D = %d/%d/%d)",
+                self.table_name, len(ins_list), len(upd_list), len(del_list))
 
         # hack to unbroke stuff
         if self.method == METH_MERGED:
@@ -200,8 +200,8 @@ class BulkLoader(BaseHandler):
         for fld in self.dist_fields:
             if fld not in key_fields:
                 key_fields.append(fld)
-        self.log.debug("PKey fields: %s Dist fields: %s" % (
-            ",".join(self.pkey_list), ",".join(self.dist_fields)))
+        self.log.debug("PKey fields: %s Dist fields: %s",
+                ",".join(self.pkey_list), ",".join(self.dist_fields))
 
         # create temp table
         temp, qtemp = self.create_temp_table(curs)
@@ -241,67 +241,67 @@ class BulkLoader(BaseHandler):
 
         # process deleted rows
         if len(del_list) > 0:
-            self.log.debug("bulk: Deleting %d rows from %s" % (len(del_list), tbl))
+            self.log.debug("bulk: Deleting %d rows from %s", len(del_list), tbl)
             # delete old rows
             q = "truncate %s" % qtemp
-            self.log.debug('bulk: %s' % q)
+            self.log.debug('bulk: %s', q)
             curs.execute(q)
             # copy rows
-            self.log.debug("bulk: COPY %d rows into %s" % (len(del_list), temp))
+            self.log.debug("bulk: COPY %d rows into %s", len(del_list), temp)
             skytools.magic_insert(curs, qtemp, del_list, col_list, quoted_table=1)
             # delete rows
-            self.log.debug('bulk: ' + del_sql)
+            self.log.debug('bulk: %s', del_sql)
             curs.execute(del_sql)
-            self.log.debug("bulk: %s - %d" % (curs.statusmessage, curs.rowcount))
+            self.log.debug("bulk: %s - %d", curs.statusmessage, curs.rowcount)
             if len(del_list) != curs.rowcount:
-                self.log.warning("Delete mismatch: expected=%s deleted=%d"
-                        % (len(del_list), curs.rowcount))
+                self.log.warning("Delete mismatch: expected=%s deleted=%d",
+                        len(del_list), curs.rowcount)
             temp_used = True
 
         # process updated rows
         if len(upd_list) > 0:
-            self.log.debug("bulk: Updating %d rows in %s" % (len(upd_list), tbl))
+            self.log.debug("bulk: Updating %d rows in %s", len(upd_list), tbl)
             # delete old rows
             q = "truncate %s" % qtemp
-            self.log.debug('bulk: ' + q)
+            self.log.debug('bulk: %s', q)
             curs.execute(q)
             # copy rows
-            self.log.debug("bulk: COPY %d rows into %s" % (len(upd_list), temp))
+            self.log.debug("bulk: COPY %d rows into %s", len(upd_list), temp)
             skytools.magic_insert(curs, qtemp, upd_list, col_list, quoted_table=1)
             temp_used = True
             if self.method == METH_CORRECT:
                 # update main table
-                self.log.debug('bulk: ' + upd_sql)
+                self.log.debug('bulk: %s', upd_sql)
                 curs.execute(upd_sql)
-                self.log.debug("bulk: %s - %d" % (curs.statusmessage, curs.rowcount))
+                self.log.debug("bulk: %s - %d", curs.statusmessage, curs.rowcount)
                 # check count
                 if len(upd_list) != curs.rowcount:
-                    self.log.warning("Update mismatch: expected=%s updated=%d"
-                            % (len(upd_list), curs.rowcount))
+                    self.log.warning("Update mismatch: expected=%s updated=%d",
+                            len(upd_list), curs.rowcount)
             else:
                 # delete from main table
-                self.log.debug('bulk: ' + del_sql)
+                self.log.debug('bulk: %s', del_sql)
                 curs.execute(del_sql)
-                self.log.debug('bulk: ' + curs.statusmessage)
+                self.log.debug('bulk: %s', curs.statusmessage)
                 # check count
                 if real_update_count != curs.rowcount:
-                    self.log.warning("bulk: Update mismatch: expected=%s deleted=%d"
-                            % (real_update_count, curs.rowcount))
+                    self.log.warning("bulk: Update mismatch: expected=%s deleted=%d",
+                            real_update_count, curs.rowcount)
                 # insert into main table
                 if AVOID_BIZGRES_BUG:
                     # copy again, into main table
-                    self.log.debug("bulk: COPY %d rows into %s" % (len(upd_list), tbl))
+                    self.log.debug("bulk: COPY %d rows into %s", len(upd_list), tbl)
                     skytools.magic_insert(curs, qtbl, upd_list, col_list, quoted_table=1)
                 else:
                     # better way, but does not work due bizgres bug
-                    self.log.debug('bulk: ' + ins_sql)
+                    self.log.debug('bulk: %s', ins_sql)
                     curs.execute(ins_sql)
-                    self.log.debug('bulk: ' + curs.statusmessage)
+                    self.log.debug('bulk: %s', curs.statusmessage)
 
         # process new rows
         if len(ins_list) > 0:
-            self.log.debug("bulk: Inserting %d rows into %s" % (len(ins_list), tbl))
-            self.log.debug("bulk: COPY %d rows into %s" % (len(ins_list), tbl))
+            self.log.debug("bulk: Inserting %d rows into %s", len(ins_list), tbl)
+            self.log.debug("bulk: COPY %d rows into %s", len(ins_list), tbl)
             skytools.magic_insert(curs, qtbl, ins_list, col_list, quoted_table=1)
 
         # delete remaining rows
@@ -311,7 +311,7 @@ class BulkLoader(BaseHandler):
         else:
             # fscking problems with long-lived temp tables
             q = "drop table %s" % qtemp
-            self.log.debug('bulk: ' + q)
+            self.log.debug('bulk: %s', q)
             curs.execute(q)
 
         self.reset()
@@ -326,19 +326,19 @@ class BulkLoader(BaseHandler):
         # check if exists
         if USE_REAL_TABLE:
             if skytools.exists_table(curs, tempname):
-                self.log.debug("bulk: Using existing real table %s" % tempname)
+                self.log.debug("bulk: Using existing real table %s", tempname)
                 return tempname, quote_fqident(tempname)
 
             # create non-temp table
             q = "create table %s (like %s)" % (
                     quote_fqident(tempname), quote_fqident(self.dest_table))
-            self.log.debug("bulk: Creating real table: %s" % q)
+            self.log.debug("bulk: Creating real table: %s", q)
             curs.execute(q)
             return tempname, quote_fqident(tempname)
         elif USE_LONGLIVED_TEMP_TABLES:
             if skytools.exists_temp_table(curs, tempname):
-                self.log.debug("bulk: Using existing temp table %s" % tempname)
+                self.log.debug("bulk: Using existing temp table %s", tempname)
                 return tempname, quote_ident(tempname)
 
         # bizgres crashes on delete rows
@@ -347,7 +347,7 @@ class BulkLoader(BaseHandler):
         # create temp table for loading
         q = "create temp table %s (like %s) %s" % (
                 quote_ident(tempname), quote_fqident(self.dest_table), arg)
-        self.log.debug("bulk: Creating temp table: %s" % q)
+        self.log.debug("bulk: Creating temp table: %s", q)
         curs.execute(q)
         return tempname, quote_ident(tempname)
diff --git a/python/londiste/handlers/dispatch.py b/python/londiste/handlers/dispatch.py
index 1a13c90a..758034c7 100644
--- a/python/londiste/handlers/dispatch.py
+++ b/python/londiste/handlers/dispatch.py
@@ -316,10 +316,9 @@ class BaseBulkTempLoader(BaseBulkCollectingLoader):
     def logexec(self, curs, sql):
         """Logs and executes sql statement"""
-        self.log.debug('exec: %s' % sql)
+        self.log.debug('exec: %s', sql)
         curs.execute(sql)
-        self.log.debug('msg: %s, rows: %s' % (
-                curs.statusmessage, curs.rowcount))
+        self.log.debug('msg: %s, rows: %s', curs.statusmessage, curs.rowcount)
 
     # create sql parts
@@ -403,15 +402,15 @@ class BulkLoader(BaseBulkTempLoader):
         cnt = len(data)
         if (cnt == 0):
             return
-        self.log.debug("bulk: Deleting %d rows from %s" % (cnt, self.table))
+        self.log.debug("bulk: Deleting %d rows from %s", cnt, self.table)
         # copy rows to temp
         self.bulk_insert(curs, data)
         # delete rows using temp
         self.delete(curs)
         # check if right amount of rows deleted (only in direct mode)
         if self.conf.table_mode == 'direct' and cnt != curs.rowcount:
-            self.log.warning("%s: Delete mismatch: expected=%s deleted=%d"
-                    % (self.table, cnt, curs.rowcount))
+            self.log.warning("%s: Delete mismatch: expected=%s deleted=%d",
+                    self.table, cnt, curs.rowcount)
 
     def process_update(self, curs, op_map):
         """Process update list"""
@@ -424,7 +423,7 @@ class BulkLoader(BaseBulkTempLoader):
         cnt = len(data)
         if (cnt == 0):
             return
-        self.log.debug("bulk: Updating %d rows in %s" % (cnt, self.table))
+        self.log.debug("bulk: Updating %d rows in %s", cnt, self.table)
         # copy rows to temp
         self.bulk_insert(curs, data)
         if self.method == METH_CORRECT:
@@ -432,15 +431,15 @@ class BulkLoader(BaseBulkTempLoader):
             # update main table using temp
             self.update(curs)
             # check count (only in direct mode)
             if self.conf.table_mode == 'direct' and cnt != curs.rowcount:
-                self.log.warning("%s: Update mismatch: expected=%s updated=%d"
-                        % (self.table, cnt, curs.rowcount))
+                self.log.warning("%s: Update mismatch: expected=%s updated=%d",
+                        self.table, cnt, curs.rowcount)
         else:
             # delete from main table using temp
             self.delete(curs)
             # check count (only in direct mode)
             if self.conf.table_mode == 'direct' and real_cnt != curs.rowcount:
-                self.log.warning("%s: Update mismatch: expected=%s deleted=%d"
-                        % (self.table, real_cnt, curs.rowcount))
+                self.log.warning("%s: Update mismatch: expected=%s deleted=%d",
+                        self.table, real_cnt, curs.rowcount)
             # insert into main table
             if AVOID_BIZGRES_BUG:
                 # copy again, into main table
@@ -457,19 +456,19 @@ class BulkLoader(BaseBulkTempLoader):
         # merged method loads inserts together with updates
         if (cnt == 0) or (self.method == METH_MERGED):
             return
-        self.log.debug("bulk: Inserting %d rows into %s" % (cnt, self.table))
+        self.log.debug("bulk: Inserting %d rows into %s", cnt, self.table)
         # copy into target table (no temp used)
         self.bulk_insert(curs, data, table = self.qtable)
 
     def bulk_flush(self, curs, op_map):
-        self.log.debug("bulk_flush: %s (I/U/D = %d/%d/%d)" % (
-            self.table, len(op_map['I']), len(op_map['U']), len(op_map['D'])))
+        self.log.debug("bulk_flush: %s (I/U/D = %d/%d/%d)", self.table,
+                len(op_map['I']), len(op_map['U']), len(op_map['D']))
         # fetch distribution fields
         if self.dist_fields is None:
             self.dist_fields = self.find_dist_fields(curs)
-        self.log.debug("Key fields: %s Dist fields: %s" % (
-            ",".join(self.pkeys), ",".join(self.dist_fields)))
+        self.log.debug("Key fields: %s Dist fields: %s",
+                ",".join(self.pkeys), ",".join(self.dist_fields))
         # add them to key
         for key in self.dist_fields:
             if key not in self.keys:
@@ -505,7 +504,7 @@ class BulkLoader(BaseBulkTempLoader):
         """
         if USE_LONGLIVED_TEMP_TABLES or USE_REAL_TABLE:
             if self.temp_present:
-                self.log.debug("bulk: Using existing temp table %s" % self.temp)
+                self.log.debug("bulk: Using existing temp table %s", self.temp)
                 return False
         self.create(curs)
         self.temp_present = True
@@ -525,7 +524,7 @@ class BulkLoader(BaseBulkTempLoader):
         # truncate when re-using existing table
         if not self.create_temp(curs):
             self.truncate(curs)
-        self.log.debug("bulk: COPY %d rows into %s" % (len(data), table))
+        self.log.debug("bulk: COPY %d rows into %s", len(data), table)
         skytools.magic_insert(curs, table, data, self.fields,
                               quoted_table = True)
         if _use_temp and self.run_analyze:
@@ -634,8 +633,7 @@ class Dispatcher(BaseHandler):
         BaseHandler.__init__(self, table_name, args, dest_table)
         # show args
-        self.log.debug("dispatch.init: table_name=%r, args=%r" % \
-                (table_name, args))
+        self.log.debug("dispatch.init: table_name=%r, args=%r", table_name, args)
         self.batch_info = None
         self.dst_curs = None
         self.pkeys = None
@@ -784,8 +782,7 @@ class Dispatcher(BaseHandler):
         # process only operations specified
         if not op in self.conf.event_types:
             return
-        self.log.debug('dispatch.process_event: %s/%s' % (
-            ev.ev_type, ev.ev_data))
+        self.log.debug('dispatch.process_event: %s/%s', ev.ev_type, ev.ev_data)
         if self.pkeys is None:
             self.pkeys = self.filter_pkeys(pkeys.split(','))
         data = self.filter_data(data)
@@ -886,7 +883,7 @@ class Dispatcher(BaseHandler):
             have_func = skytools.exists_function(curs, PART_FUNC_OLD, len(PART_FUNC_ARGS))
             if have_func:
-                self.log.debug('check_part.exec: func: %s, args: %s' % (pfcall, vals))
+                self.log.debug('check_part.exec: func: %s, args: %s', pfcall, vals)
                 curs.execute(pfcall, vals)
             else:
                 #
@@ -896,12 +893,12 @@ class Dispatcher(BaseHandler):
                 #  - check constraints
                 #  - inheritance
                 #
-                self.log.debug('part func %s not found, cloning table' % self.conf.part_func)
+                self.log.debug('part func %s not found, cloning table', self.conf.part_func)
                 struct = TableStruct(curs, self.dest_table)
                 struct.create(curs, T_ALL, dst)
             exec_with_vals(self.conf.post_part)
-            self.log.info("Created table: %s" % dst)
+            self.log.info("Created table: %s", dst)
 
         if self.conf.retention_period:
             self.drop_obsolete_partitions (self.dest_table, self.conf.retention_period, self.conf.period)
@@ -913,8 +910,8 @@ class Dispatcher(BaseHandler):
         func = RETENTION_FUNC
         args = [parent_table, retention_period, partition_period]
         sql = "select " + func + " (%s, %s, %s)"
-        self.log.debug("func: %s, args: %s" % (func, args))
-        curs.execute (sql, args)
+        self.log.debug("func: %s, args: %s", func, args)
+        curs.execute(sql, args)
         res = []
         for row in curs.fetchall():
             res.append(row[0])
diff --git a/python/londiste/handlers/part.py b/python/londiste/handlers/part.py
index bdabb3e6..247256e4 100644
--- a/python/londiste/handlers/part.py
+++ b/python/londiste/handlers/part.py
@@ -72,8 +72,8 @@ class PartHandler(TableHandler):
         """Filter event by hash in extra3, apply only local part."""
         if ev.extra3:
             meta = skytools.db_urldecode(ev.extra3)
-            self.log.debug('part.process_event: hash=%d, max_part=%s, local_part=%d' %\
-                           (int(meta['hash']), self.max_part, self.local_part))
+            self.log.debug('part.process_event: hash=%d, max_part=%s, local_part=%d',
+                           int(meta['hash']), self.max_part, self.local_part)
             if (int(meta['hash']) & self.max_part) != self.local_part:
                 self.log.debug('part.process_event: not my event')
                 return
@@ -84,7 +84,7 @@ class PartHandler(TableHandler):
         """Prepare the where condition for copy and replay filtering"""
         self.load_part_info(dst_curs)
         w = "(%s & %d) = %d" % (self.hashexpr, self.max_part, self.local_part)
-        self.log.debug('part: copy_condition=%s' % w)
+        self.log.debug('part: copy_condition=%s', w)
         return w
 
     def load_part_info(self, curs):
diff --git a/python/londiste/playback.py b/python/londiste/playback.py
index 4129cbe1..4f509dcf 100644
--- a/python/londiste/playback.py
+++ b/python/londiste/playback.py
@@ -103,7 +103,7 @@ class TableState(object):
         """Set snapshot."""
         if self.str_snapshot == str_snapshot:
             return
-        self.log.debug("%s: change_snapshot to %s" % (self.name, str_snapshot))
+        self.log.debug("%s: change_snapshot to %s", self.name, str_snapshot)
         self.str_snapshot = str_snapshot
         if str_snapshot:
             self.from_snapshot = skytools.Snapshot(str_snapshot)
@@ -122,8 +122,7 @@ class TableState(object):
         self.state = state
         self.sync_tick_id = tick_id
         self.changed = 1
-        self.log.debug("%s: change_state to %s" % (self.name,
-                                                   self.render_state()))
+        self.log.debug("%s: change_state to %s", self.name, self.render_state())
 
     def render_state(self):
         """Make a string to be stored in db."""
@@ -172,8 +171,8 @@ class TableState(object):
     def loaded_state(self, row):
         """Update object with info from db."""
-        self.log.debug("loaded_state: %s: %s / %s" % (
-                       self.name, row['merge_state'], row['custom_snapshot']))
+        self.log.debug("loaded_state: %s: %s / %s",
+                       self.name, row['merge_state'], row['custom_snapshot'])
         self.change_snapshot(row['custom_snapshot'], 0)
         self.state = self.parse_state(row['merge_state'])
         self.changed = 0
@@ -290,7 +289,7 @@ class Replicator(CascadedWorker):
     # compare: sql to use
     #compare_sql = select count(1) as cnt, sum(hashtext(t.*::text)) as chksum from only _TABLE_ t
     # workaround for hashtext change between 8.3 and 8.4
-    #compare_sql = select count(1) as cnt, sum(('x'||substr(md5(t.*::text),1,16))::bit(64)::bigint) as chksum from only _TABLE_ t
+    #compare_sql = select count(1) as cnt, sum(('x'||substr(md5(t.*::text),1,16))::bit(64)::bigint) as chksum from only _TABLE_ t
     #compare_fmt = %(cnt)d rows, checksum=%(chksum)s
     """
@@ -485,11 +484,11 @@ class Replicator(CascadedWorker):
             else:
                 # regular provider is used
                 if t.name not in pmap:
-                    self.log.warning("Table %s not available on provider" % t.name)
+                    self.log.warning("Table %s not available on provider", t.name)
                     continue
                 pt = pmap[t.name]
                 if pt.state != TABLE_OK: # or pt.custom_snapshot: # FIXME: does snapsnot matter?
-                    self.log.info("Table %s not OK on provider, waiting" % t.name)
+                    self.log.info("Table %s not OK on provider, waiting", t.name)
                     continue
 
             # dont allow more copies than configured
@@ -519,7 +518,7 @@ class Replicator(CascadedWorker):
 
         # somebody may have done remove-table in the meantime
         if self.copy_table_name not in self.table_map:
-            self.log.error("copy_sync: lost table: %s" % self.copy_table_name)
+            self.log.error("copy_sync: lost table: %s", self.copy_table_name)
             return SYNC_EXIT
 
         # This operates on single table
@@ -537,8 +536,8 @@ class Replicator(CascadedWorker):
             elif self.cur_tick < t.sync_tick_id:
                 return SYNC_OK
             else:
-                self.log.error("copy_sync: cur_tick=%d sync_tick=%d" % (
-                                self.cur_tick, t.sync_tick_id))
+                self.log.error("copy_sync: cur_tick=%d sync_tick=%d",
+                                self.cur_tick, t.sync_tick_id)
                 raise Exception('Invalid table state')
         elif t.state == TABLE_WANNA_SYNC:
             # wait for main thread to react
@@ -597,7 +596,7 @@ class Replicator(CascadedWorker):
     def process_remote_event(self, src_curs, dst_curs, ev):
         """handle one event"""
-        self.log.debug("New event: id=%s / type=%s / data=%s / extra1=%s" % (ev.id, ev.type, ev.data, ev.extra1))
+        self.log.debug("New event: id=%s / type=%s / data=%s / extra1=%s", ev.id, ev.type, ev.data, ev.extra1)
 
         # set current_event only if processing them one-by-one
         if self.work_state < 0:
@@ -824,8 +823,8 @@ class Replicator(CascadedWorker):
             if not t.changed:
                 continue
             merge_state = t.render_state()
-            self.log.info("storing state of %s: copy:%d new_state:%s" % (
-                          t.name, self.copy_thread, merge_state))
+            self.log.info("storing state of %s: copy:%d new_state:%s",
+                          t.name, self.copy_thread, merge_state)
             q = "select londiste.local_set_table_state(%s, %s, %s, %s)"
             curs.execute(q, [self.set_name, t.name, t.str_snapshot, merge_state])
@@ -838,8 +837,8 @@ class Replicator(CascadedWorker):
         self.save_table_state(dst_db.cursor())
         dst_db.commit()
 
-        self.log.info("Table %s status changed to '%s'" % (
-                      tbl.name, tbl.render_state()))
+        self.log.info("Table %s status changed to '%s'",
+                      tbl.name, tbl.render_state())
 
     def get_tables_in_state(self, state):
         "get all tables with specific state"
@@ -878,11 +877,11 @@ class Replicator(CascadedWorker):
             time.sleep(2)
 
         # launch and wait for daemonization result
-        self.log.debug("Launch args: "+repr(cmd))
+        self.log.debug("Launch args: %r", cmd)
         res = os.spawnvp(os.P_WAIT, script, cmd)
-        self.log.debug("Launch result: "+repr(res))
+        self.log.debug("Launch result: %r", res)
         if res != 0:
-            self.log.error("Failed to launch copy process, result=%d" % res)
+            self.log.error("Failed to launch copy process, result=%d", res)
 
     def sync_database_encodings(self, src_db, dst_db):
         """Make sure client_encoding is same on both side."""
@@ -979,4 +978,3 @@ class Replicator(CascadedWorker):
 if __name__ == '__main__':
     script = Replicator(sys.argv[1:])
     script.start()
-
diff --git a/python/londiste/repair.py b/python/londiste/repair.py
index d33e6d62..46ad067b 100644
--- a/python/londiste/repair.py
+++ b/python/londiste/repair.py
@@ -49,7 +49,7 @@ class Repairer(Syncer):
         src_curs = src_db.cursor()
         dst_curs = dst_db.cursor()
 
-        self.log.info('Checking %s' % dst_tbl)
+        self.log.info('Checking %s', dst_tbl)
 
         self.common_fields = []
         self.fq_common_fields = []
@@ -62,16 +62,16 @@ class Repairer(Syncer):
             dst_where = t2.plugin.get_copy_condition(src_curs, dst_curs)
             src_where = dst_where
 
-        self.log.info("Dumping src table: %s" % src_tbl)
+        self.log.info("Dumping src table: %s", src_tbl)
         self.dump_table(src_tbl, src_curs, dump_src, src_where)
         src_db.commit()
-        self.log.info("Dumping dst table: %s" % dst_tbl)
+        self.log.info("Dumping dst table: %s", dst_tbl)
         self.dump_table(dst_tbl, dst_curs, dump_dst, dst_where)
         dst_db.commit()
-
-        self.log.info("Sorting src table: %s" % dump_src)
+
+        self.log.info("Sorting src table: %s", dump_src)
         self.do_sort(dump_src, dump_src + '.sorted')
-        self.log.info("Sorting dst table: %s" % dump_dst)
+        self.log.info("Sorting dst table: %s", dump_dst)
         self.do_sort(dump_dst, dump_dst + '.sorted')
 
         self.dump_compare(dst_tbl, dump_src + ".sorted", dump_dst + ".sorted")
@@ -127,7 +127,7 @@ class Repairer(Syncer):
         self.fq_common_fields = fqlist
 
         cols = ",".join(fqlist)
-        self.log.debug("using columns: %s" % cols)
+        self.log.debug("using columns: %s", cols)
 
     def dump_table(self, tbl, curs, fn, whr):
         """Dump table to disk."""
@@ -135,12 +135,12 @@ class Repairer(Syncer):
         if len(whr) == 0:
             whr = 'true'
         q = "copy (SELECT %s FROM %s WHERE %s) to stdout" % (cols, skytools.quote_fqident(tbl), whr)
-        self.log.debug("Query: %s" % q)
+        self.log.debug("Query: %s", q)
         f = open(fn, "w", 64*1024)
         curs.copy_expert(q, f)
         size = f.tell()
         f.close()
-        self.log.info('%s: Got %d bytes' % (tbl, size))
+        self.log.info('%s: Got %d bytes', tbl, size)
 
     def get_row(self, ln):
         """Parse a row into dict."""
@@ -154,7 +154,7 @@ class Repairer(Syncer):
     def dump_compare(self, tbl, src_fn, dst_fn):
         """Dump + compare single table."""
-        self.log.info("Comparing dumps: %s" % tbl)
+        self.log.info("Comparing dumps: %s", tbl)
         self.cnt_insert = 0
         self.cnt_update = 0
         self.cnt_delete = 0
@@ -197,10 +197,10 @@ class Repairer(Syncer):
                 dst_ln = f2.readline()
                 if dst_ln:
                     self.total_dst += 1
-        self.log.info("finished %s: src: %d rows, dst: %d rows,"\
-                " missed: %d inserts, %d updates, %d deletes" % (
+        self.log.info("finished %s: src: %d rows, dst: %d rows,"
+                " missed: %d inserts, %d updates, %d deletes",
                 tbl, self.total_src, self.total_dst,
-                self.cnt_insert, self.cnt_update, self.cnt_delete))
+                self.cnt_insert, self.cnt_update, self.cnt_delete)
 
     def got_missed_insert(self, tbl, src_row):
         """Create sql for missed insert."""
@@ -248,7 +248,7 @@ class Repairer(Syncer):
     def show_fix(self, tbl, q, desc):
         """Print/write/apply repair sql."""
-        self.log.info("missed %s: %s" % (desc, q))
+        self.log.info("missed %s: %s", desc, q)
         if self.apply_curs:
             self.apply_curs.execute(q)
         else:
@@ -300,7 +300,7 @@ class Repairer(Syncer):
     def cmp_keys(self, src_row, dst_row):
         """Compare primary keys of the rows.
-
+
         Returns 1 if src > dst, -1 if src < dst and 0 if src == dst"""
         # None means table is done. tag it larger than any existing row.
@@ -319,4 +319,3 @@ class Repairer(Syncer):
         elif v1 > v2:
             return 1
         return 0
-
diff --git a/python/londiste/setup.py b/python/londiste/setup.py
index 0cc4401e..c54e2644 100644
--- a/python/londiste/setup.py
+++ b/python/londiste/setup.py
@@ -140,7 +140,7 @@ class LondisteSetup(CascadeAdmin):
         for tbl in args:
             tbl = skytools.fq_name(tbl)
             if (tbl in src_tbls) and not src_tbls[tbl]['local']:
-                self.log.error("Table %s does not exist on provider, need to switch to different provider" % tbl)
+                self.log.error("Table %s does not exist on provider, need to switch to different provider", tbl)
                 problems = True
         if problems:
             self.log.error("Problems, canceling operation")
@@ -189,12 +189,12 @@ class LondisteSetup(CascadeAdmin):
         if create_flags:
             if tbl_exists:
-                self.log.info('Table %s already exist, not touching' % desc)
+                self.log.info('Table %s already exist, not touching', desc)
             else:
                 src_dest_table = src_tbls[tbl]['dest_table']
                 if not skytools.exists_table(src_curs, src_dest_table):
                     # table not present on provider - nowhere to get the DDL from
-                    self.log.warning('Table %s missing on provider, cannot create, skipping' % desc)
+                    self.log.warning('Table %s missing on provider, cannot create, skipping', desc)
                     return
                 schema = skytools.fq_name_parts(dest_table)[0]
                 if not skytools.exists_schema(dst_curs, schema):
@@ -276,7 +276,7 @@ class LondisteSetup(CascadeAdmin):
         for tbl in src_tbls.keys():
             q = "select * from londiste.global_add_table(%s, %s)"
             if tbl not in dst_tbls:
-                self.log.info("Table %s info missing from subscriber, adding" % tbl)
+                self.log.info("Table %s info missing from subscriber, adding", tbl)
                 self.exec_cmd(dst_curs, q, [self.set_name, tbl])
                 dst_tbls[tbl] = {'local': False, 'dest_table': tbl}
         for tbl in dst_tbls.keys():
@@ -317,7 +317,7 @@ class LondisteSetup(CascadeAdmin):
             " where table_name = %s and local"
         curs.execute(q, [self.set_name, tbl])
         if curs.rowcount == 0:
-            self.log.error("Table %s not found on this node" % tbl)
+            self.log.error("Table %s not found on this node", tbl)
             sys.exit(1)
         attrs, dest_table = curs.fetchone()
@@ -382,17 +382,17 @@ class LondisteSetup(CascadeAdmin):
         seq_exists = skytools.exists_sequence(dst_curs, seq)
         if create_flags:
             if seq_exists:
-                self.log.info('Sequence %s already exist, not creating' % seq)
+                self.log.info('Sequence %s already exist, not creating', seq)
             else:
                 if not skytools.exists_sequence(src_curs, seq):
                     # sequence not present on provider - nowhere to get the DDL from
-                    self.log.warning('Sequence "%s" missing on provider, skipping' % seq)
+                    self.log.warning('Sequence "%s" missing on provider, skipping', seq)
                     return
                 s = skytools.SeqStruct(src_curs, seq)
                 src_db.commit()
                 s.create(dst_curs, create_flags, log = self.log)
         elif not seq_exists:
-            self.log.warning('Sequence "%s" missing on subscriber, use --create if necessary' % seq)
+            self.log.warning('Sequence "%s" missing on subscriber, use --create if necessary', seq)
             return
 
         q = "select * from londiste.local_add_seq(%s, %s)"
@@ -410,7 +410,7 @@ class LondisteSetup(CascadeAdmin):
         for seq in src_seqs.keys():
             q = "select * from londiste.global_update_seq(%s, %s, %s)"
             if seq not in dst_seqs:
-                self.log.info("Sequence %s info missing from subscriber, adding" % seq)
+                self.log.info("Sequence %s info missing from subscriber, adding", seq)
                 self.exec_cmd(dst_curs, q, [self.set_name, seq, src_seqs[seq]['last_value']])
                 tmp = src_seqs[seq].copy()
                 tmp['local'] = False
@@ -504,7 +504,7 @@ class LondisteSetup(CascadeAdmin):
             res = self.exec_cmd(db, q, [self.queue_name, fname, sql, attrs.to_urlenc()], commit = False)
             ret = res[0]['ret_code']
             if ret >= 300:
-                self.log.warning("Skipping execution of '%s'" % fname)
+                self.log.warning("Skipping execution of '%s'", fname)
                 continue
             if attrs.need_execute(curs, local_tables, local_seqs):
                 self.log.info("%s: executing sql", fname)
@@ -605,16 +605,16 @@ class LondisteSetup(CascadeAdmin):
                 res_list.append(a)
                 res_map[a] = 1
             elif a in reverse_map:
-                self.log.info("%s already processed" % a)
+                self.log.info("%s already processed", a)
             elif allow_nonexist:
                 res_list.append(a)
                 res_map[a] = 1
             elif self.options.force:
-                self.log.warning("%s not available, but --force is used" % a)
+                self.log.warning("%s not available, but --force is used", a)
                 res_list.append(a)
                 res_map[a] = 1
             else:
-                self.log.warning("%s not available" % a)
+                self.log.warning("%s not available", a)
                 err = 1
         if err:
             raise skytools.UsageError("Cannot proceed")
diff --git a/python/londiste/syncer.py b/python/londiste/syncer.py
index befeaded..1713f6e9 100644
--- a/python/londiste/syncer.py
+++ b/python/londiste/syncer.py
@@ -145,11 +145,11 @@ class Syncer(skytools.DBScript):
         for tbl in tlist:
             tbl = skytools.fq_name(tbl)
             if not tbl in dst_tables:
-                self.log.warning('Table not subscribed: %s' % tbl)
+                self.log.warning('Table not subscribed: %s', tbl)
                 continue
             t2 = dst_tables[tbl]
             if t2.merge_state != 'ok':
-                self.log.warning('Table %s not synced yet, no point' % tbl)
+                self.log.warning('Table %s not synced yet, no point', tbl)
                 continue
             pnode, ploc, wname = find_copy_source(self, self.queue_name, tbl, pnode, ploc)
@@ -179,12 +179,12 @@ class Syncer(skytools.DBScript):
         src_tables, ignore = self.get_tables(src_db)
 
         if not tbl in src_tables:
-            self.log.warning('Table not available on provider: %s' % tbl)
+            self.log.warning('Table not available on provider: %s', tbl)
             return
 
         t1 = src_tables[tbl]
         if t1.merge_state != 'ok':
-            self.log.warning('Table %s not ready yet on provider' % tbl)
+            self.log.warning('Table %s not ready yet on provider', tbl)
             return
 
         #self.check_consumer(setup_db, dst_db)
@@ -231,10 +231,10 @@ class Syncer(skytools.DBScript):
         dst_curs = dst_db.cursor()
 
         if not skytools.exists_table(src_curs, src_tbl):
-            self.log.warning("Table %s does not exist on provider side" % src_tbl)
+            self.log.warning("Table %s does not exist on provider side", src_tbl)
             return
         if not skytools.exists_table(dst_curs, dst_tbl):
-            self.log.warning("Table %s does not exist on subscriber side" % dst_tbl)
+            self.log.warning("Table %s does not exist on subscriber side", dst_tbl)
             return
 
         # lock table against changes
@@ -273,14 +273,14 @@ class Syncer(skytools.DBScript):
         lock_curs = lock_db.cursor()
 
         # lock table in separate connection
-        self.log.info('Locking %s' % src_tbl)
+        self.log.info('Locking %s', src_tbl)
         lock_db.commit()
         self.set_lock_timeout(lock_curs)
         lock_time = time.time()
         lock_curs.execute("LOCK TABLE %s IN SHARE MODE" % skytools.quote_fqident(src_tbl))
 
         # now wait until consumer has updated target table until locking
-        self.log.info('Syncing %s' % dst_tbl)
+        self.log.info('Syncing %s', dst_tbl)
 
         # consumer must get futher than this tick
         tick_id = self.force_tick(setup_curs)
@@ -313,7 +313,7 @@ class Syncer(skytools.DBScript):
             self.old_worker_paused = self.pause_consumer(setup_curs, self.provider_info['worker_name'])
 
         lock_curs = lock_db.cursor()
-        self.log.info('Syncing %s' % dst_tbl)
+        self.log.info('Syncing %s', dst_tbl)
 
         # consumer must get futher than this tick
         tick_id = self.force_tick(setup_curs, False)
@@ -375,4 +375,3 @@ class Syncer(skytools.DBScript):
                 break
             time.sleep(0.5)
         return oldflag
-
diff --git a/python/londiste/table_copy.py b/python/londiste/table_copy.py
index f325a7c2..8e026235 100644
--- a/python/londiste/table_copy.py
+++ b/python/londiste/table_copy.py
@@ -64,7 +64,7 @@ class CopyTable(Replicator):
                 if tbl_stat.copy_role == 'wait-copy':
                     self.log.info('waiting for first partition to initialize copy')
                 elif tbl_stat.max_parallel_copies_reached():
-                    self.log.info('number of max parallel copies (%s) reached' %\
+                    self.log.info('number of max parallel copies (%s) reached',
                                   tbl_stat.max_parallel_copy)
                 else:
                     break
@@ -81,7 +81,7 @@ class CopyTable(Replicator):
                 if pt.state == TABLE_OK:
                     break
 
-                self.log.warning("table %s not in sync yet on provider, waiting" % tbl_stat.name)
+                self.log.warning("table %s not in sync yet on provider, waiting", tbl_stat.name)
                 time.sleep(10)
 
             src_real_table = pt.dest_table
@@ -102,7 +102,7 @@ class CopyTable(Replicator):
 
         self.sync_database_encodings(src_db, dst_db)
 
-        self.log.info("Starting full copy of %s" % tbl_stat.name)
+        self.log.info("Starting full copy of %s", tbl_stat.name)
 
         # just in case, drop all fkeys (in case "replay" was skipped)
         # !! this may commit, so must be done before anything else !!
@@ -124,14 +124,14 @@ class CopyTable(Replicator):
         common_cols = []
         for c in slist:
             if c not in dlist:
-                self.log.warning("Table %s column %s does not exist on subscriber"
-                                 % (tbl_stat.name, c))
+                self.log.warning("Table %s column %s does not exist on subscriber",
+                                 tbl_stat.name, c)
             else:
                 common_cols.append(c)
         for c in dlist:
             if c not in slist:
-                self.log.warning("Table %s column %s does not exist on provider"
-                                 % (tbl_stat.name, c))
+                self.log.warning("Table %s column %s does not exist on provider",
+                                 tbl_stat.name, c)
 
         # drop unnecessary stuff
         if cmode > 0:
@@ -140,9 +140,9 @@ class CopyTable(Replicator):
 
         # drop data
         if tbl_stat.table_attrs.get('skip_truncate'):
-            self.log.info("%s: skipping truncate" % tbl_stat.name)
+            self.log.info("%s: skipping truncate", tbl_stat.name)
         else:
-            self.log.info("%s: truncating" % tbl_stat.name)
+            self.log.info("%s: truncating", tbl_stat.name)
             q = "truncate "
             if dst_db.server_version >= 80400:
                 q += "only "
@@ -160,12 +160,12 @@ class CopyTable(Replicator):
             tbl_stat.dropped_ddl = ddl
 
         # do truncate & copy
-        self.log.info("%s: start copy" % tbl_stat.name)
+        self.log.info("%s: start copy", tbl_stat.name)
         p = tbl_stat.get_plugin()
         stats = p.real_copy(src_real_table, src_curs, dst_curs, common_cols)
         if stats:
-            self.log.info("%s: copy finished: %d bytes, %d rows" % (
-                          tbl_stat.name, stats[0], stats[1]))
+            self.log.info("%s: copy finished: %d bytes, %d rows",
+                          tbl_stat.name, stats[0], stats[1])
 
         # get snapshot
         src_curs.execute("select txid_current_snapshot()")
@@ -269,4 +269,3 @@ class CopyTable(Replicator):
 if __name__ == '__main__':
     script = CopyTable(sys.argv[1:])
     script.start()
-
```
