summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--sql/londiste/expected/londiste_merge.out99
-rw-r--r--sql/londiste/functions/londiste.get_table_list.sql48
-rw-r--r--sql/londiste/sql/londiste_merge.sql92
3 files changed, 192 insertions, 47 deletions
diff --git a/sql/londiste/expected/londiste_merge.out b/sql/londiste/expected/londiste_merge.out
index 238d343d..27bca55d 100644
--- a/sql/londiste/expected/londiste_merge.out
+++ b/sql/londiste/expected/londiste_merge.out
@@ -55,6 +55,24 @@ select * from pgq_node.create_node('part2_set', 'leaf', 'p2merge', 'londiste_p2m
200 | Node "p2merge" initialized for queue "part2_set" with type "leaf"
(1 row)
+select * from pgq_node.register_location('part3_set', 'p3root', 'dbname=db', false);
+ ret_code | ret_note
+----------+---------------------
+ 200 | Location registered
+(1 row)
+
+select * from pgq_node.register_location('part3_set', 'p3merge', 'dbname=db3', false);
+ ret_code | ret_note
+----------+---------------------
+ 200 | Location registered
+(1 row)
+
+select * from pgq_node.create_node('part3_set', 'leaf', 'p3merge', 'londiste_p3merge', 'p3root', 100, 'combined_set');
+ ret_code | ret_note
+----------+-------------------------------------------------------------------
+ 200 | Node "p3merge" initialized for queue "part3_set" with type "leaf"
+(1 row)
+
select * from londiste.local_add_table('combined_set', 'tblmerge');
ret_code | ret_note
----------+------------------------------
@@ -139,12 +157,6 @@ select * from londiste.get_table_list('part2_set');
public.tblmerge | t | in-copy | | | | wait-replay | 1
(1 row)
-select * from londiste.local_set_table_state('part1_set', 'public.tblmerge', null, 'in-copy');
- ret_code | ret_note
-----------+----------------------------------------------
- 200 | Table public.tblmerge state set to 'in-copy'
-(1 row)
-
select * from londiste.local_set_table_state('part2_set', 'public.tblmerge', null, 'catching-up');
ret_code | ret_note
----------+--------------------------------------------------
@@ -163,16 +175,16 @@ select * from londiste.get_table_list('part2_set');
public.tblmerge | t | catching-up | | | | wait-replay | 0
(1 row)
-select * from londiste.local_set_table_struct('part1_set', 'public.tblmerge', null);
- ret_code | ret_note
-----------+---------------------
- 200 | Table struct stored
+select * from londiste.local_set_table_state('part1_set', 'public.tblmerge', null, 'catching-up');
+ ret_code | ret_note
+----------+--------------------------------------------------
+ 200 | Table public.tblmerge state set to 'catching-up'
(1 row)
select * from londiste.get_table_list('part1_set');
- table_name | local | merge_state | custom_snapshot | table_attrs | dropped_ddl | copy_role | copy_pos
------------------+-------+-------------+-----------------+-------------+-------------+-----------+----------
- public.tblmerge | t | in-copy | | | | lead | 0
+ table_name | local | merge_state | custom_snapshot | table_attrs | dropped_ddl | copy_role | copy_pos
+-----------------+-------+-------------+-----------------+-------------+---------------+-----------+----------
+ public.tblmerge | t | catching-up | | | create index; | | 0
(1 row)
select * from londiste.get_table_list('part2_set');
@@ -181,16 +193,10 @@ select * from londiste.get_table_list('part2_set');
public.tblmerge | t | catching-up | | | | wait-replay | 0
(1 row)
-select * from londiste.local_set_table_state('part1_set', 'public.tblmerge', null, 'catching-up');
- ret_code | ret_note
-----------+--------------------------------------------------
- 200 | Table public.tblmerge state set to 'catching-up'
-(1 row)
-
-select * from londiste.local_set_table_state('part2_set', 'public.tblmerge', null, 'catching-up');
- ret_code | ret_note
-----------+--------------------------------------------------
- 200 | Table public.tblmerge state set to 'catching-up'
+select * from londiste.local_set_table_struct('part1_set', 'public.tblmerge', null);
+ ret_code | ret_note
+----------+---------------------
+ 200 | Table struct stored
(1 row)
select * from londiste.get_table_list('part1_set');
@@ -205,3 +211,50 @@ select * from londiste.get_table_list('part2_set');
public.tblmerge | t | catching-up | | | | | 0
(1 row)
+--
+-- Test all combinations on 3-table merge
+--
+select * from londiste.global_add_table('part3_set', 'tblmerge');
+ ret_code | ret_note
+----------+-----------------------
+ 200 | Table added: tblmerge
+(1 row)
+
+\set ECHO off
+select * from testmatrix();
+ p1s | p2s | p3s | p1r | p2r | p3r
+--------------+--------------+--------------+-------------+-------------+-------------
+ !catching-up | catching-up | catching-up | NULL | wait-replay | wait-replay
+ !catching-up | catching-up | in-copy | wait-replay | wait-replay | wait-replay
+ !catching-up | in-copy | catching-up | wait-replay | wait-replay | wait-replay
+ !catching-up | in-copy | in-copy | wait-replay | wait-replay | wait-replay
+ !in-copy | catching-up | catching-up | lead | wait-replay | wait-replay
+ !in-copy | catching-up | in-copy | lead | wait-replay | wait-replay
+ !in-copy | in-copy | catching-up | lead | wait-replay | wait-replay
+ !in-copy | in-copy | in-copy | lead | wait-replay | wait-replay
+ catching-up | !catching-up | catching-up | wait-replay | NULL | wait-replay
+ catching-up | !catching-up | in-copy | wait-replay | wait-replay | wait-replay
+ catching-up | !in-copy | catching-up | wait-replay | lead | wait-replay
+ catching-up | !in-copy | in-copy | wait-replay | lead | wait-replay
+ catching-up | catching-up | !catching-up | wait-replay | wait-replay | NULL
+ catching-up | catching-up | !in-copy | wait-replay | wait-replay | lead
+ catching-up | catching-up | catching-up | NULL | NULL | NULL
+ catching-up | catching-up | in-copy | NULL | NULL | wait-replay
+ catching-up | in-copy | !catching-up | wait-replay | wait-replay | wait-replay
+ catching-up | in-copy | !in-copy | wait-replay | wait-replay | lead
+ catching-up | in-copy | catching-up | NULL | wait-replay | NULL
+ catching-up | in-copy | in-copy | NULL | wait-replay | wait-replay
+ in-copy | !catching-up | catching-up | wait-replay | wait-replay | wait-replay
+ in-copy | !catching-up | in-copy | wait-replay | wait-replay | wait-replay
+ in-copy | !in-copy | catching-up | wait-replay | lead | wait-replay
+ in-copy | !in-copy | in-copy | wait-replay | lead | wait-replay
+ in-copy | catching-up | !catching-up | wait-replay | wait-replay | wait-replay
+ in-copy | catching-up | !in-copy | wait-replay | wait-replay | lead
+ in-copy | catching-up | catching-up | wait-replay | NULL | NULL
+ in-copy | catching-up | in-copy | wait-replay | NULL | wait-replay
+ in-copy | in-copy | !catching-up | wait-replay | wait-replay | wait-replay
+ in-copy | in-copy | !in-copy | wait-replay | wait-replay | lead
+ in-copy | in-copy | catching-up | wait-replay | wait-replay | NULL
+ in-copy | in-copy | in-copy | lead | wait-copy | wait-copy
+(32 rows)
+
diff --git a/sql/londiste/functions/londiste.get_table_list.sql b/sql/londiste/functions/londiste.get_table_list.sql
index b998ed22..37a02ad8 100644
--- a/sql/londiste/functions/londiste.get_table_list.sql
+++ b/sql/londiste/functions/londiste.get_table_list.sql
@@ -32,9 +32,8 @@ returns setof record as $$
--
-- copy_role = lead:
-- on copy start, drop indexes and store in dropped_ddl
--- on copy finish wait, until copy_role turns to NULL
--- if dropped_ddl not NULL, restore them
--- tag as catching-up
+-- on copy finish change state to catching-up, then wait until copy_role turns to NULL
+-- catching-up: if dropped_ddl not NULL, restore them
-- copy_role = wait-copy:
-- on copy start wait, until role changes (to wait-replay)
-- copy_role = wait-replay:
@@ -43,14 +42,18 @@ returns setof record as $$
--
declare
q_part1 text;
+ q_part_ddl text;
n_parts int4;
n_done int4;
- var_table_name text;
+ v_table_name text;
n_combined_queue text;
begin
- for var_table_name, local, merge_state, custom_snapshot, table_attrs, dropped_ddl, q_part1, n_parts, n_done, n_combined_queue, copy_pos in
+ for v_table_name, local, merge_state, custom_snapshot, table_attrs, dropped_ddl,
+ q_part1, q_part_ddl, n_parts, n_done, n_combined_queue, copy_pos
+ in
select t.table_name, t.local, t.merge_state, t.custom_snapshot, t.table_attrs, t.dropped_ddl,
min(case when t2.local then t2.queue_name else null end) as _queue1,
+ min(case when t2.local and t2.dropped_ddl is not null then t2.queue_name else null end) as _queue1ddl,
count(case when t2.local then t2.table_name else null end) as _total,
count(case when t2.local then nullif(t2.merge_state, 'in-copy') else null end) as _done,
min(n.combined_queue) as _combined_queue,
@@ -69,37 +72,52 @@ begin
-- the copy processes need coordination
copy_role := null;
+ -- be more robust against late joiners
+ q_part1 := coalesce(q_part_ddl, q_part1);
+
if q_part1 is not null then
if i_queue_name = q_part1 then
-- lead
- if merge_state in ('in-copy', 'catching-up') then
+ if merge_state = 'in-copy' then
+ if dropped_ddl is null and n_done > 0 then
+ -- seems late addition, let it copy with indexes
+ copy_role := 'wait-replay';
+ elsif n_done < n_parts then
+ -- show copy_role only if need to drop ddl or already did drop ddl
+ copy_role := 'lead';
+ end if;
+
+ -- make sure it cannot be made to wait
+ copy_pos := 0;
+ end if;
+ if merge_state = 'catching-up' and dropped_ddl is not null then
-- show copy_role only if need to wait for others
if n_done < n_parts then
- copy_role := 'lead';
+ copy_role := 'wait-replay';
end if;
end if;
else
-- follow
if merge_state = 'in-copy' then
- -- has lead already dropped ddl?
- perform 1 from londiste.table_info t
- where t.queue_name = q_part1
- and t.table_name = var_table_name
- and t.dropped_ddl is not null;
- if found then
+ if q_part_ddl is not null then
+ -- can copy, wait in replay until lead has applied ddl
+ copy_role := 'wait-replay';
+ elsif n_done > 0 then
+ -- ddl is not dropped, others are active, copy without touching ddl
copy_role := 'wait-replay';
else
+ -- wait for lead to drop ddl
copy_role := 'wait-copy';
end if;
elsif merge_state = 'catching-up' then
-- show copy_role only if need to wait for lead
- if n_done < n_parts then
+ if q_part_ddl is not null then
copy_role := 'wait-replay';
end if;
end if;
end if;
end if;
- table_name:=var_table_name;
+ table_name := v_table_name;
return next;
end loop;
return;
diff --git a/sql/londiste/sql/londiste_merge.sql b/sql/londiste/sql/londiste_merge.sql
index d226a585..dcfe7871 100644
--- a/sql/londiste/sql/londiste_merge.sql
+++ b/sql/londiste/sql/londiste_merge.sql
@@ -21,6 +21,10 @@ select * from pgq_node.register_location('part2_set', 'p2root', 'dbname=db', fal
select * from pgq_node.register_location('part2_set', 'p2merge', 'dbname=db2', false);
select * from pgq_node.create_node('part2_set', 'leaf', 'p2merge', 'londiste_p2merge', 'p2root', 100, 'combined_set');
+select * from pgq_node.register_location('part3_set', 'p3root', 'dbname=db', false);
+select * from pgq_node.register_location('part3_set', 'p3merge', 'dbname=db3', false);
+select * from pgq_node.create_node('part3_set', 'leaf', 'p3merge', 'londiste_p3merge', 'p3root', 100, 'combined_set');
+
select * from londiste.local_add_table('combined_set', 'tblmerge');
@@ -43,26 +47,96 @@ select * from londiste.local_set_table_struct('part1_set', 'public.tblmerge', 'c
select * from londiste.get_table_list('part1_set');
select * from londiste.get_table_list('part2_set');
-select * from londiste.local_set_table_state('part1_set', 'public.tblmerge', null, 'in-copy');
select * from londiste.local_set_table_state('part2_set', 'public.tblmerge', null, 'catching-up');
select * from londiste.get_table_list('part1_set');
select * from londiste.get_table_list('part2_set');
-select * from londiste.local_set_table_struct('part1_set', 'public.tblmerge', null);
+select * from londiste.local_set_table_state('part1_set', 'public.tblmerge', null, 'catching-up');
select * from londiste.get_table_list('part1_set');
select * from londiste.get_table_list('part2_set');
-select * from londiste.local_set_table_state('part1_set', 'public.tblmerge', null, 'catching-up');
-select * from londiste.local_set_table_state('part2_set', 'public.tblmerge', null, 'catching-up');
+select * from londiste.local_set_table_struct('part1_set', 'public.tblmerge', null);
select * from londiste.get_table_list('part1_set');
select * from londiste.get_table_list('part2_set');
+--
+-- Test all combinations on 3-table merge
+--
-
-
-
-
-
+select * from londiste.global_add_table('part3_set', 'tblmerge');
+
+\set ECHO off
+
+create table states ( state text );
+insert into states values ('in-copy');
+insert into states values ('!in-copy');
+insert into states values ('catching-up');
+insert into states values ('!catching-up');
+
+create or replace function testmerge(
+ in p1state text, in p2state text, in p3state text,
+ out p1res text, out p2res text, out p3res text)
+as $$
+declare
+ p1ddl text;
+ p2ddl text;
+ p3ddl text;
+ tbl text = 'public.tblmerge';
+begin
+ if position('!' in p1state) > 0 then
+ p1state := replace(p1state, '!', '');
+ p1ddl = 'x';
+ end if;
+ if position('!' in p2state) > 0 then
+ p2state := replace(p2state, '!', '');
+ p2ddl = 'x';
+ end if;
+ if position('!' in p3state) > 0 then
+ p3state := replace(p3state, '!', '');
+ p3ddl = 'x';
+ end if;
+
+ update londiste.table_info
+ set merge_state = p1state, dropped_ddl = p1ddl, local = true
+ where table_name = tbl and queue_name = 'part1_set';
+ update londiste.table_info
+ set merge_state = p2state, dropped_ddl = p2ddl, local = true
+ where table_name = tbl and queue_name = 'part2_set';
+ update londiste.table_info
+ set merge_state = p3state, dropped_ddl = p3ddl, local = true
+ where table_name = tbl and queue_name = 'part3_set';
+
+ select coalesce(copy_role, 'NULL') from londiste.get_table_list('part1_set')
+ where table_name = tbl into p1res;
+ select coalesce(copy_role, 'NULL') from londiste.get_table_list('part2_set')
+ where table_name = tbl into p2res;
+ select coalesce(copy_role, 'NULL') from londiste.get_table_list('part3_set')
+ where table_name = tbl into p3res;
+ return;
+end;
+$$ language plpgsql;
+
+create function testmatrix(
+ out p1s text, out p2s text, out p3s text,
+ out p1r text, out p2r text, out p3r text)
+returns setof record as $$
+begin
+ for p1s, p2s, p3s in
+ select p1.state, p2.state, p3.state
+ from states p1, states p2, states p3
+ where position('!' in p1.state) + position('!' in p2.state) + position('!' in p3.state) < 2
+ order by 1,2,3
+ loop
+ select * from testmerge(p1s, p2s, p3s) into p1r, p2r, p3r;
+ return next;
+ end loop;
+ return;
+end;
+$$ language plpgsql;
+
+\set ECHO all
+
+select * from testmatrix();