diff options
Diffstat (limited to 'python')
-rwxr-xr-x | python/londiste.py | 2 | ||||
-rw-r--r-- | python/londiste/compare.py | 19 | ||||
-rw-r--r-- | python/londiste/handler.py | 30 | ||||
-rw-r--r-- | python/londiste/handlers/__init__.py | 5 | ||||
-rw-r--r-- | python/londiste/handlers/bulk.py | 18 | ||||
-rw-r--r-- | python/londiste/handlers/dispatch.py | 25 | ||||
-rw-r--r-- | python/londiste/handlers/multimaster.py | 4 | ||||
-rw-r--r-- | python/londiste/handlers/part.py | 4 | ||||
-rw-r--r-- | python/londiste/handlers/qtable.py | 4 | ||||
-rw-r--r-- | python/londiste/playback.py | 12 | ||||
-rw-r--r-- | python/londiste/repair.py | 53 | ||||
-rw-r--r-- | python/londiste/setup.py | 44 | ||||
-rw-r--r-- | python/londiste/syncer.py | 83 | ||||
-rw-r--r-- | python/londiste/table_copy.py | 18 |
14 files changed, 194 insertions, 127 deletions
diff --git a/python/londiste.py b/python/londiste.py index 542c9c5d..66b19648 100755 --- a/python/londiste.py +++ b/python/londiste.py @@ -108,6 +108,8 @@ class Londiste(skytools.DBScript): g = optparse.OptionGroup(p, "options for add") g.add_option("--all", action="store_true", help = "add: include add possible tables") + g.add_option("--dest-table", + help = "add: redirect changes to different table") g.add_option("--force", action="store_true", help = "add: ignore table differences, repair: ignore lag") g.add_option("--expect-sync", action="store_true", dest="expect_sync", diff --git a/python/londiste/compare.py b/python/londiste/compare.py index a52d24be..7274d180 100644 --- a/python/londiste/compare.py +++ b/python/londiste/compare.py @@ -15,35 +15,38 @@ class Comparator(Syncer): """Simple checker based in Syncer. When tables are in sync runs simple SQL query on them. """ - def process_sync(self, tbl, src_db, dst_db): + def process_sync(self, src_tbl, dst_tbl, src_db, dst_db): """Actual comparision.""" src_curs = src_db.cursor() dst_curs = dst_db.cursor() - self.log.info('Counting %s' % tbl) + self.log.info('Counting %s' % dst_tbl) q = "select count(1) as cnt, sum(hashtext(t.*::text)) as chksum from only _TABLE_ t" q = self.cf.get('compare_sql', q) - q = q.replace('_TABLE_', skytools.quote_fqident(tbl)) + src_q = q.replace('_TABLE_', skytools.quote_fqident(src_tbl)) + dst_q = q.replace('_TABLE_', skytools.quote_fqident(dst_tbl)) f = "%(cnt)d rows, checksum=%(chksum)s" f = self.cf.get('compare_fmt', f) - self.log.debug("srcdb: " + q) - src_curs.execute(q) + self.log.debug("srcdb: " + src_q) + src_curs.execute(src_q) src_row = src_curs.fetchone() src_str = f % src_row self.log.info("srcdb: %s" % src_str) + src_db.commit() - self.log.debug("dstdb: " + q) - dst_curs.execute(q) + self.log.debug("dstdb: " + dst_q) + dst_curs.execute(dst_q) dst_row = dst_curs.fetchone() dst_str = f % dst_row self.log.info("dstdb: %s" % dst_str) + dst_db.commit() if src_str != dst_str: - self.log.warning("%s: Results do not match!" % tbl) + self.log.warning("%s: Results do not match!" % dst_tbl) if __name__ == '__main__': script = Comparator(sys.argv[1:]) diff --git a/python/londiste/handler.py b/python/londiste/handler.py index aff0e4ce..2fe69f21 100644 --- a/python/londiste/handler.py +++ b/python/londiste/handler.py @@ -29,7 +29,10 @@ plain londiste: """ -import sys, skytools, londiste.handlers +import sys +import logging +import skytools +import londiste.handlers __all__ = ['RowCache', 'BaseHandler', 'build_handler', 'load_handler_modules', 'create_handler_string'] @@ -66,10 +69,14 @@ class BaseHandler: """Defines base API, does nothing. """ handler_name = 'nop' - def __init__(self, table_name, args, log): + log = logging.getLogger('basehandler') + + def __init__(self, table_name, args, dest_table): self.table_name = table_name + self.dest_table = dest_table or table_name + self.fq_table_name = skytools.quote_fqident(self.table_name) + self.fq_dest_table = skytools.quote_fqident(self.dest_table) self.args = args - self.log = log def add(self, trigger_arg_list): """Called when table is added. @@ -99,13 +106,14 @@ class BaseHandler: """Called when batch finishes.""" pass - def real_copy(self, tablename, src_curs, dst_curs, column_list, cond_list): + def real_copy(self, src_tablename, src_curs, dst_curs, column_list, cond_list): """do actual table copy and return tuple with number of bytes and rows copyed """ condition = ' and '.join(cond_list) - return skytools.full_copy(tablename, src_curs, dst_curs, column_list, - condition) + return skytools.full_copy(src_tablename, src_curs, dst_curs, + column_list, condition, + dst_tablename = self.dest_table) def needs_table(self): """Does the handler need the table to exist on destination.""" @@ -124,7 +132,7 @@ class TableHandler(BaseHandler): def process_event(self, ev, sql_queue_func, arg): if len(ev.type) == 1: # sql event - fqname = skytools.quote_fqident(ev.extra1) + fqname = self.fq_dest_table fmt = self.sql_command[ev.type] sql = fmt % (fqname, ev.data) else: @@ -132,7 +140,7 @@ class TableHandler(BaseHandler): pklist = ev.type[2:].split(',') row = skytools.db_urldecode(ev.data) op = ev.type[0] - tbl = ev.extra1 + tbl = self.dest_table if op == 'I': sql = skytools.mk_insert_sql(row, tbl, pklist) elif op == 'U': @@ -188,7 +196,7 @@ def _parse_handler(hstr): args = skytools.db_urldecode(astr) return (name, args) -def build_handler(tblname, hstr, log): +def build_handler(tblname, hstr, dest_table=None): """Parse and initialize handler. hstr is result of create_handler_string().""" @@ -196,7 +204,9 @@ def build_handler(tblname, hstr, log): # when no handler specified, use londiste hname = hname or 'londiste' klass = _handler_map[hname] - return klass(tblname, args, log) + if not dest_table: + dest_table = tblname + return klass(tblname, args, dest_table) def load_handler_modules(cf): """Load and register modules from config.""" diff --git a/python/londiste/handlers/__init__.py b/python/londiste/handlers/__init__.py index 764518ef..8467916a 100644 --- a/python/londiste/handlers/__init__.py +++ b/python/londiste/handlers/__init__.py @@ -19,8 +19,8 @@ def handler_args(name, cls): Define successor for handler class cls with func as argument generator """ def wrapper(func): - def _init_override(self, table_name, args, log): - cls.__init__(self, table_name, func(args.copy()), log) + def _init_override(self, table_name, args, dest_table): + cls.__init__(self, table_name, func(args.copy()), dest_table) dct = {'__init__': _init_override, 'handler_name': name} module = sys.modules[cls.__module__] newname = '%s_%s' % (cls.__name__, name.replace('.','_')) @@ -36,3 +36,4 @@ def update(*p): in reverse order """ return reduce(lambda x, y: x.update(y) or x, (p[i] for i in range(len(p)-1,-1,-1)), {}) + diff --git a/python/londiste/handlers/bulk.py b/python/londiste/handlers/bulk.py index 5b5dcc01..7bc3797b 100644 --- a/python/londiste/handlers/bulk.py +++ b/python/londiste/handlers/bulk.py @@ -57,10 +57,10 @@ class BulkLoader(BaseHandler): """ handler_name = 'bulk' fake_seq = 0 - def __init__(self, table_name, args, log): + def __init__(self, table_name, args, dest_table): """Init per-batch table data cache.""" - BaseHandler.__init__(self, table_name, args, log) + BaseHandler.__init__(self, table_name, args, dest_table) self.pkey_list = None self.dist_fields = None @@ -194,8 +194,8 @@ class BulkLoader(BaseHandler): # create temp table temp, qtemp = self.create_temp_table(curs) - tbl = self.table_name - qtbl = quote_fqident(self.table_name) + tbl = self.dest_table + qtbl = self.fq_dest_table # where expr must have pkey and dist fields klist = [] @@ -307,10 +307,10 @@ class BulkLoader(BaseHandler): def create_temp_table(self, curs): if USE_REAL_TABLE: - tempname = self.table_name + "_loadertmpx" + tempname = self.dest_table + "_loadertmpx" else: # create temp table for loading - tempname = self.table_name.replace('.', '_') + "_loadertmp" + tempname = self.dest_table.replace('.', '_') + "_loadertmp" # check if exists if USE_REAL_TABLE: @@ -321,7 +321,7 @@ class BulkLoader(BaseHandler): # create non-temp table q = "create table %s (like %s)" % ( quote_fqident(tempname), - quote_fqident(self.table_name)) + quote_fqident(self.dest_table)) self.log.debug("bulk: Creating real table: %s" % q) curs.execute(q) return tempname, quote_fqident(tempname) @@ -335,7 +335,7 @@ class BulkLoader(BaseHandler): arg = "on commit preserve rows" # create temp table for loading q = "create temp table %s (like %s) %s" % ( - quote_ident(tempname), quote_fqident(self.table_name), arg) + quote_ident(tempname), quote_fqident(self.dest_table), arg) self.log.debug("bulk: Creating temp table: %s" % q) curs.execute(q) return tempname, quote_ident(tempname) @@ -343,7 +343,7 @@ class BulkLoader(BaseHandler): def find_dist_fields(self, curs): if not skytools.exists_table(curs, "pg_catalog.gp_distribution_policy"): return [] - schema, name = skytools.fq_name_parts(self.table_name) + schema, name = skytools.fq_name_parts(self.dest_table) q = "select a.attname"\ " from pg_class t, pg_namespace n, pg_attribute a,"\ " gp_distribution_policy p"\ diff --git a/python/londiste/handlers/dispatch.py b/python/londiste/handlers/dispatch.py index 0c67061c..b9c3e3fb 100644 --- a/python/londiste/handlers/dispatch.py +++ b/python/londiste/handlers/dispatch.py @@ -663,14 +663,16 @@ class Dispatcher(BaseHandler): """ handler_name = 'dispatch' - def __init__(self, table_name, args, log): - BaseHandler.__init__(self, table_name, args, log) + def __init__(self, table_name, args, dest_table): + + # compat for dest-table + dest_table = args.get('table', dest_table) + + BaseHandler.__init__(self, table_name, args, dest_table) + # show args self.log.debug("dispatch.init: table_name=%r, args=%r" % \ (table_name, args)) - # get table name - self.table_name = args.get('table', self.table_name) - self.quoted_name = quote_fqident(self.table_name) self.batch_info = None self.dst_curs = None self.pkeys = None @@ -805,7 +807,7 @@ class Dispatcher(BaseHandler): if dst not in self.row_handler.table_map: self.check_part(dst, part_time) else: - dst = self.table_name + dst = self.dest_table if dst not in self.row_handler.table_map: self.row_handler.add_table(dst, LOADERS[self.conf.load_mode], @@ -841,7 +843,7 @@ class Dispatcher(BaseHandler): else: raise UsageError('Bad value for part_mode: %s' %\ self.conf.part_mode) - vals = {'parent': self.table_name, + vals = {'parent': self.dest_table, 'year': "%04d" % dtm.year, 'month': "%02d" % dtm.month, 'day': "%02d" % dtm.day, @@ -861,7 +863,7 @@ class Dispatcher(BaseHandler): dst = quote_fqident(dst) vals = {'dest': dst, 'part': dst, - 'parent': self.quoted_name, + 'parent': self.fq_dest_table, 'pkeys': ",".join(self.pkeys), # quoting? # we do this to make sure that constraints for # tables who contain a schema will still work @@ -887,7 +889,7 @@ class Dispatcher(BaseHandler): else: self.log.debug('part func %s not found, cloning table' %\ PART_FUNC) - struct = TableStruct(curs, self.table_name) + struct = TableStruct(curs, self.dest_table) struct.create(curs, T_ALL, dst) exec_with_vals(self.conf.post_part) self.log.info("Created table: %s" % dst) @@ -914,8 +916,9 @@ class Dispatcher(BaseHandler): else: _write_hook = None - return skytools.full_copy(tablename, src_curs, dst_curs, _src_cols, - condition, self.table_name, _dst_cols, + return skytools.full_copy(tablename, src_curs, dst_curs, _src_cols, condition, + dst_tablename = self.dest_table, + dst_column_list = _dst_cols, write_hook = _write_hook) diff --git a/python/londiste/handlers/multimaster.py b/python/londiste/handlers/multimaster.py index 494bd866..872b77e1 100644 --- a/python/londiste/handlers/multimaster.py +++ b/python/londiste/handlers/multimaster.py @@ -19,7 +19,7 @@ class MultimasterHandler(ApplyFuncHandler): """Handle multimaster replicas""" handler_name = 'multimaster' - def __init__(self, table_name, args, log): + def __init__(self, table_name, args, dest_table): """Init per-batch table data cache.""" conf = args.copy() # remove Multimaster args from conf @@ -28,7 +28,7 @@ class MultimasterHandler(ApplyFuncHandler): conf.pop(name) conf = skytools.db_urlencode(conf) args = update(args, {'func_name': 'merge_on_time', 'func_conf': conf}) - ApplyFuncHandler.__init__(self, table_name, args, log) + ApplyFuncHandler.__init__(self, table_name, args, dest_table) def add(self, trigger_arg_list): """Create SKIP and BEFORE INSERT trigger""" diff --git a/python/londiste/handlers/part.py b/python/londiste/handlers/part.py index fa5ccb77..6e644027 100644 --- a/python/londiste/handlers/part.py +++ b/python/londiste/handlers/part.py @@ -10,8 +10,8 @@ __all__ = ['PartHandler'] class PartHandler(TableHandler): handler_name = 'part' - def __init__(self, table_name, args, log): - TableHandler.__init__(self, table_name, args, log) + def __init__(self, table_name, args, dest_table): + TableHandler.__init__(self, table_name, args, dest_table) self.max_part = None # max part number self.local_part = None # part number of local node self.key = args.get('key') diff --git a/python/londiste/handlers/qtable.py b/python/londiste/handlers/qtable.py index b904e0e1..cd8cb03d 100644 --- a/python/londiste/handlers/qtable.py +++ b/python/londiste/handlers/qtable.py @@ -39,9 +39,9 @@ class QueueTableHandler(BaseHandler): class QueueSplitterHandler(BaseHandler): handler_name = 'qsplitter' - def __init__(self, table_name, args, log): + def __init__(self, table_name, args, dest_table): """Init per-batch table data cache.""" - BaseHandler.__init__(self, table_name, args, log) + BaseHandler.__init__(self, table_name, args, dest_table) try: self.dst_queue_name = args['queue'] except KeyError: diff --git a/python/londiste/playback.py b/python/londiste/playback.py index b7a3ac95..e9c20ea8 100644 --- a/python/londiste/playback.py +++ b/python/londiste/playback.py @@ -62,6 +62,7 @@ class TableState(object): def __init__(self, name, log): """Init TableState for one table.""" self.name = name + self.dest_table = name self.log = log # same as forget: self.state = TABLE_MISSING @@ -188,9 +189,14 @@ class TableState(object): self.max_parallel_copy = int(self.table_attrs.get('max_parallel_copy', self.max_parallel_copy)) + if 'dest_table' in row and row['dest_table']: + self.dest_table = row['dest_table'] + else: + self.dest_table = self.name + hstr = self.table_attrs.get('handlers', '') # compat hstr = self.table_attrs.get('handler', hstr) - self.plugin = build_handler(self.name, hstr, self.log) + self.plugin = build_handler(self.name, hstr, self.dest_table) def max_parallel_copies_reached(self): return self.max_parallel_copy and\ @@ -480,7 +486,7 @@ class Replicator(CascadedWorker): npossible -= 1 # drop all foreign keys to and from this table - self.drop_fkeys(dst_db, t.name) + self.drop_fkeys(dst_db, t.dest_table) # change state after fkeys are dropped thus allowing # failure inbetween @@ -627,7 +633,7 @@ class Replicator(CascadedWorker): self.stat_increase('ignored_events') return - fqname = skytools.quote_fqident(ev.extra1) + fqname = skytools.quote_fqident(t.dest_table) if dst_curs.connection.server_version >= 80400: sql = "TRUNCATE ONLY %s;" % fqname else: diff --git a/python/londiste/repair.py b/python/londiste/repair.py index dd0081ab..984920b8 100644 --- a/python/londiste/repair.py +++ b/python/londiste/repair.py @@ -54,29 +54,30 @@ class Repairer(Syncer): pkey_list = [] common_fields = [] - def process_sync(self, tbl, src_db, dst_db): + def process_sync(self, src_tbl, dst_tbl, src_db, dst_db): """Actual comparision.""" src_curs = src_db.cursor() dst_curs = dst_db.cursor() - self.log.info('Checking %s' % tbl) + self.log.info('Checking %s' % dst_tbl) self.common_fields = [] + self.fq_common_fields = [] self.pkey_list = [] - copy_tbl = self.gen_copy_tbl(tbl, src_curs, dst_curs) + self.load_common_columns(src_tbl, dst_tbl, src_curs, dst_curs) - dump_src = tbl + ".src" - dump_dst = tbl + ".dst" + dump_src = dst_tbl + ".src" + dump_dst = dst_tbl + ".dst" - self.log.info("Dumping src table: %s" % tbl) - self.dump_table(tbl, copy_tbl, src_curs, dump_src) + self.log.info("Dumping src table: %s" % src_tbl) + self.dump_table(src_tbl, src_curs, dump_src) src_db.commit() - self.log.info("Dumping dst table: %s" % tbl) - self.dump_table(tbl, copy_tbl, dst_curs, dump_dst) + self.log.info("Dumping dst table: %s" % dst_tbl) + self.dump_table(dst_tbl, dst_curs, dump_dst) dst_db.commit() - self.log.info("Sorting src table: %s" % tbl) + self.log.info("Sorting src table: %s" % dump_src) s_in, s_out = os.popen4("sort --version") s_ver = s_out.read() @@ -86,26 +87,27 @@ class Repairer(Syncer): else: args = "" os.system("sort %s -T . -o %s.sorted %s" % (args, dump_src, dump_src)) - self.log.info("Sorting dst table: %s" % tbl) + self.log.info("Sorting dst table: %s" % dump_dst) os.system("sort %s -T . -o %s.sorted %s" % (args, dump_dst, dump_dst)) - self.dump_compare(tbl, dump_src + ".sorted", dump_dst + ".sorted") + self.dump_compare(dst_tbl, dump_src + ".sorted", dump_dst + ".sorted") os.unlink(dump_src) os.unlink(dump_dst) os.unlink(dump_src + ".sorted") os.unlink(dump_dst + ".sorted") - def gen_copy_tbl(self, tbl, src_curs, dst_curs): - """Create COPY expession from common fields.""" - self.pkey_list = get_pkey_list(src_curs, tbl) - dst_pkey = get_pkey_list(dst_curs, tbl) + def load_common_columns(self, src_tbl, dst_tbl, src_curs, dst_curs): + """Get common fields, put pkeys in start.""" + + self.pkey_list = get_pkey_list(src_curs, src_tbl) + dst_pkey = get_pkey_list(dst_curs, dst_tbl) if dst_pkey != self.pkey_list: self.log.error('pkeys do not match') sys.exit(1) - src_cols = get_column_list(src_curs, tbl) - dst_cols = get_column_list(dst_curs, tbl) + src_cols = get_column_list(src_curs, src_tbl) + dst_cols = get_column_list(dst_curs, dst_tbl) field_list = [] for f in self.pkey_list: field_list.append(f) @@ -118,17 +120,18 @@ class Repairer(Syncer): self.common_fields = field_list fqlist = [skytools.quote_ident(col) for col in field_list] + self.fq_common_fields = fqlist - tbl_expr = "%s (%s)" % (skytools.quote_fqident(tbl), ",".join(fqlist)) - - self.log.debug("using copy expr: %s" % tbl_expr) + cols = ",".join(fqlist) + self.log.debug("using columns: %s" % cols) - return tbl_expr - - def dump_table(self, tbl, copy_tbl, curs, fn): + def dump_table(self, tbl, curs, fn): """Dump table to disk.""" + cols = ','.join(self.fq_common_fields) + q = "copy %s (%s) to stdout" % (skytools.quote_fqident(tbl), cols) + f = open(fn, "w", 64*1024) - curs.copy_to(f, copy_tbl) + curs.copy_expert(q, f) size = f.tell() f.close() self.log.info('%s: Got %d bytes' % (tbl, size)) diff --git a/python/londiste/setup.py b/python/londiste/setup.py index b59fd56f..b9f69dc6 100644 --- a/python/londiste/setup.py +++ b/python/londiste/setup.py @@ -71,6 +71,8 @@ class LondisteSetup(CascadeAdmin): help="don't merge tables from source queues", default=False) p.add_option("--max-parallel-copy", type = "int", help="max number of parallel copy processes") + p.add_option("--dest-table", + help="add: name for actual table") return p @@ -122,7 +124,7 @@ class LondisteSetup(CascadeAdmin): # dont check for exist/not here (root handling) problems = False for tbl in args: - if (tbl in src_tbls) and not src_tbls[tbl]: + if (tbl in src_tbls) and not src_tbls[tbl]['local']: self.log.error("Table %s does not exist on provider, need to switch to different provider" % tbl) problems = True if problems: @@ -137,30 +139,42 @@ class LondisteSetup(CascadeAdmin): else: create_flags = 0 + # sanity check + if self.options.dest_table and len(args) > 1: + self.log.error("--dest-table can be given only for single table") + sys.exit(1) + # seems ok for tbl in args: tbl = skytools.fq_name(tbl) - self.add_table(src_db, dst_db, tbl, create_flags) + self.add_table(src_db, dst_db, tbl, create_flags, src_tbls) - def add_table(self, src_db, dst_db, tbl, create_flags): + def add_table(self, src_db, dst_db, tbl, create_flags, src_tbls): src_curs = src_db.cursor() dst_curs = dst_db.cursor() - tbl_exists = skytools.exists_table(dst_curs, tbl) + src_dest_table = src_tbls[tbl]['dest_table'] + dest_table = self.options.dest_table or tbl + tbl_exists = skytools.exists_table(dst_curs, dest_table) if create_flags: if tbl_exists: - self.log.info('Table %s already exist, not touching' % tbl) + self.log.info('Table %s already exist, not touching' % dest_table) else: - if not skytools.exists_table(src_curs, tbl): + if not skytools.exists_table(src_curs, src_dest_table): # table not present on provider - nowhere to get the DDL from self.log.warning('Table "%s" missing on provider, skipping' % tbl) return - schema = skytools.fq_name_parts(tbl)[0] + schema = skytools.fq_name_parts(dest_table)[0] if not skytools.exists_schema(dst_curs, schema): q = "create schema %s" % skytools.quote_ident(schema) dst_curs.execute(q) - s = skytools.TableStruct(src_curs, tbl) + s = skytools.TableStruct(src_curs, src_dest_table) src_db.commit() - s.create(dst_curs, create_flags, log = self.log) + + # create, using rename logic only when necessary + newname = None + if src_dest_table != dest_table: + newname = dest_table + s.create(dst_curs, create_flags, log = self.log, new_table_name = newname) tgargs = [] if self.options.trigger_arg: @@ -179,7 +193,7 @@ class LondisteSetup(CascadeAdmin): if self.options.handler: hstr = londiste.handler.create_handler_string( self.options.handler, self.options.handler_arg) - p = londiste.handler.build_handler(tbl, hstr, self.log) + p = londiste.handler.build_handler(tbl, hstr, self.options.dest_table) attrs['handler'] = hstr p.add(tgargs) @@ -211,7 +225,7 @@ class LondisteSetup(CascadeAdmin): if self.options.handler: hstr = londiste.handler.create_handler_string( self.options.handler, self.options.handler_arg) - p = londiste.handler.build_handler('unused.string', hstr, self.log) + p = londiste.handler.build_handler('unused.string', hstr, None) return p.needs_table() return True @@ -221,7 +235,7 @@ class LondisteSetup(CascadeAdmin): if tbl not in dst_tbls: self.log.info("Table %s info missing from subscriber, adding" % tbl) self.exec_cmd(dst_curs, q, [self.set_name, tbl]) - dst_tbls[tbl] = False + dst_tbls[tbl] = {'local': False, 'dest_table': tbl} for tbl in dst_tbls.keys(): q = "select * from londiste.global_remove_table(%s, %s)" if tbl not in src_tbls: @@ -230,11 +244,13 @@ class LondisteSetup(CascadeAdmin): del dst_tbls[tbl] def fetch_set_tables(self, curs): - q = "select table_name, local from londiste.get_table_list(%s)" + q = "select table_name, local, "\ + " coalesce(dest_table, table_name) as dest_table "\ + " from londiste.get_table_list(%s)" curs.execute(q, [self.set_name]) res = {} for row in curs.fetchall(): - res[row[0]] = row[1] + res[row[0]] = row return res def cmd_remove_table(self, *args): diff --git a/python/londiste/syncer.py b/python/londiste/syncer.py index 5bce2485..02955f93 100644 --- a/python/londiste/syncer.py +++ b/python/londiste/syncer.py @@ -4,6 +4,12 @@ import sys, time, skytools +class ATable: + def __init__(self, row): + self.table_name = row['table_name'] + self.dest_table = row['dest_table'] or row['table_name'] + self.merge_state = row['merge_state'] + class Syncer(skytools.DBScript): """Walks tables in primary key order and checks if data matches.""" @@ -60,14 +66,25 @@ class Syncer(skytools.DBScript): self.log.error('Consumer lagging too much, cannot proceed') sys.exit(1) - def get_subscriber_table_state(self, dst_db): - """Load table states from subscriber.""" - dst_curs = dst_db.cursor() - q = "select * from londiste.get_table_list(%s) where local" - dst_curs.execute(q, [self.queue_name]) - res = dst_curs.dictfetchall() - dst_db.commit() - return res + def get_tables(self, db): + """Load table info. + + Returns tuple of (dict(name->ATable), namelist)""" + + curs = db.cursor() + q = "select table_name, merge_state, dest_table"\ + " from londiste.get_table_list(%s) where local" + curs.execute(q, [self.queue_name]) + rows = curs.fetchall() + db.commit() + + res = {} + names = [] + for row in rows: + t = ATable(row) + res[t.table_name] = t + names.append(t.table_name) + return res, names def work(self): """Syncer main function.""" @@ -84,29 +101,32 @@ class Syncer(skytools.DBScript): self.check_consumer(setup_curs) - state_list = self.get_subscriber_table_state(dst_db) - state_map = {} - full_list = [] - for ts in state_list: - name = ts['table_name'] - full_list.append(name) - state_map[name] = ts + src_tables, ignore = self.get_tables(src_db) + dst_tables, names = self.get_tables(dst_db) if len(self.args) > 2: tlist = self.args[2:] else: - tlist = full_list + tlist = names for tbl in tlist: tbl = skytools.fq_name(tbl) - if not tbl in state_map: + if not tbl in dst_tables: self.log.warning('Table not subscribed: %s' % tbl) continue - st = state_map[tbl] - if st['merge_state'] != 'ok': - self.log.info('Table %s not synced yet, no point' % tbl) + if not tbl in src_tables: + self.log.warning('Table not available on provider: %s' % tbl) + continue + t1 = src_tables[tbl] + t2 = dst_tables[tbl] + + if t1.merge_state != 'ok': + self.log.warning('Table %s not ready yet on provider' % tbl) continue - self.check_table(tbl, lock_db, src_db, dst_db, setup_curs) + if t2.merge_state != 'ok': + self.log.warning('Table %s not synced yet, no point' % tbl) + continue + self.check_table(t1.dest_table, t2.dest_table, lock_db, src_db, dst_db, setup_curs) lock_db.commit() src_db.commit() dst_db.commit() @@ -131,7 +151,7 @@ class Syncer(skytools.DBScript): if dur > 10 and not self.options.force: raise Exception("Ticker seems dead") - def check_table(self, tbl, lock_db, src_db, dst_db, setup_curs): + def check_table(self, src_tbl, dst_tbl, lock_db, src_db, dst_db, setup_curs): """Get transaction to same state, then process.""" @@ -139,22 +159,22 @@ class Syncer(skytools.DBScript): src_curs = src_db.cursor() dst_curs = dst_db.cursor() - if not skytools.exists_table(src_curs, tbl): - self.log.warning("Table %s does not exist on provider side" % tbl) + if not skytools.exists_table(src_curs, src_tbl): + self.log.warning("Table %s does not exist on provider side" % src_tbl) return - if not skytools.exists_table(dst_curs, tbl): - self.log.warning("Table %s does not exist on subscriber side" % tbl) + if not skytools.exists_table(dst_curs, dst_tbl): + self.log.warning("Table %s does not exist on subscriber side" % dst_tbl) return # lock table in separate connection - self.log.info('Locking %s' % tbl) + self.log.info('Locking %s' % src_tbl) lock_db.commit() self.set_lock_timeout(lock_curs) lock_time = time.time() - lock_curs.execute("LOCK TABLE %s IN SHARE MODE" % skytools.quote_fqident(tbl)) + lock_curs.execute("LOCK TABLE %s IN SHARE MODE" % skytools.quote_fqident(src_tbl)) # now wait until consumer has updated target table until locking - self.log.info('Syncing %s' % tbl) + self.log.info('Syncing %s' % dst_tbl) # consumer must get futher than this tick tick_id = self.force_tick(setup_curs) @@ -199,13 +219,13 @@ class Syncer(skytools.DBScript): lock_db.commit() # do work - self.process_sync(tbl, src_db, dst_db) + self.process_sync(src_tbl, dst_tbl, src_db, dst_db) # done src_db.commit() dst_db.commit() - def process_sync(self, tbl, src_db, dst_db): + def process_sync(self, src_tbl, dst_tbl, src_db, dst_db): """It gets 2 connections in state where tbl should be in same state. """ raise Exception('process_sync not implemented') @@ -215,3 +235,4 @@ class Syncer(skytools.DBScript): q = "select * from pgq_node.get_node_info(%s)" rows = self.exec_cmd(dst_db, q, [self.queue_name]) return rows[0]['provider_location'] + diff --git a/python/londiste/table_copy.py b/python/londiste/table_copy.py index d7d900c4..5f0f8bc5 100644 --- a/python/londiste/table_copy.py +++ b/python/londiste/table_copy.py @@ -80,6 +80,8 @@ class CopyTable(Replicator): self.log.warning("table %s not in sync yet on provider, waiting" % tbl_stat.name) time.sleep(10) + src_real_table = pt.dest_table + # 0 - dont touch # 1 - single tx # 2 - multi tx @@ -100,15 +102,15 @@ class CopyTable(Replicator): # just in case, drop all fkeys (in case "replay" was skipped) # !! this may commit, so must be done before anything else !! - self.drop_fkeys(dst_db, tbl_stat.name) + self.drop_fkeys(dst_db, tbl_stat.dest_table) # now start ddl-dropping tx - q = "lock table " + skytools.quote_fqident(tbl_stat.name) + q = "lock table " + skytools.quote_fqident(tbl_stat.dest_table) dst_curs.execute(q) # find dst struct - src_struct = TableStruct(src_curs, tbl_stat.name) - dst_struct = TableStruct(dst_curs, tbl_stat.name) + src_struct = TableStruct(src_curs, src_real_table) + dst_struct = TableStruct(dst_curs, tbl_stat.dest_table) # take common columns, warn on missing ones dlist = dst_struct.get_column_list() @@ -138,7 +140,7 @@ class CopyTable(Replicator): q = "truncate " if dst_db.server_version >= 80400: q += "only " - q += skytools.quote_fqident(tbl_stat.name) + q += skytools.quote_fqident(tbl_stat.dest_table) dst_curs.execute(q) if cmode == 2 and tbl_stat.dropped_ddl is None: @@ -152,7 +154,7 @@ class CopyTable(Replicator): tbl_stat.dropped_ddl = ddl # do truncate & copy - self.real_copy(src_curs, dst_curs, tbl_stat, common_cols) + self.real_copy(src_curs, dst_curs, tbl_stat, common_cols, src_real_table) # get snapshot src_curs.execute("select txid_current_snapshot()") @@ -208,7 +210,7 @@ class CopyTable(Replicator): src_curs.execute(q, [self.queue_name]) src_db.commit() - def real_copy(self, srccurs, dstcurs, tbl_stat, col_list): + def real_copy(self, srccurs, dstcurs, tbl_stat, col_list, src_real_table): "Actual copy." tablename = tbl_stat.name @@ -219,7 +221,7 @@ class CopyTable(Replicator): cond = tbl_stat.table_attrs.get('copy_condition') if cond: cond_list.append(cond) - stats = p.real_copy(tablename, srccurs, dstcurs, col_list, cond_list) + stats = p.real_copy(src_real_table, srccurs, dstcurs, col_list, cond_list) if stats: self.log.info("%s: copy finished: %d bytes, %d rows" % ( tablename, stats[0], stats[1])) |