summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--sql/londiste/functions/londiste.global_add_table.sql5
-rw-r--r--sql/londiste/functions/londiste.global_remove_table.sql6
-rw-r--r--sql/londiste/functions/londiste.root_check_seqs.sql2
-rw-r--r--sql/pgq_node/functions/pgq_node.set_consumer_completed.sql3
-rw-r--r--sql/pgq_node/functions/pgq_node.set_global_watermark.sql3
-rw-r--r--sql/pgq_node/functions/pgq_node.set_subscriber_watermark.sql12
6 files changed, 23 insertions, 8 deletions
diff --git a/sql/londiste/functions/londiste.global_add_table.sql b/sql/londiste/functions/londiste.global_add_table.sql
index 40b22d62..e64315cf 100644
--- a/sql/londiste/functions/londiste.global_add_table.sql
+++ b/sql/londiste/functions/londiste.global_add_table.sql
@@ -34,13 +34,14 @@ begin
perform 1 from londiste.table_info where queue_name = i_queue_name and table_name = fq_table_name;
if found then
- select 200, 'OK, already added: ' || fq_table_name into ret_code, ret_note;
+ select 200, 'Table already added: ' || fq_table_name into ret_code, ret_note;
return;
end if;
insert into londiste.table_info (queue_name, table_name)
values (i_queue_name, fq_table_name);
- select 200, 'OK' into ret_code, ret_note;
+ select 200, 'Table added: ' || i_table_name
+ into ret_code, ret_note;
return;
end;
$$ language plpgsql strict;
diff --git a/sql/londiste/functions/londiste.global_remove_table.sql b/sql/londiste/functions/londiste.global_remove_table.sql
index dd473b40..cc806598 100644
--- a/sql/londiste/functions/londiste.global_remove_table.sql
+++ b/sql/londiste/functions/londiste.global_remove_table.sql
@@ -29,10 +29,12 @@ begin
where queue_name = i_queue_name
and table_name = fq_table_name;
if not found then
- select 400, 'Not found: '||fq_table_name into ret_code, ret_note;
+ select 400, 'Table not found: ' || fq_table_name
+ into ret_code, ret_note;
return;
end if;
- select 200, 'OK' into ret_code, ret_note;
+ select 200, 'Table removed: ' || i_table_name
+ into ret_code, ret_note;
return;
end;
$$ language plpgsql strict;
diff --git a/sql/londiste/functions/londiste.root_check_seqs.sql b/sql/londiste/functions/londiste.root_check_seqs.sql
index 81be6aa3..6170ff02 100644
--- a/sql/londiste/functions/londiste.root_check_seqs.sql
+++ b/sql/londiste/functions/londiste.root_check_seqs.sql
@@ -61,7 +61,7 @@ begin
end if;
end loop;
- select 200, 'Sequences updated' into ret_code, ret_note;
+ select 100, 'Sequences updated' into ret_code, ret_note;
return;
end;
$$ language plpgsql;
diff --git a/sql/pgq_node/functions/pgq_node.set_consumer_completed.sql b/sql/pgq_node/functions/pgq_node.set_consumer_completed.sql
index b3da7730..6bcbf535 100644
--- a/sql/pgq_node/functions/pgq_node.set_consumer_completed.sql
+++ b/sql/pgq_node/functions/pgq_node.set_consumer_completed.sql
@@ -23,7 +23,8 @@ begin
where queue_name = i_queue_name
and consumer_name = i_consumer_name;
if found then
- select 200, 'Ok' into ret_code, ret_note;
+ select 100, 'Consumer ' || i_consumer_name || ' compleded tick = ' || i_tick_id
+ into ret_code, ret_note;
else
select 404, 'Consumer not known: '
|| i_queue_name || '/' || i_consumer_name
diff --git a/sql/pgq_node/functions/pgq_node.set_global_watermark.sql b/sql/pgq_node/functions/pgq_node.set_global_watermark.sql
index 1f271773..b422a86b 100644
--- a/sql/pgq_node/functions/pgq_node.set_global_watermark.sql
+++ b/sql/pgq_node/functions/pgq_node.set_global_watermark.sql
@@ -66,7 +66,8 @@ begin
and consumer_name = this.worker_name;
end if;
- select 200, 'Ok' into ret_code, ret_note;
+ select 200, 'Global watermark set to ' || _wm
+ into ret_code, ret_note;
return;
end;
$$ language plpgsql security definer;
diff --git a/sql/pgq_node/functions/pgq_node.set_subscriber_watermark.sql b/sql/pgq_node/functions/pgq_node.set_subscriber_watermark.sql
index 6e1aa35d..b492e9ea 100644
--- a/sql/pgq_node/functions/pgq_node.set_subscriber_watermark.sql
+++ b/sql/pgq_node/functions/pgq_node.set_subscriber_watermark.sql
@@ -33,10 +33,20 @@ begin
end if;
-- todo: check if wm sane?
+ if i_watermark < n.last_tick then
+ select 405, 'watermark must not be moved backwards'
+ into ret_code, ret_note;
+ return;
+ elsif i_watermark = n.last_tick then
+ select 100, 'watermark already set'
+ into ret_code, ret_note;
+ return;
+ end if;
perform pgq.register_consumer_at(i_queue_name, wm_name, i_watermark);
- select 200, 'Ok' into ret_code, ret_note;
+ select 200, wm_name || ' set to ' || i_watermark
+ into ret_code, ret_note;
return;
end;
$$ language plpgsql security definer;