declare
tmp text;
last_tick bigint;
- x_queue_id integer;
+ x_queue_id integer;
x_consumer_id integer;
queue integer;
sub record;
-- get consumer and create if new
select co_id into x_consumer_id from pgq.consumer
- where co_name = x_consumer_name;
+ where co_name = x_consumer_name
+ for update;
if not found then
insert into pgq.consumer (co_name) values (x_consumer_name);
x_consumer_id := currval('pgq.consumer_co_id_seq');
-- ----------------------------------------------------------------------
-- Function: pgq.unregister_consumer(2)
--
--- Unsubscriber consumer from the queue. Also consumer's
--- retry events are deleted.
+-- Unsubscribe consumer from the queue.
+-- Also consumer's retry events are deleted.
--
-- Parameters:
-- x_queue_name - Name of the queue
and s.sub_consumer = c.co_id
and q.queue_name = x_queue_name
and c.co_name = x_consumer_name
- for update of s;
+ for update of s, c;
if not found then
return 0;
end if;
delete from pgq.subscription
where sub_id = x_sub_id
and sub_consumer = _consumer_id;
-
return 1;
else
-- delete main consumer (including possible subconsumers)
-- this will drop subconsumers too
delete from pgq.subscription
where sub_id = x_sub_id;
-
+
+ delete from pgq.consumer
+ where co_id = _consumer_id;
+
return _sub_id_cnt;
end if;