diff options
author | Marko Kreen | 2011-09-18 20:24:36 +0000 |
---|---|---|
committer | Marko Kreen | 2011-09-18 20:24:36 +0000 |
commit | 496c13bb313f992130a303d1f5efd6a76e51a4b8 (patch) | |
tree | a86526fabf7fca9f7cdca76715ad8e623f254f9a | |
parent | 3be9aecc644c15c7335306bcd54690cde105f7c2 (diff) |
londiste.get_table_list: make sure all state combinations work
Although the UI may not support every combination,
that does not guarantee a given combination can never occur,
so try to assign reasonable roles in every case.
New logic:
- if table has ddl, it's lead
- in case of no ddl and at least one complete copy, let the copy be a follower:
that means it will copy into the table without touching ddl.
-rw-r--r-- | sql/londiste/expected/londiste_merge.out | 99 | ||||
-rw-r--r-- | sql/londiste/functions/londiste.get_table_list.sql | 48 | ||||
-rw-r--r-- | sql/londiste/sql/londiste_merge.sql | 92 |
3 files changed, 192 insertions, 47 deletions
diff --git a/sql/londiste/expected/londiste_merge.out b/sql/londiste/expected/londiste_merge.out index 238d343d..27bca55d 100644 --- a/sql/londiste/expected/londiste_merge.out +++ b/sql/londiste/expected/londiste_merge.out @@ -55,6 +55,24 @@ select * from pgq_node.create_node('part2_set', 'leaf', 'p2merge', 'londiste_p2m 200 | Node "p2merge" initialized for queue "part2_set" with type "leaf" (1 row) +select * from pgq_node.register_location('part3_set', 'p3root', 'dbname=db', false); + ret_code | ret_note +----------+--------------------- + 200 | Location registered +(1 row) + +select * from pgq_node.register_location('part3_set', 'p3merge', 'dbname=db3', false); + ret_code | ret_note +----------+--------------------- + 200 | Location registered +(1 row) + +select * from pgq_node.create_node('part3_set', 'leaf', 'p3merge', 'londiste_p3merge', 'p3root', 100, 'combined_set'); + ret_code | ret_note +----------+------------------------------------------------------------------- + 200 | Node "p3merge" initialized for queue "part3_set" with type "leaf" +(1 row) + select * from londiste.local_add_table('combined_set', 'tblmerge'); ret_code | ret_note ----------+------------------------------ @@ -139,12 +157,6 @@ select * from londiste.get_table_list('part2_set'); public.tblmerge | t | in-copy | | | | wait-replay | 1 (1 row) -select * from londiste.local_set_table_state('part1_set', 'public.tblmerge', null, 'in-copy'); - ret_code | ret_note -----------+---------------------------------------------- - 200 | Table public.tblmerge state set to 'in-copy' -(1 row) - select * from londiste.local_set_table_state('part2_set', 'public.tblmerge', null, 'catching-up'); ret_code | ret_note ----------+-------------------------------------------------- @@ -163,16 +175,16 @@ select * from londiste.get_table_list('part2_set'); public.tblmerge | t | catching-up | | | | wait-replay | 0 (1 row) -select * from londiste.local_set_table_struct('part1_set', 'public.tblmerge', null); - ret_code | 
ret_note -----------+--------------------- - 200 | Table struct stored +select * from londiste.local_set_table_state('part1_set', 'public.tblmerge', null, 'catching-up'); + ret_code | ret_note +----------+-------------------------------------------------- + 200 | Table public.tblmerge state set to 'catching-up' (1 row) select * from londiste.get_table_list('part1_set'); - table_name | local | merge_state | custom_snapshot | table_attrs | dropped_ddl | copy_role | copy_pos ------------------+-------+-------------+-----------------+-------------+-------------+-----------+---------- - public.tblmerge | t | in-copy | | | | lead | 0 + table_name | local | merge_state | custom_snapshot | table_attrs | dropped_ddl | copy_role | copy_pos +-----------------+-------+-------------+-----------------+-------------+---------------+-----------+---------- + public.tblmerge | t | catching-up | | | create index; | | 0 (1 row) select * from londiste.get_table_list('part2_set'); @@ -181,16 +193,10 @@ select * from londiste.get_table_list('part2_set'); public.tblmerge | t | catching-up | | | | wait-replay | 0 (1 row) -select * from londiste.local_set_table_state('part1_set', 'public.tblmerge', null, 'catching-up'); - ret_code | ret_note -----------+-------------------------------------------------- - 200 | Table public.tblmerge state set to 'catching-up' -(1 row) - -select * from londiste.local_set_table_state('part2_set', 'public.tblmerge', null, 'catching-up'); - ret_code | ret_note -----------+-------------------------------------------------- - 200 | Table public.tblmerge state set to 'catching-up' +select * from londiste.local_set_table_struct('part1_set', 'public.tblmerge', null); + ret_code | ret_note +----------+--------------------- + 200 | Table struct stored (1 row) select * from londiste.get_table_list('part1_set'); @@ -205,3 +211,50 @@ select * from londiste.get_table_list('part2_set'); public.tblmerge | t | catching-up | | | | | 0 (1 row) +-- +-- Test all combinations on 
3-table merge +-- +select * from londiste.global_add_table('part3_set', 'tblmerge'); + ret_code | ret_note +----------+----------------------- + 200 | Table added: tblmerge +(1 row) + +\set ECHO off +select * from testmatrix(); + p1s | p2s | p3s | p1r | p2r | p3r +--------------+--------------+--------------+-------------+-------------+------------- + !catching-up | catching-up | catching-up | NULL | wait-replay | wait-replay + !catching-up | catching-up | in-copy | wait-replay | wait-replay | wait-replay + !catching-up | in-copy | catching-up | wait-replay | wait-replay | wait-replay + !catching-up | in-copy | in-copy | wait-replay | wait-replay | wait-replay + !in-copy | catching-up | catching-up | lead | wait-replay | wait-replay + !in-copy | catching-up | in-copy | lead | wait-replay | wait-replay + !in-copy | in-copy | catching-up | lead | wait-replay | wait-replay + !in-copy | in-copy | in-copy | lead | wait-replay | wait-replay + catching-up | !catching-up | catching-up | wait-replay | NULL | wait-replay + catching-up | !catching-up | in-copy | wait-replay | wait-replay | wait-replay + catching-up | !in-copy | catching-up | wait-replay | lead | wait-replay + catching-up | !in-copy | in-copy | wait-replay | lead | wait-replay + catching-up | catching-up | !catching-up | wait-replay | wait-replay | NULL + catching-up | catching-up | !in-copy | wait-replay | wait-replay | lead + catching-up | catching-up | catching-up | NULL | NULL | NULL + catching-up | catching-up | in-copy | NULL | NULL | wait-replay + catching-up | in-copy | !catching-up | wait-replay | wait-replay | wait-replay + catching-up | in-copy | !in-copy | wait-replay | wait-replay | lead + catching-up | in-copy | catching-up | NULL | wait-replay | NULL + catching-up | in-copy | in-copy | NULL | wait-replay | wait-replay + in-copy | !catching-up | catching-up | wait-replay | wait-replay | wait-replay + in-copy | !catching-up | in-copy | wait-replay | wait-replay | wait-replay + in-copy | !in-copy | 
catching-up | wait-replay | lead | wait-replay + in-copy | !in-copy | in-copy | wait-replay | lead | wait-replay + in-copy | catching-up | !catching-up | wait-replay | wait-replay | wait-replay + in-copy | catching-up | !in-copy | wait-replay | wait-replay | lead + in-copy | catching-up | catching-up | wait-replay | NULL | NULL + in-copy | catching-up | in-copy | wait-replay | NULL | wait-replay + in-copy | in-copy | !catching-up | wait-replay | wait-replay | wait-replay + in-copy | in-copy | !in-copy | wait-replay | wait-replay | lead + in-copy | in-copy | catching-up | wait-replay | wait-replay | NULL + in-copy | in-copy | in-copy | lead | wait-copy | wait-copy +(32 rows) + diff --git a/sql/londiste/functions/londiste.get_table_list.sql b/sql/londiste/functions/londiste.get_table_list.sql index b998ed22..37a02ad8 100644 --- a/sql/londiste/functions/londiste.get_table_list.sql +++ b/sql/londiste/functions/londiste.get_table_list.sql @@ -32,9 +32,8 @@ returns setof record as $$ -- -- copy_role = lead: -- on copy start, drop indexes and store in dropped_ddl --- on copy finish wait, until copy_role turns to NULL --- if dropped_ddl not NULL, restore them --- tag as catching-up +-- on copy finish change state to catching-up, then wait until copy_role turns to NULL +-- catching-up: if dropped_ddl not NULL, restore them -- copy_role = wait-copy: -- on copy start wait, until role changes (to wait-replay) -- copy_role = wait-replay: @@ -43,14 +42,18 @@ returns setof record as $$ -- declare q_part1 text; + q_part_ddl text; n_parts int4; n_done int4; - var_table_name text; + v_table_name text; n_combined_queue text; begin - for var_table_name, local, merge_state, custom_snapshot, table_attrs, dropped_ddl, q_part1, n_parts, n_done, n_combined_queue, copy_pos in + for v_table_name, local, merge_state, custom_snapshot, table_attrs, dropped_ddl, + q_part1, q_part_ddl, n_parts, n_done, n_combined_queue, copy_pos + in select t.table_name, t.local, t.merge_state, t.custom_snapshot, 
t.table_attrs, t.dropped_ddl, min(case when t2.local then t2.queue_name else null end) as _queue1, + min(case when t2.local and t2.dropped_ddl is not null then t2.queue_name else null end) as _queue1ddl, count(case when t2.local then t2.table_name else null end) as _total, count(case when t2.local then nullif(t2.merge_state, 'in-copy') else null end) as _done, min(n.combined_queue) as _combined_queue, @@ -69,37 +72,52 @@ begin -- the copy processes need coordination copy_role := null; + -- be more robust against late joiners + q_part1 := coalesce(q_part_ddl, q_part1); + if q_part1 is not null then if i_queue_name = q_part1 then -- lead - if merge_state in ('in-copy', 'catching-up') then + if merge_state = 'in-copy' then + if dropped_ddl is null and n_done > 0 then + -- seems late addition, let it copy with indexes + copy_role := 'wait-replay'; + elsif n_done < n_parts then + -- show copy_role only if need to drop ddl or already did drop ddl + copy_role := 'lead'; + end if; + + -- make sure it cannot be made to wait + copy_pos := 0; + end if; + if merge_state = 'catching-up' and dropped_ddl is not null then -- show copy_role only if need to wait for others if n_done < n_parts then - copy_role := 'lead'; + copy_role := 'wait-replay'; end if; end if; else -- follow if merge_state = 'in-copy' then - -- has lead already dropped ddl? 
- perform 1 from londiste.table_info t - where t.queue_name = q_part1 - and t.table_name = var_table_name - and t.dropped_ddl is not null; - if found then + if q_part_ddl is not null then + -- can copy, wait in replay until lead has applied ddl + copy_role := 'wait-replay'; + elsif n_done > 0 then + -- ddl is not dropped, others are active, copy without touching ddl copy_role := 'wait-replay'; else + -- wait for lead to drop ddl copy_role := 'wait-copy'; end if; elsif merge_state = 'catching-up' then -- show copy_role only if need to wait for lead - if n_done < n_parts then + if q_part_ddl is not null then copy_role := 'wait-replay'; end if; end if; end if; end if; - table_name:=var_table_name; + table_name := v_table_name; return next; end loop; return; diff --git a/sql/londiste/sql/londiste_merge.sql b/sql/londiste/sql/londiste_merge.sql index d226a585..dcfe7871 100644 --- a/sql/londiste/sql/londiste_merge.sql +++ b/sql/londiste/sql/londiste_merge.sql @@ -21,6 +21,10 @@ select * from pgq_node.register_location('part2_set', 'p2root', 'dbname=db', fal select * from pgq_node.register_location('part2_set', 'p2merge', 'dbname=db2', false); select * from pgq_node.create_node('part2_set', 'leaf', 'p2merge', 'londiste_p2merge', 'p2root', 100, 'combined_set'); +select * from pgq_node.register_location('part3_set', 'p3root', 'dbname=db', false); +select * from pgq_node.register_location('part3_set', 'p3merge', 'dbname=db3', false); +select * from pgq_node.create_node('part3_set', 'leaf', 'p3merge', 'londiste_p3merge', 'p3root', 100, 'combined_set'); + select * from londiste.local_add_table('combined_set', 'tblmerge'); @@ -43,26 +47,96 @@ select * from londiste.local_set_table_struct('part1_set', 'public.tblmerge', 'c select * from londiste.get_table_list('part1_set'); select * from londiste.get_table_list('part2_set'); -select * from londiste.local_set_table_state('part1_set', 'public.tblmerge', null, 'in-copy'); select * from londiste.local_set_table_state('part2_set', 
'public.tblmerge', null, 'catching-up'); select * from londiste.get_table_list('part1_set'); select * from londiste.get_table_list('part2_set'); -select * from londiste.local_set_table_struct('part1_set', 'public.tblmerge', null); +select * from londiste.local_set_table_state('part1_set', 'public.tblmerge', null, 'catching-up'); select * from londiste.get_table_list('part1_set'); select * from londiste.get_table_list('part2_set'); -select * from londiste.local_set_table_state('part1_set', 'public.tblmerge', null, 'catching-up'); -select * from londiste.local_set_table_state('part2_set', 'public.tblmerge', null, 'catching-up'); +select * from londiste.local_set_table_struct('part1_set', 'public.tblmerge', null); select * from londiste.get_table_list('part1_set'); select * from londiste.get_table_list('part2_set'); +-- +-- Test all combinations on 3-table merge +-- - - - - - +select * from londiste.global_add_table('part3_set', 'tblmerge'); + +\set ECHO off + +create table states ( state text ); +insert into states values ('in-copy'); +insert into states values ('!in-copy'); +insert into states values ('catching-up'); +insert into states values ('!catching-up'); + +create or replace function testmerge( + in p1state text, in p2state text, in p3state text, + out p1res text, out p2res text, out p3res text) +as $$ +declare + p1ddl text; + p2ddl text; + p3ddl text; + tbl text = 'public.tblmerge'; +begin + if position('!' in p1state) > 0 then + p1state := replace(p1state, '!', ''); + p1ddl = 'x'; + end if; + if position('!' in p2state) > 0 then + p2state := replace(p2state, '!', ''); + p2ddl = 'x'; + end if; + if position('!' 
in p3state) > 0 then + p3state := replace(p3state, '!', ''); + p3ddl = 'x'; + end if; + + update londiste.table_info + set merge_state = p1state, dropped_ddl = p1ddl, local = true + where table_name = tbl and queue_name = 'part1_set'; + update londiste.table_info + set merge_state = p2state, dropped_ddl = p2ddl, local = true + where table_name = tbl and queue_name = 'part2_set'; + update londiste.table_info + set merge_state = p3state, dropped_ddl = p3ddl, local = true + where table_name = tbl and queue_name = 'part3_set'; + + select coalesce(copy_role, 'NULL') from londiste.get_table_list('part1_set') + where table_name = tbl into p1res; + select coalesce(copy_role, 'NULL') from londiste.get_table_list('part2_set') + where table_name = tbl into p2res; + select coalesce(copy_role, 'NULL') from londiste.get_table_list('part3_set') + where table_name = tbl into p3res; + return; +end; +$$ language plpgsql; + +create function testmatrix( + out p1s text, out p2s text, out p3s text, + out p1r text, out p2r text, out p3r text) +returns setof record as $$ +begin + for p1s, p2s, p3s in + select p1.state, p2.state, p3.state + from states p1, states p2, states p3 + where position('!' in p1.state) + position('!' in p2.state) + position('!' in p3.state) < 2 + order by 1,2,3 + loop + select * from testmerge(p1s, p2s, p3s) into p1r, p2r, p3r; + return next; + end loop; + return; +end; +$$ language plpgsql; + +\set ECHO all + +select * from testmatrix(); |