diff options
-rw-r--r-- | python/londiste/setup.py | 55 | ||||
-rw-r--r-- | sql/londiste/functions/londiste.local_add_table.sql | 29 |
2 files changed, 54 insertions, 30 deletions
diff --git a/python/londiste/setup.py b/python/londiste/setup.py index b9f69dc6..0280142c 100644 --- a/python/londiste/setup.py +++ b/python/londiste/setup.py @@ -104,18 +104,27 @@ class LondisteSetup(CascadeAdmin): node_db.commit() provider_db.commit() + def is_root(self): + return self.queue_info.local_node.type == 'root' + def cmd_add_table(self, *args): """Attach table(s) to local node.""" - dst_db = self.get_database('db') - dst_curs = dst_db.cursor() + self.load_local_info() + src_db = self.get_provider_db() - src_curs = src_db.cursor() + if not self.is_root(): + src_curs = src_db.cursor() + src_tbls = self.fetch_set_tables(src_curs) + src_db.commit() - src_tbls = self.fetch_set_tables(src_curs) + dst_db = self.get_database('db') + dst_curs = dst_db.cursor() dst_tbls = self.fetch_set_tables(dst_curs) - src_db.commit() - self.sync_table_list(dst_curs, src_tbls, dst_tbls) + if self.is_root(): + src_tbls = dst_tbls + else: + self.sync_table_list(dst_curs, src_tbls, dst_tbls) dst_db.commit() needs_tbl = self.handler_needs_table() @@ -124,6 +133,7 @@ class LondisteSetup(CascadeAdmin): # dont check for exist/not here (root handling) problems = False for tbl in args: + tbl = skytools.fq_name(tbl) if (tbl in src_tbls) and not src_tbls[tbl]['local']: self.log.error("Table %s does not exist on provider, need to switch to different provider" % tbl) problems = True @@ -146,22 +156,31 @@ class LondisteSetup(CascadeAdmin): # seems ok for tbl in args: - tbl = skytools.fq_name(tbl) self.add_table(src_db, dst_db, tbl, create_flags, src_tbls) def add_table(self, src_db, dst_db, tbl, create_flags, src_tbls): + # use full names + tbl = skytools.fq_name(tbl) + dest_table = self.options.dest_table or tbl + dest_table = skytools.fq_name(dest_table) + src_curs = src_db.cursor() dst_curs = dst_db.cursor() - src_dest_table = src_tbls[tbl]['dest_table'] - dest_table = self.options.dest_table or tbl tbl_exists = skytools.exists_table(dst_curs, dest_table) + + if dest_table == tbl: + desc = tbl + else: + desc = "%s(%s)" % (tbl, dest_table) + if create_flags: if tbl_exists: - self.log.info('Table %s already exist, not touching' % dest_table) + self.log.info('Table %s already exist, not touching' % desc) else: + src_dest_table = src_tbls[tbl]['dest_table'] if not skytools.exists_table(src_curs, src_dest_table): # table not present on provider - nowhere to get the DDL from - self.log.warning('Table "%s" missing on provider, skipping' % tbl) + self.log.warning('Table %s missing on provider, skipping' % desc) return schema = skytools.fq_name_parts(dest_table)[0] if not skytools.exists_schema(dst_curs, schema): @@ -209,15 +228,13 @@ class LondisteSetup(CascadeAdmin): if self.options.max_parallel_copy: attrs['max_parallel_copy'] = self.options.max_parallel_copy - args = [self.set_name, tbl, tgargs] - - if attrs: - args.append(skytools.db_urlencode(attrs)) - - q = "select * from londiste.local_add_table(%s)" %\ - ','.join(['%s']*len(args)) - # actual table registration + args = [self.set_name, tbl, tgargs, None, None] + if attrs: + args[3] = skytools.db_urlencode(attrs) + if dest_table != tbl: + args[4] = dest_table + q = "select * from londiste.local_add_table(%s, %s, %s, %s, %s)" self.exec_cmd(dst_curs, q, args) dst_db.commit() diff --git a/sql/londiste/functions/londiste.local_add_table.sql b/sql/londiste/functions/londiste.local_add_table.sql index 6b477d06..f69ca568 100644 --- a/sql/londiste/functions/londiste.local_add_table.sql +++ b/sql/londiste/functions/londiste.local_add_table.sql @@ -97,6 +97,7 @@ declare _dest_table text; _got_extra1 boolean := false; _table_name2 text; + _desc text; begin -------- i_trg_args ARGUMENTS PARSING @@ -154,12 +155,18 @@ begin _args := array_append(_args, quote_literal(arg)); end if; + if _dest_table = fq_table_name then + _desc := fq_table_name; + else + _desc := fq_table_name || '(' || _dest_table || ')'; + end if; + -------- TABLE STRUCTURE CHECK if not _virtual_table then _tbloid := londiste.find_table_oid(_dest_table); if _tbloid is null then - select 404, 'Table does not exist: ' || _dest_table into ret_code, ret_note; + select 404, 'Table does not exist: ' || _desc into ret_code, ret_note; return; end if; col_types := londiste.find_column_types(_dest_table); @@ -176,7 +183,7 @@ begin and coalesce(t.dest_table, t.table_name) = _dest_table and t.dropped_ddl is not null; if not found then - select 400, 'Primary key missing on table: ' || _dest_table into ret_code, ret_note; + select 400, 'Primary key missing on table: ' || _desc into ret_code, ret_note; return; end if; end if; @@ -202,7 +209,7 @@ begin return; end if; else - select 404, 'Table not available on queue: ' || fq_table_name + select 404, 'Table not available on queue: ' || _desc into ret_code, ret_note; return; end if; @@ -214,7 +221,7 @@ begin end if; if tbl.local then - select 200, 'Table already added: ' || fq_table_name into ret_code, ret_note; + select 200, 'Table already added: ' || _desc into ret_code, ret_note; return; end if; @@ -257,7 +264,7 @@ begin -- if table from some other source is already marked as local, -- raise error if _local then - select 405, 'Found local table '|| fq_table_name + select 405, 'Found local table '|| _desc || ' in queue ' || _queue_name || ', use remove-table first to remove all previous ' || 'table subscriptions' @@ -268,7 +275,7 @@ begin -- when table comes from multiple sources, merge_all switch is -- required if not _merge_all then - select 405, 'Found multiple sources for table '|| fq_table_name + select 405, 'Found multiple sources for table '|| _desc || ', use merge-all or no-merge to continue' into ret_code, ret_note; return; @@ -354,7 +361,7 @@ begin perform 1 from pg_proc p join pg_namespace n on (n.oid = p.pronamespace) where n.nspname = 'pgq' and p.proname in ('logutriga', 'sqltriga'); if not found then - select 200, 'Table added with no triggers: ' || fq_table_name into ret_code, ret_note; + select 200, 'Table added with no triggers: ' || _desc into ret_code, ret_note; return; end if; -- on regular leaf, install deny trigger @@ -384,7 +391,7 @@ begin execute sql; end if; else - select 405, 'Multiple SKIP triggers in table: ' || _dest_table + select 405, 'Multiple SKIP triggers in table: ' || _desc into ret_code, ret_note; return; end if; @@ -397,7 +404,7 @@ begin if not found then if _no_triggers then - select 200, 'Table added with no triggers: ' || fq_table_name + select 200, 'Table added with no triggers: ' || _desc into ret_code, ret_note; return; end if; @@ -472,14 +479,14 @@ begin if logtrg_previous is not null then select 301, - 'Table added: ' || fq_table_name + 'Table added: ' || _desc || ', but londiste trigger is not first: ' || logtrg_previous into ret_code, ret_note; return; end if; - select 200, 'Table added: ' || fq_table_name into ret_code, ret_note; + select 200, 'Table added: ' || _desc into ret_code, ret_note; return; end; $$ language plpgsql; |