diff options
Diffstat (limited to 'python')
-rwxr-xr-x | python/londiste.py | 2 | ||||
-rw-r--r-- | python/londiste/compare.py | 12 | ||||
-rw-r--r-- | python/londiste/handler.py | 16 | ||||
-rw-r--r-- | python/londiste/handlers/dispatch.py | 4 | ||||
-rw-r--r-- | python/londiste/handlers/part.py | 9 | ||||
-rw-r--r-- | python/londiste/handlers/qtable.py | 2 | ||||
-rw-r--r-- | python/londiste/repair.py | 19 | ||||
-rw-r--r-- | python/londiste/setup.py | 4 | ||||
-rw-r--r-- | python/londiste/syncer.py | 11 | ||||
-rw-r--r-- | python/londiste/table_copy.py | 23 |
10 files changed, 52 insertions, 50 deletions
diff --git a/python/londiste.py b/python/londiste.py index 5d46bbdc..b6c59e15 100755 --- a/python/londiste.py +++ b/python/londiste.py @@ -138,8 +138,6 @@ class Londiste(skytools.DBScript): help = "add: walk upstream to find node to copy from") g.add_option("--copy-node", dest="copy_node", help = "add: use NODE as source for initial COPY") - g.add_option("--copy-condition", dest="copy_condition", - help = "add: set WHERE expression for copy") g.add_option("--merge-all", action="store_true", help="merge tables from all source queues", default=False) g.add_option("--no-merge", action="store_true", diff --git a/python/londiste/compare.py b/python/londiste/compare.py index b08a04e4..1412336c 100644 --- a/python/londiste/compare.py +++ b/python/londiste/compare.py @@ -15,12 +15,18 @@ class Comparator(Syncer): """Simple checker based in Syncer. When tables are in sync runs simple SQL query on them. """ - def process_sync(self, src_tbl, dst_tbl, src_db, dst_db): + def process_sync(self, t1, t2, src_db, dst_db): """Actual comparision.""" + src_tbl = t1.dest_table + dst_tbl = t2.dest_table + src_curs = src_db.cursor() dst_curs = dst_db.cursor() + src_where = t1.plugin.get_copy_condition(src_curs, dst_curs) + dst_where = t2.plugin.get_copy_condition(src_curs, dst_curs) + self.log.info('Counting %s' % dst_tbl) # get common cols @@ -37,7 +43,11 @@ class Comparator(Syncer): q = self.cf.get('compare_sql', q) q = q.replace("_COLS_", cols) src_q = q.replace('_TABLE_', skytools.quote_fqident(src_tbl)) + if src_where: + src_q = src_q + " WHERE " + src_where dst_q = q.replace('_TABLE_', skytools.quote_fqident(dst_tbl)) + if dst_where: + dst_q = dst_q + " WHERE " + dst_where f = "%(cnt)d rows, checksum=%(chksum)s" f = self.cf.get('compare_fmt', f) diff --git a/python/londiste/handler.py b/python/londiste/handler.py index 87a16b62..ad4239ff 100644 --- a/python/londiste/handler.py +++ b/python/londiste/handler.py @@ -106,11 +106,15 @@ class BaseHandler: """Called when batch finishes.""" pass - def real_copy(self, src_tablename, src_curs, dst_curs, column_list, cond_list): + def get_copy_condition(self, src_curs, dst_curs): + """ Use if you want to filter data """ + return '' + + def real_copy(self, src_tablename, src_curs, dst_curs, column_list): """do actual table copy and return tuple with number of bytes and rows copyed """ - condition = ' and '.join(cond_list) + condition = self.get_copy_condition(src_curs, dst_curs) return skytools.full_copy(src_tablename, src_curs, dst_curs, column_list, condition, dst_tablename = self.dest_table) @@ -184,19 +188,17 @@ class TableHandler(BaseHandler): return self.enc.validate_dict(row, self.table_name) return row - def real_copy(self, src_tablename, src_curs, dst_curs, column_list, cond_list): + def real_copy(self, src_tablename, src_curs, dst_curs, column_list): """do actual table copy and return tuple with number of bytes and rows copyed """ - - condition = ' and '.join(cond_list) - + if self.enc: def _write_hook(obj, data): return self.enc.validate_copy(data, column_list, src_tablename) else: _write_hook = None - + condition = self.get_copy_condition(src_curs, dst_curs) return skytools.full_copy(src_tablename, src_curs, dst_curs, column_list, condition, dst_tablename = self.dest_table, diff --git a/python/londiste/handlers/dispatch.py b/python/londiste/handlers/dispatch.py index dcfede52..2917472b 100644 --- a/python/londiste/handlers/dispatch.py +++ b/python/londiste/handlers/dispatch.py @@ -866,12 +866,12 @@ class Dispatcher(BaseHandler): exec_with_vals(self.conf.post_part) self.log.info("Created table: %s" % dst) - def real_copy(self, tablename, src_curs, dst_curs, column_list, cond_list): + def real_copy(self, tablename, src_curs, dst_curs, column_list): """do actual table copy and return tuple with number of bytes and rows copyed """ _src_cols = _dst_cols = column_list - condition = ' and '.join(cond_list) + condition = '' if self.conf.skip_fields: _src_cols = [col for col in column_list diff --git a/python/londiste/handlers/part.py b/python/londiste/handlers/part.py index 8f0eb378..5213f67d 100644 --- a/python/londiste/handlers/part.py +++ b/python/londiste/handlers/part.py @@ -81,15 +81,12 @@ class PartHandler(TableHandler): self.log.debug('part.process_event: my event, processing') TableHandler.process_event(self, ev, sql_queue_func, arg) - def real_copy(self, tablename, src_curs, dst_curs, column_list, cond_list): - """Copy only slots needed locally.""" + def get_copy_condition(self, src_curs, dst_curs): + """Prepare the where condition for copy and replay filtering""" self.load_part_info(dst_curs) w = "(%s & %d) = %d" % (self.hashexpr, self.max_part, self.local_part) self.log.debug('part: copy_condition=%s' % w) - cond_list.append(w) - - return TableHandler.real_copy(self, tablename, src_curs, dst_curs, - column_list, cond_list) + return w def load_part_info(self, curs): """Load slot info from database.""" diff --git a/python/londiste/handlers/qtable.py b/python/londiste/handlers/qtable.py index b93543e9..13ca4099 100644 --- a/python/londiste/handlers/qtable.py +++ b/python/londiste/handlers/qtable.py @@ -33,7 +33,7 @@ class QueueTableHandler(BaseHandler): trigger_arg_list.append('SKIP') trigger_arg_list.append('expect_sync') - def real_copy(self, tablename, src_curs, dst_curs, column_list, cond_list): + def real_copy(self, tablename, src_curs, dst_curs, column_list): """Force copy not to start""" return (0,0) diff --git a/python/londiste/repair.py b/python/londiste/repair.py index 02494b53..5a04d7a0 100644 --- a/python/londiste/repair.py +++ b/python/londiste/repair.py @@ -33,7 +33,7 @@ class Repairer(Syncer): p.add_option("--apply", action="store_true", help="apply fixes") return p - def process_sync(self, src_tbl, dst_tbl, src_db, dst_db): + def process_sync(self, t1, t2, src_db, dst_db): """Actual comparision.""" apply_db = None @@ -43,6 +43,9 @@ class Repairer(Syncer): self.apply_curs = apply_db.cursor() self.apply_curs.execute("set session_replication_role = 'replica'") + src_tbl = t1.dest_table + dst_tbl = t2.dest_table + src_curs = src_db.cursor() dst_curs = dst_db.cursor() @@ -57,10 +60,12 @@ class Repairer(Syncer): dump_dst = dst_tbl + ".dst" self.log.info("Dumping src table: %s" % src_tbl) - self.dump_table(src_tbl, src_curs, dump_src) + src_where = t1.plugin.get_copy_condition(src_curs, dst_curs) + self.dump_table(src_tbl, src_curs, dump_src, src_where) src_db.commit() self.log.info("Dumping dst table: %s" % dst_tbl) - self.dump_table(dst_tbl, dst_curs, dump_dst) + dst_where = t2.plugin.get_copy_condition(src_curs, dst_curs) + self.dump_table(dst_tbl, dst_curs, dump_dst, dst_where) dst_db.commit() self.log.info("Sorting src table: %s" % dump_src) @@ -123,11 +128,13 @@ class Repairer(Syncer): cols = ",".join(fqlist) self.log.debug("using columns: %s" % cols) - def dump_table(self, tbl, curs, fn): + def dump_table(self, tbl, curs, fn, whr): """Dump table to disk.""" cols = ','.join(self.fq_common_fields) - q = "copy %s (%s) to stdout" % (skytools.quote_fqident(tbl), cols) - + if len(whr) == 0: + whr = 'true' + q = "copy (SELECT %s FROM %s WHERE %s) to stdout" % (cols, skytools.quote_fqident(tbl), whr) + self.log.debug("Query: %s" % q) f = open(fn, "w", 64*1024) curs.copy_expert(q, f) size = f.tell() diff --git a/python/londiste/setup.py b/python/londiste/setup.py index b8ca4a0c..f06d9b0b 100644 --- a/python/londiste/setup.py +++ b/python/londiste/setup.py @@ -50,8 +50,6 @@ class LondisteSetup(CascadeAdmin): help = "add: find table source for copy by walking upwards") p.add_option("--copy-node", dest="copy_node", help = "add: use NODE as source for initial copy") - p.add_option("--copy-condition", dest="copy_condition", - help = "copy: where expression") p.add_option("--force", action="store_true", help="force", default=False) p.add_option("--all", action="store_true", @@ -247,8 +245,6 @@ class LondisteSetup(CascadeAdmin): if not self.options.expect_sync: if self.options.skip_truncate: attrs['skip_truncate'] = 1 - if self.options.copy_condition: - attrs['copy_condition'] = self.options.copy_condition if self.options.max_parallel_copy: attrs['max_parallel_copy'] = self.options.max_parallel_copy diff --git a/python/londiste/syncer.py b/python/londiste/syncer.py index 05df41ad..5eb5da73 100644 --- a/python/londiste/syncer.py +++ b/python/londiste/syncer.py @@ -155,7 +155,7 @@ class Syncer(skytools.DBScript): self.check_consumer(setup_db) - self.check_table(t1.dest_table, t2.dest_table, lock_db, src_db, dst_db, setup_db) + self.check_table(t1, t2, lock_db, src_db, dst_db, setup_db) lock_db.commit() src_db.commit() dst_db.commit() @@ -185,9 +185,12 @@ class Syncer(skytools.DBScript): if dur > 10 and not self.options.force: raise Exception("Ticker seems dead") - def check_table(self, src_tbl, dst_tbl, lock_db, src_db, dst_db, setup_db): + def check_table(self, t1, t2, lock_db, src_db, dst_db, setup_db): """Get transaction to same state, then process.""" + src_tbl = t1.dest_table + dst_tbl = t2.dest_table + lock_curs = lock_db.cursor() src_curs = src_db.cursor() dst_curs = dst_db.cursor() @@ -221,7 +224,7 @@ class Syncer(skytools.DBScript): self.unlock_table_branch(lock_db, setup_db) # do work - bad = self.process_sync(src_tbl, dst_tbl, src_db, dst_db) + bad = self.process_sync(t1, t2, src_db, dst_db) if bad: self.bad_tables += 1 @@ -320,7 +323,7 @@ class Syncer(skytools.DBScript): setup_curs = setup_db.cursor() self.resume_consumer(setup_curs, self.provider_node['worker_name']) - def process_sync(self, src_tbl, dst_tbl, src_db, dst_db): + def process_sync(self, t1, t2, src_db, dst_db): """It gets 2 connections in state where tbl should be in same state. """ raise Exception('process_sync not implemented') diff --git a/python/londiste/table_copy.py b/python/londiste/table_copy.py index 65a702fb..a2138d98 100644 --- a/python/londiste/table_copy.py +++ b/python/londiste/table_copy.py @@ -159,7 +159,12 @@ class CopyTable(Replicator): tbl_stat.dropped_ddl = ddl # do truncate & copy - self.real_copy(src_curs, dst_curs, tbl_stat, common_cols, src_real_table) + self.log.info("%s: start copy" % tbl_stat.name) + p = tbl_stat.get_plugin() + stats = p.real_copy(src_real_table, src_curs, dst_curs, common_cols) + if stats: + self.log.info("%s: copy finished: %d bytes, %d rows" % ( + tbl_stat.name, stats[0], stats[1])) # get snapshot src_curs.execute("select txid_current_snapshot()") @@ -215,22 +220,6 @@ class CopyTable(Replicator): src_curs.execute(q, [self.queue_name]) src_db.commit() - def real_copy(self, srccurs, dstcurs, tbl_stat, col_list, src_real_table): - "Actual copy." - - tablename = tbl_stat.name - # do copy - self.log.info("%s: start copy" % tablename) - p = tbl_stat.get_plugin() - cond_list = [] - cond = tbl_stat.table_attrs.get('copy_condition') - if cond: - cond_list.append(cond) - stats = p.real_copy(src_real_table, srccurs, dstcurs, col_list, cond_list) - if stats: - self.log.info("%s: copy finished: %d bytes, %d rows" % ( - tablename, stats[0], stats[1])) - def work(self): if not self.reg_ok: # check if needed? (table, not existing reg) |