diff options
-rw-r--r-- | sql/londiste/expected/londiste_merge.out | 99 | ||||
-rw-r--r-- | sql/londiste/functions/londiste.get_table_list.sql | 48 | ||||
-rw-r--r-- | sql/londiste/sql/londiste_merge.sql | 92 |
3 files changed, 192 insertions, 47 deletions
diff --git a/sql/londiste/expected/londiste_merge.out b/sql/londiste/expected/londiste_merge.out index 238d343d..27bca55d 100644 --- a/sql/londiste/expected/londiste_merge.out +++ b/sql/londiste/expected/londiste_merge.out @@ -55,6 +55,24 @@ select * from pgq_node.create_node('part2_set', 'leaf', 'p2merge', 'londiste_p2m 200 | Node "p2merge" initialized for queue "part2_set" with type "leaf" (1 row) +select * from pgq_node.register_location('part3_set', 'p3root', 'dbname=db', false); + ret_code | ret_note +----------+--------------------- + 200 | Location registered +(1 row) + +select * from pgq_node.register_location('part3_set', 'p3merge', 'dbname=db3', false); + ret_code | ret_note +----------+--------------------- + 200 | Location registered +(1 row) + +select * from pgq_node.create_node('part3_set', 'leaf', 'p3merge', 'londiste_p3merge', 'p3root', 100, 'combined_set'); + ret_code | ret_note +----------+------------------------------------------------------------------- + 200 | Node "p3merge" initialized for queue "part3_set" with type "leaf" +(1 row) + select * from londiste.local_add_table('combined_set', 'tblmerge'); ret_code | ret_note ----------+------------------------------ @@ -139,12 +157,6 @@ select * from londiste.get_table_list('part2_set'); public.tblmerge | t | in-copy | | | | wait-replay | 1 (1 row) -select * from londiste.local_set_table_state('part1_set', 'public.tblmerge', null, 'in-copy'); - ret_code | ret_note -----------+---------------------------------------------- - 200 | Table public.tblmerge state set to 'in-copy' -(1 row) - select * from londiste.local_set_table_state('part2_set', 'public.tblmerge', null, 'catching-up'); ret_code | ret_note ----------+-------------------------------------------------- @@ -163,16 +175,16 @@ select * from londiste.get_table_list('part2_set'); public.tblmerge | t | catching-up | | | | wait-replay | 0 (1 row) -select * from londiste.local_set_table_struct('part1_set', 'public.tblmerge', null); - ret_code | ret_note -----------+--------------------- - 200 | Table struct stored +select * from londiste.local_set_table_state('part1_set', 'public.tblmerge', null, 'catching-up'); + ret_code | ret_note +----------+-------------------------------------------------- + 200 | Table public.tblmerge state set to 'catching-up' (1 row) select * from londiste.get_table_list('part1_set'); - table_name | local | merge_state | custom_snapshot | table_attrs | dropped_ddl | copy_role | copy_pos ------------------+-------+-------------+-----------------+-------------+-------------+-----------+---------- - public.tblmerge | t | in-copy | | | | lead | 0 + table_name | local | merge_state | custom_snapshot | table_attrs | dropped_ddl | copy_role | copy_pos +-----------------+-------+-------------+-----------------+-------------+---------------+-----------+---------- + public.tblmerge | t | catching-up | | | create index; | | 0 (1 row) select * from londiste.get_table_list('part2_set'); @@ -181,16 +193,10 @@ select * from londiste.get_table_list('part2_set'); public.tblmerge | t | catching-up | | | | wait-replay | 0 (1 row) -select * from londiste.local_set_table_state('part1_set', 'public.tblmerge', null, 'catching-up'); - ret_code | ret_note -----------+-------------------------------------------------- - 200 | Table public.tblmerge state set to 'catching-up' -(1 row) - -select * from londiste.local_set_table_state('part2_set', 'public.tblmerge', null, 'catching-up'); - ret_code | ret_note -----------+-------------------------------------------------- - 200 | Table public.tblmerge state set to 'catching-up' +select * from londiste.local_set_table_struct('part1_set', 'public.tblmerge', null); + ret_code | ret_note +----------+--------------------- + 200 | Table struct stored (1 row) select * from londiste.get_table_list('part1_set'); @@ -205,3 +211,50 @@ select * from londiste.get_table_list('part2_set'); public.tblmerge | t | catching-up | | | | | 0 (1 row) +-- +-- Test all combinations on 3-table merge +-- +select * from londiste.global_add_table('part3_set', 'tblmerge'); + ret_code | ret_note +----------+----------------------- + 200 | Table added: tblmerge +(1 row) + +\set ECHO off +select * from testmatrix(); + p1s | p2s | p3s | p1r | p2r | p3r +--------------+--------------+--------------+-------------+-------------+------------- + !catching-up | catching-up | catching-up | NULL | wait-replay | wait-replay + !catching-up | catching-up | in-copy | wait-replay | wait-replay | wait-replay + !catching-up | in-copy | catching-up | wait-replay | wait-replay | wait-replay + !catching-up | in-copy | in-copy | wait-replay | wait-replay | wait-replay + !in-copy | catching-up | catching-up | lead | wait-replay | wait-replay + !in-copy | catching-up | in-copy | lead | wait-replay | wait-replay + !in-copy | in-copy | catching-up | lead | wait-replay | wait-replay + !in-copy | in-copy | in-copy | lead | wait-replay | wait-replay + catching-up | !catching-up | catching-up | wait-replay | NULL | wait-replay + catching-up | !catching-up | in-copy | wait-replay | wait-replay | wait-replay + catching-up | !in-copy | catching-up | wait-replay | lead | wait-replay + catching-up | !in-copy | in-copy | wait-replay | lead | wait-replay + catching-up | catching-up | !catching-up | wait-replay | wait-replay | NULL + catching-up | catching-up | !in-copy | wait-replay | wait-replay | lead + catching-up | catching-up | catching-up | NULL | NULL | NULL + catching-up | catching-up | in-copy | NULL | NULL | wait-replay + catching-up | in-copy | !catching-up | wait-replay | wait-replay | wait-replay + catching-up | in-copy | !in-copy | wait-replay | wait-replay | lead + catching-up | in-copy | catching-up | NULL | wait-replay | NULL + catching-up | in-copy | in-copy | NULL | wait-replay | wait-replay + in-copy | !catching-up | catching-up | wait-replay | wait-replay | wait-replay + in-copy | !catching-up | in-copy | wait-replay | wait-replay | wait-replay + in-copy | !in-copy | catching-up | wait-replay | lead | wait-replay + in-copy | !in-copy | in-copy | wait-replay | lead | wait-replay + in-copy | catching-up | !catching-up | wait-replay | wait-replay | wait-replay + in-copy | catching-up | !in-copy | wait-replay | wait-replay | lead + in-copy | catching-up | catching-up | wait-replay | NULL | NULL + in-copy | catching-up | in-copy | wait-replay | NULL | wait-replay + in-copy | in-copy | !catching-up | wait-replay | wait-replay | wait-replay + in-copy | in-copy | !in-copy | wait-replay | wait-replay | lead + in-copy | in-copy | catching-up | wait-replay | wait-replay | NULL + in-copy | in-copy | in-copy | lead | wait-copy | wait-copy +(32 rows) + diff --git a/sql/londiste/functions/londiste.get_table_list.sql b/sql/londiste/functions/londiste.get_table_list.sql index b998ed22..37a02ad8 100644 --- a/sql/londiste/functions/londiste.get_table_list.sql +++ b/sql/londiste/functions/londiste.get_table_list.sql @@ -32,9 +32,8 @@ returns setof record as $$ -- -- copy_role = lead: -- on copy start, drop indexes and store in dropped_ddl --- on copy finish wait, until copy_role turns to NULL --- if dropped_ddl not NULL, restore them --- tag as catching-up +-- on copy finish change state to catching-up, then wait until copy_role turns to NULL +-- catching-up: if dropped_ddl not NULL, restore them -- copy_role = wait-copy: -- on copy start wait, until role changes (to wait-replay) -- copy_role = wait-replay: @@ -43,14 +42,18 @@ returns setof record as $$ -- declare q_part1 text; + q_part_ddl text; n_parts int4; n_done int4; - var_table_name text; + v_table_name text; n_combined_queue text; begin - for var_table_name, local, merge_state, custom_snapshot, table_attrs, dropped_ddl, q_part1, n_parts, n_done, n_combined_queue, copy_pos in + for v_table_name, local, merge_state, custom_snapshot, table_attrs, dropped_ddl, + q_part1, q_part_ddl, n_parts, n_done, n_combined_queue, copy_pos + in select t.table_name, t.local, t.merge_state, t.custom_snapshot, t.table_attrs, t.dropped_ddl, min(case when t2.local then t2.queue_name else null end) as _queue1, + min(case when t2.local and t2.dropped_ddl is not null then t2.queue_name else null end) as _queue1ddl, count(case when t2.local then t2.table_name else null end) as _total, count(case when t2.local then nullif(t2.merge_state, 'in-copy') else null end) as _done, min(n.combined_queue) as _combined_queue, @@ -69,37 +72,52 @@ begin -- the copy processes need coordination copy_role := null; + -- be more robust against late joiners + q_part1 := coalesce(q_part_ddl, q_part1); + if q_part1 is not null then if i_queue_name = q_part1 then -- lead - if merge_state in ('in-copy', 'catching-up') then + if merge_state = 'in-copy' then + if dropped_ddl is null and n_done > 0 then + -- seems late addition, let it copy with indexes + copy_role := 'wait-replay'; + elsif n_done < n_parts then + -- show copy_role only if need to drop ddl or already did drop ddl + copy_role := 'lead'; + end if; + + -- make sure it cannot be made to wait + copy_pos := 0; + end if; + if merge_state = 'catching-up' and dropped_ddl is not null then -- show copy_role only if need to wait for others if n_done < n_parts then - copy_role := 'lead'; + copy_role := 'wait-replay'; end if; end if; else -- follow if merge_state = 'in-copy' then - -- has lead already dropped ddl? - perform 1 from londiste.table_info t - where t.queue_name = q_part1 - and t.table_name = var_table_name - and t.dropped_ddl is not null; - if found then + if q_part_ddl is not null then + -- can copy, wait in replay until lead has applied ddl + copy_role := 'wait-replay'; + elsif n_done > 0 then + -- ddl is not dropped, others are active, copy without touching ddl copy_role := 'wait-replay'; else + -- wait for lead to drop ddl copy_role := 'wait-copy'; end if; elsif merge_state = 'catching-up' then -- show copy_role only if need to wait for lead - if n_done < n_parts then + if q_part_ddl is not null then copy_role := 'wait-replay'; end if; end if; end if; end if; - table_name:=var_table_name; + table_name := v_table_name; return next; end loop; return; diff --git a/sql/londiste/sql/londiste_merge.sql b/sql/londiste/sql/londiste_merge.sql index d226a585..dcfe7871 100644 --- a/sql/londiste/sql/londiste_merge.sql +++ b/sql/londiste/sql/londiste_merge.sql @@ -21,6 +21,10 @@ select * from pgq_node.register_location('part2_set', 'p2root', 'dbname=db', fal select * from pgq_node.register_location('part2_set', 'p2merge', 'dbname=db2', false); select * from pgq_node.create_node('part2_set', 'leaf', 'p2merge', 'londiste_p2merge', 'p2root', 100, 'combined_set'); +select * from pgq_node.register_location('part3_set', 'p3root', 'dbname=db', false); +select * from pgq_node.register_location('part3_set', 'p3merge', 'dbname=db3', false); +select * from pgq_node.create_node('part3_set', 'leaf', 'p3merge', 'londiste_p3merge', 'p3root', 100, 'combined_set'); + select * from londiste.local_add_table('combined_set', 'tblmerge'); @@ -43,26 +47,96 @@ select * from londiste.local_set_table_struct('part1_set', 'public.tblmerge', 'c select * from londiste.get_table_list('part1_set'); select * from londiste.get_table_list('part2_set'); -select * from londiste.local_set_table_state('part1_set', 'public.tblmerge', null, 'in-copy'); select * from londiste.local_set_table_state('part2_set', 'public.tblmerge', null, 'catching-up'); select * from londiste.get_table_list('part1_set'); select * from londiste.get_table_list('part2_set'); -select * from londiste.local_set_table_struct('part1_set', 'public.tblmerge', null); +select * from londiste.local_set_table_state('part1_set', 'public.tblmerge', null, 'catching-up'); select * from londiste.get_table_list('part1_set'); select * from londiste.get_table_list('part2_set'); -select * from londiste.local_set_table_state('part1_set', 'public.tblmerge', null, 'catching-up'); -select * from londiste.local_set_table_state('part2_set', 'public.tblmerge', null, 'catching-up'); +select * from londiste.local_set_table_struct('part1_set', 'public.tblmerge', null); select * from londiste.get_table_list('part1_set'); select * from londiste.get_table_list('part2_set'); +-- +-- Test all combinations on 3-table merge +-- - - - - - +select * from londiste.global_add_table('part3_set', 'tblmerge'); + +\set ECHO off + +create table states ( state text ); +insert into states values ('in-copy'); +insert into states values ('!in-copy'); +insert into states values ('catching-up'); +insert into states values ('!catching-up'); + +create or replace function testmerge( + in p1state text, in p2state text, in p3state text, + out p1res text, out p2res text, out p3res text) +as $$ +declare + p1ddl text; + p2ddl text; + p3ddl text; + tbl text = 'public.tblmerge'; +begin + if position('!' in p1state) > 0 then + p1state := replace(p1state, '!', ''); + p1ddl = 'x'; + end if; + if position('!' in p2state) > 0 then + p2state := replace(p2state, '!', ''); + p2ddl = 'x'; + end if; + if position('!' in p3state) > 0 then + p3state := replace(p3state, '!', ''); + p3ddl = 'x'; + end if; + + update londiste.table_info + set merge_state = p1state, dropped_ddl = p1ddl, local = true + where table_name = tbl and queue_name = 'part1_set'; + update londiste.table_info + set merge_state = p2state, dropped_ddl = p2ddl, local = true + where table_name = tbl and queue_name = 'part2_set'; + update londiste.table_info + set merge_state = p3state, dropped_ddl = p3ddl, local = true + where table_name = tbl and queue_name = 'part3_set'; + + select coalesce(copy_role, 'NULL') from londiste.get_table_list('part1_set') + where table_name = tbl into p1res; + select coalesce(copy_role, 'NULL') from londiste.get_table_list('part2_set') + where table_name = tbl into p2res; + select coalesce(copy_role, 'NULL') from londiste.get_table_list('part3_set') + where table_name = tbl into p3res; + return; +end; +$$ language plpgsql; + +create function testmatrix( + out p1s text, out p2s text, out p3s text, + out p1r text, out p2r text, out p3r text) +returns setof record as $$ +begin + for p1s, p2s, p3s in + select p1.state, p2.state, p3.state + from states p1, states p2, states p3 + where position('!' in p1.state) + position('!' in p2.state) + position('!' in p3.state) < 2 + order by 1,2,3 + loop + select * from testmerge(p1s, p2s, p3s) into p1r, p2r, p3r; + return next; + end loop; + return; +end; +$$ language plpgsql; + +\set ECHO all + +select * from testmatrix(); |