diff options
Diffstat (limited to 'python/londiste/setup.py')
-rw-r--r-- | python/londiste/setup.py | 93 |
1 files changed, 72 insertions, 21 deletions
diff --git a/python/londiste/setup.py b/python/londiste/setup.py index 204850b6..20d3f5c0 100644 --- a/python/londiste/setup.py +++ b/python/londiste/setup.py @@ -7,6 +7,7 @@ import sys, os, re, skytools from pgq.cascade.admin import CascadeAdmin from londiste.exec_attrs import ExecAttrs +from londiste.util import find_copy_source import londiste.handler @@ -139,36 +140,45 @@ class LondisteSetup(CascadeAdmin): needs_tbl = self.handler_needs_table() args = self.expand_arg_list(dst_db, 'r', False, args, needs_tbl) + # pick proper create flags + if self.options.create_full: + create_flags = skytools.T_ALL + elif self.options.create: + create_flags = skytools.T_TABLE | skytools.T_PKEY + else: + create_flags = 0 + + # search for usable copy node if requested & needed + if (self.options.find_copy_node and create_flags != 0 + and needs_tbl and not self.is_root()): + src_name, src_loc, _ = find_copy_source(self, self.queue_name, args, None, self.provider_location) + self.options.copy_node = src_name + self.close_database('provider_db') + src_db = self.get_provider_db() + src_curs = src_db.cursor() + src_tbls = self.fetch_set_tables(src_curs) + src_db.commit() + # dont check for exist/not here (root handling) if not self.is_root() and not self.options.expect_sync and not self.options.find_copy_node: problems = False for tbl in args: tbl = skytools.fq_name(tbl) if (tbl in src_tbls) and not src_tbls[tbl]['local']: - self.log.error("Table %s does not exist on provider, need to switch to different provider", tbl) - problems = True + if self.options.skip_non_existing: + self.log.warning("Table %s does not exist on provider", tbl) + else: + self.log.error("Table %s does not exist on provider, need to switch to different provider", tbl) + problems = True if problems: self.log.error("Problems, canceling operation") sys.exit(1) - # pick proper create flags - if self.options.create_full: - create_flags = skytools.T_ALL - elif self.options.create: - create_flags = skytools.T_TABLE | skytools.T_PKEY - else: - create_flags = 0 - # sanity check if self.options.dest_table and len(args) > 1: self.log.error("--dest-table can be given only for single table") sys.exit(1) - # not implemented - if self.options.find_copy_node and create_flags != 0: - self.log.error("--find-copy-node does not work with --create") - sys.exit(1) - # seems ok for tbl in args: self.add_table(src_db, dst_db, tbl, create_flags, src_tbls) @@ -445,6 +455,42 @@ class LondisteSetup(CascadeAdmin): """Reload data from provider node.""" db = self.get_database('db') args = self.expand_arg_list(db, 'r', True, args) + + if not self.options.find_copy_node: + self.load_local_info() + src_db = self.get_provider_db() + src_curs = src_db.cursor() + src_tbls = self.fetch_set_tables(src_curs) + src_db.commit() + + problems = 0 + for tbl in args: + tbl = skytools.fq_name(tbl) + if tbl not in src_tbls or not src_tbls[tbl]['local']: + self.log.error("Table %s does not exist on provider, need to switch to different provider", tbl) + problems += 1 + if problems > 0: + self.log.error("Problems, cancelling operation") + sys.exit(1) + + if self.options.find_copy_node or self.options.copy_node: + q = "select table_name, table_attrs from londiste.get_table_list(%s) where local" + cur = db.cursor() + cur.execute(q, [self.set_name]) + for row in cur.fetchall(): + if row['table_name'] not in args: + continue + attrs = skytools.db_urldecode (row['table_attrs'] or '') + + if self.options.find_copy_node: + attrs['copy_node'] = '?' + elif self.options.copy_node: + attrs['copy_node'] = self.options.copy_node + + attrs = skytools.db_urlencode (attrs) + q = "select * from londiste.local_set_table_attrs (%s, %s, %s)" + self.exec_cmd(db, q, [self.set_name, row['table_name'], attrs]) + q = "select * from londiste.local_set_table_state(%s, %s, null, null)" self.exec_cmd_many(db, q, [self.set_name], args) @@ -530,9 +576,8 @@ class LondisteSetup(CascadeAdmin): db.commit() def get_provider_db(self): - - # use custom node for copy if self.options.copy_node: + # use custom node for copy source_node = self.options.copy_node m = self.queue_info.get_member(source_node) if not m: @@ -546,6 +591,7 @@ class LondisteSetup(CascadeAdmin): q = 'select * from pgq_node.get_node_info(%s)' res = self.exec_cmd(db, q, [self.queue_name], quiet = True) self.provider_location = res[0]['provider_location'] + return self.get_database('provider_db', connstr = self.provider_location, profile = 'remote') def expand_arg_list(self, db, kind, existing, args, needs_tbl=True): @@ -586,6 +632,9 @@ class LondisteSetup(CascadeAdmin): res = self.solve_globbing(args, lst_exists, map_exists, map_missing, allow_nonexist) else: res = self.solve_globbing(args, lst_missing, map_missing, map_exists, allow_nonexist) + + if not res: + self.log.info("what to do ?") return res def solve_globbing(self, args, full_list, full_map, reverse_map, allow_nonexist): @@ -657,18 +706,19 @@ class LondisteSetup(CascadeAdmin): dst_curs = dst_db.cursor() partial = {} - done_pos = 1 startup_info = 0 while 1: dst_curs.execute(q, [self.queue_name]) rows = dst_curs.fetchall() dst_db.commit() + total_count = 0 cur_count = 0 done_list = [] for row in rows: if not row['local']: continue + total_count += 1 tbl = row['table_name'] if row['merge_state'] != 'ok': partial[tbl] = 0 @@ -678,13 +728,14 @@ class LondisteSetup(CascadeAdmin): partial[tbl] = 1 done_list.append(tbl) + done_count = total_count - cur_count + if not startup_info: - self.log.info("%d table(s) to copy", len(partial)) + self.log.info("%d/%d table(s) to copy", cur_count, total_count) startup_info = 1 for done in done_list: - self.log.info("%s: finished (%d/%d)", done, done_pos, len(partial)) - done_pos += 1 + self.log.info("%s: finished (%d/%d)", done, done_count, total_count) if cur_count == 0: break |