summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--python/londiste/setup.py55
-rw-r--r--sql/londiste/functions/londiste.local_add_table.sql29
2 files changed, 54 insertions, 30 deletions
diff --git a/python/londiste/setup.py b/python/londiste/setup.py
index b9f69dc6..0280142c 100644
--- a/python/londiste/setup.py
+++ b/python/londiste/setup.py
@@ -104,18 +104,27 @@ class LondisteSetup(CascadeAdmin):
node_db.commit()
provider_db.commit()
+ def is_root(self):
+ return self.queue_info.local_node.type == 'root'
+
def cmd_add_table(self, *args):
"""Attach table(s) to local node."""
- dst_db = self.get_database('db')
- dst_curs = dst_db.cursor()
+ self.load_local_info()
+
src_db = self.get_provider_db()
- src_curs = src_db.cursor()
+ if not self.is_root():
+ src_curs = src_db.cursor()
+ src_tbls = self.fetch_set_tables(src_curs)
+ src_db.commit()
- src_tbls = self.fetch_set_tables(src_curs)
+ dst_db = self.get_database('db')
+ dst_curs = dst_db.cursor()
dst_tbls = self.fetch_set_tables(dst_curs)
- src_db.commit()
- self.sync_table_list(dst_curs, src_tbls, dst_tbls)
+ if self.is_root():
+ src_tbls = dst_tbls
+ else:
+ self.sync_table_list(dst_curs, src_tbls, dst_tbls)
dst_db.commit()
needs_tbl = self.handler_needs_table()
@@ -124,6 +133,7 @@ class LondisteSetup(CascadeAdmin):
# dont check for exist/not here (root handling)
problems = False
for tbl in args:
+ tbl = skytools.fq_name(tbl)
if (tbl in src_tbls) and not src_tbls[tbl]['local']:
self.log.error("Table %s does not exist on provider, need to switch to different provider" % tbl)
problems = True
@@ -146,22 +156,31 @@ class LondisteSetup(CascadeAdmin):
# seems ok
for tbl in args:
- tbl = skytools.fq_name(tbl)
self.add_table(src_db, dst_db, tbl, create_flags, src_tbls)
def add_table(self, src_db, dst_db, tbl, create_flags, src_tbls):
+ # use full names
+ tbl = skytools.fq_name(tbl)
+ dest_table = self.options.dest_table or tbl
+ dest_table = skytools.fq_name(dest_table)
+
src_curs = src_db.cursor()
dst_curs = dst_db.cursor()
- src_dest_table = src_tbls[tbl]['dest_table']
- dest_table = self.options.dest_table or tbl
tbl_exists = skytools.exists_table(dst_curs, dest_table)
+
+ if dest_table == tbl:
+ desc = tbl
+ else:
+ desc = "%s(%s)" % (tbl, dest_table)
+
if create_flags:
if tbl_exists:
- self.log.info('Table %s already exist, not touching' % dest_table)
+ self.log.info('Table %s already exist, not touching' % desc)
else:
+ src_dest_table = src_tbls[tbl]['dest_table']
if not skytools.exists_table(src_curs, src_dest_table):
# table not present on provider - nowhere to get the DDL from
- self.log.warning('Table "%s" missing on provider, skipping' % tbl)
+ self.log.warning('Table %s missing on provider, skipping' % desc)
return
schema = skytools.fq_name_parts(dest_table)[0]
if not skytools.exists_schema(dst_curs, schema):
@@ -209,15 +228,13 @@ class LondisteSetup(CascadeAdmin):
if self.options.max_parallel_copy:
attrs['max_parallel_copy'] = self.options.max_parallel_copy
- args = [self.set_name, tbl, tgargs]
-
- if attrs:
- args.append(skytools.db_urlencode(attrs))
-
- q = "select * from londiste.local_add_table(%s)" %\
- ','.join(['%s']*len(args))
-
# actual table registration
+ args = [self.set_name, tbl, tgargs, None, None]
+ if attrs:
+ args[3] = skytools.db_urlencode(attrs)
+ if dest_table != tbl:
+ args[4] = dest_table
+ q = "select * from londiste.local_add_table(%s, %s, %s, %s, %s)"
self.exec_cmd(dst_curs, q, args)
dst_db.commit()
diff --git a/sql/londiste/functions/londiste.local_add_table.sql b/sql/londiste/functions/londiste.local_add_table.sql
index 6b477d06..f69ca568 100644
--- a/sql/londiste/functions/londiste.local_add_table.sql
+++ b/sql/londiste/functions/londiste.local_add_table.sql
@@ -97,6 +97,7 @@ declare
_dest_table text;
_got_extra1 boolean := false;
_table_name2 text;
+ _desc text;
begin
-------- i_trg_args ARGUMENTS PARSING
@@ -154,12 +155,18 @@ begin
_args := array_append(_args, quote_literal(arg));
end if;
+ if _dest_table = fq_table_name then
+ _desc := fq_table_name;
+ else
+ _desc := fq_table_name || '(' || _dest_table || ')';
+ end if;
+
-------- TABLE STRUCTURE CHECK
if not _virtual_table then
_tbloid := londiste.find_table_oid(_dest_table);
if _tbloid is null then
- select 404, 'Table does not exist: ' || _dest_table into ret_code, ret_note;
+ select 404, 'Table does not exist: ' || _desc into ret_code, ret_note;
return;
end if;
col_types := londiste.find_column_types(_dest_table);
@@ -176,7 +183,7 @@ begin
and coalesce(t.dest_table, t.table_name) = _dest_table
and t.dropped_ddl is not null;
if not found then
- select 400, 'Primary key missing on table: ' || _dest_table into ret_code, ret_note;
+ select 400, 'Primary key missing on table: ' || _desc into ret_code, ret_note;
return;
end if;
end if;
@@ -202,7 +209,7 @@ begin
return;
end if;
else
- select 404, 'Table not available on queue: ' || fq_table_name
+ select 404, 'Table not available on queue: ' || _desc
into ret_code, ret_note;
return;
end if;
@@ -214,7 +221,7 @@ begin
end if;
if tbl.local then
- select 200, 'Table already added: ' || fq_table_name into ret_code, ret_note;
+ select 200, 'Table already added: ' || _desc into ret_code, ret_note;
return;
end if;
@@ -257,7 +264,7 @@ begin
-- if table from some other source is already marked as local,
-- raise error
if _local then
- select 405, 'Found local table '|| fq_table_name
+ select 405, 'Found local table '|| _desc
|| ' in queue ' || _queue_name
|| ', use remove-table first to remove all previous '
|| 'table subscriptions'
@@ -268,7 +275,7 @@ begin
-- when table comes from multiple sources, merge_all switch is
-- required
if not _merge_all then
- select 405, 'Found multiple sources for table '|| fq_table_name
+ select 405, 'Found multiple sources for table '|| _desc
|| ', use merge-all or no-merge to continue'
into ret_code, ret_note;
return;
@@ -354,7 +361,7 @@ begin
perform 1 from pg_proc p join pg_namespace n on (n.oid = p.pronamespace)
where n.nspname = 'pgq' and p.proname in ('logutriga', 'sqltriga');
if not found then
- select 200, 'Table added with no triggers: ' || fq_table_name into ret_code, ret_note;
+ select 200, 'Table added with no triggers: ' || _desc into ret_code, ret_note;
return;
end if;
-- on regular leaf, install deny trigger
@@ -384,7 +391,7 @@ begin
execute sql;
end if;
else
- select 405, 'Multiple SKIP triggers in table: ' || _dest_table
+ select 405, 'Multiple SKIP triggers in table: ' || _desc
into ret_code, ret_note;
return;
end if;
@@ -397,7 +404,7 @@ begin
if not found then
if _no_triggers then
- select 200, 'Table added with no triggers: ' || fq_table_name
+ select 200, 'Table added with no triggers: ' || _desc
into ret_code, ret_note;
return;
end if;
@@ -472,14 +479,14 @@ begin
if logtrg_previous is not null then
select 301,
- 'Table added: ' || fq_table_name
+ 'Table added: ' || _desc
|| ', but londiste trigger is not first: '
|| logtrg_previous
into ret_code, ret_note;
return;
end if;
- select 200, 'Table added: ' || fq_table_name into ret_code, ret_note;
+ select 200, 'Table added: ' || _desc into ret_code, ret_note;
return;
end;
$$ language plpgsql;