diff options
6 files changed, 23 insertions, 8 deletions
diff --git a/sql/londiste/functions/londiste.global_add_table.sql b/sql/londiste/functions/londiste.global_add_table.sql index 40b22d62..e64315cf 100644 --- a/sql/londiste/functions/londiste.global_add_table.sql +++ b/sql/londiste/functions/londiste.global_add_table.sql @@ -34,13 +34,14 @@ begin perform 1 from londiste.table_info where queue_name = i_queue_name and table_name = fq_table_name; if found then - select 200, 'OK, already added: ' || fq_table_name into ret_code, ret_note; + select 200, 'Table already added: ' || fq_table_name into ret_code, ret_note; return; end if; insert into londiste.table_info (queue_name, table_name) values (i_queue_name, fq_table_name); - select 200, 'OK' into ret_code, ret_note; + select 200, 'Table added: ' || i_table_name + into ret_code, ret_note; return; end; $$ language plpgsql strict; diff --git a/sql/londiste/functions/londiste.global_remove_table.sql b/sql/londiste/functions/londiste.global_remove_table.sql index dd473b40..cc806598 100644 --- a/sql/londiste/functions/londiste.global_remove_table.sql +++ b/sql/londiste/functions/londiste.global_remove_table.sql @@ -29,10 +29,12 @@ begin where queue_name = i_queue_name and table_name = fq_table_name; if not found then - select 400, 'Not found: '||fq_table_name into ret_code, ret_note; + select 400, 'Table not found: ' || fq_table_name + into ret_code, ret_note; return; end if; - select 200, 'OK' into ret_code, ret_note; + select 200, 'Table removed: ' || i_table_name + into ret_code, ret_note; return; end; $$ language plpgsql strict; diff --git a/sql/londiste/functions/londiste.root_check_seqs.sql b/sql/londiste/functions/londiste.root_check_seqs.sql index 81be6aa3..6170ff02 100644 --- a/sql/londiste/functions/londiste.root_check_seqs.sql +++ b/sql/londiste/functions/londiste.root_check_seqs.sql @@ -61,7 +61,7 @@ begin end if; end loop; - select 200, 'Sequences updated' into ret_code, ret_note; + select 100, 'Sequences updated' into ret_code, ret_note; return; end; $$ language plpgsql; diff --git a/sql/pgq_node/functions/pgq_node.set_consumer_completed.sql b/sql/pgq_node/functions/pgq_node.set_consumer_completed.sql index b3da7730..6bcbf535 100644 --- a/sql/pgq_node/functions/pgq_node.set_consumer_completed.sql +++ b/sql/pgq_node/functions/pgq_node.set_consumer_completed.sql @@ -23,7 +23,8 @@ begin where queue_name = i_queue_name and consumer_name = i_consumer_name; if found then - select 200, 'Ok' into ret_code, ret_note; + select 100, 'Consumer ' || i_consumer_name || ' compleded tick = ' || i_tick_id + into ret_code, ret_note; else select 404, 'Consumer not known: ' || i_queue_name || '/' || i_consumer_name diff --git a/sql/pgq_node/functions/pgq_node.set_global_watermark.sql b/sql/pgq_node/functions/pgq_node.set_global_watermark.sql index 1f271773..b422a86b 100644 --- a/sql/pgq_node/functions/pgq_node.set_global_watermark.sql +++ b/sql/pgq_node/functions/pgq_node.set_global_watermark.sql @@ -66,7 +66,8 @@ begin and consumer_name = this.worker_name; end if; - select 200, 'Ok' into ret_code, ret_note; + select 200, 'Global watermark set to ' || _wm + into ret_code, ret_note; return; end; $$ language plpgsql security definer; diff --git a/sql/pgq_node/functions/pgq_node.set_subscriber_watermark.sql b/sql/pgq_node/functions/pgq_node.set_subscriber_watermark.sql index 6e1aa35d..b492e9ea 100644 --- a/sql/pgq_node/functions/pgq_node.set_subscriber_watermark.sql +++ b/sql/pgq_node/functions/pgq_node.set_subscriber_watermark.sql @@ -33,10 +33,20 @@ begin end if; -- todo: check if wm sane? + if i_watermark < n.last_tick then + select 405, 'watermark must not be moved backwards' + into ret_code, ret_note; + return; + elsif i_watermark = n.last_tick then + select 100, 'watermark already set' + into ret_code, ret_note; + return; + end if; perform pgq.register_consumer_at(i_queue_name, wm_name, i_watermark); - select 200, 'Ok' into ret_code, ret_note; + select 200, wm_name || ' set to ' || i_watermark + into ret_code, ret_note; return; end; $$ language plpgsql security definer; |