create or replace function pgq.ticker(
    i_queue_name text,
    i_tick_id bigint,
    i_orig_timestamp timestamptz,
    i_event_seq bigint)
returns bigint as $$
-- ----------------------------------------------------------------------
-- Function: pgq.ticker(4)
--
--      External ticker: Insert a tick with a particular tick_id and timestamp.
--
--      The tick is only inserted when the queue exists, is flagged as
--      having an external ticker and its ticker is not paused.
--
-- Parameters:
--      i_queue_name        - Name of the queue
--      i_tick_id           - Id of new tick.
--      i_orig_timestamp    - Timestamp stored as tick_time of the new tick.
--      i_event_seq         - Event sequence position stored as
--                            tick_event_seq of the new tick.
--
-- Returns:
--      Tick id (the i_tick_id that was passed in).
--
-- Raises:
--      Exception if no matching queue row was found (unknown queue,
--      no external ticker, or ticker paused).
-- ----------------------------------------------------------------------
begin
    -- The queue lookup doubles as the permission check: no matching
    -- queue row means nothing is inserted and FOUND stays false.
    insert into pgq.tick (tick_queue, tick_id, tick_time, tick_event_seq)
    select queue_id, i_tick_id, i_orig_timestamp, i_event_seq
        from pgq.queue
        where queue_name = i_queue_name
          and queue_external_ticker
          and not queue_ticker_paused;
    if not found then
        raise exception 'queue not found or ticker disabled: %', i_queue_name;
    end if;

    -- make sure seqs stay current
    perform pgq.seq_setval(queue_tick_seq, i_tick_id),
            pgq.seq_setval(queue_event_seq, i_event_seq)
        from pgq.queue
        where queue_name = i_queue_name;

    return i_tick_id;
end;
$$ language plpgsql security definer; -- unsure about access

create or replace function pgq.ticker(i_queue_name text)
returns bigint as $$
-- ----------------------------------------------------------------------
-- Function: pgq.ticker(1)
--
--      Check if tick is needed for the queue and insert it.
--
--      For pgqadm usage.
--
--      A tick is skipped (NULL returned) when the previous tick is
--      still "fresh enough":
--        - there are new events, but fewer than queue_ticker_max_count
--          and the last tick is younger than queue_ticker_max_lag;
--        - there are no new events and the idle back-off rule below
--          says it is too early for another tick.
--      If the queue has no ticks yet, a tick is always inserted.
--
-- Parameters:
--      i_queue_name     - Name of the queue
--
-- Returns:
--      Tick id or NULL if no tick was done.
--
-- Raises:
--      Exception if the queue does not exist, has an external tick
--      source, has its ticker paused, or if the xmin of the last tick's
--      snapshot is ahead of the current transaction id.
-- ----------------------------------------------------------------------
declare
    res       bigint;
    q         record;
    state     record;
    last2     record;
begin
    select queue_id,
           queue_tick_seq,
           queue_external_ticker,
           queue_ticker_max_count,
           queue_ticker_max_lag,
           queue_ticker_idle_period,
           queue_event_seq,
           pgq.seq_getval(queue_event_seq) as event_seq,
           queue_ticker_paused
        into q
        from pgq.queue
        where queue_name = i_queue_name;
    if not found then
        raise exception 'no such queue';
    end if;

    if q.queue_external_ticker then
        raise exception 'This queue has external tick source.';
    end if;

    if q.queue_ticker_paused then
        raise exception 'Ticker has been paused for this queue';
    end if;

    -- load state from last tick
    select now() - tick_time as lag,
           q.event_seq - tick_event_seq as new_events,
           tick_id,
           tick_time,
           tick_event_seq,
           txid_snapshot_xmax(tick_snapshot) as sxmax,
           txid_snapshot_xmin(tick_snapshot) as sxmin
        into state
        from pgq.tick
        where tick_queue = q.queue_id
        order by tick_queue desc, tick_id desc
        limit 1;

    if found then
        -- sanity checks against the snapshot stored with the last tick
        if state.sxmin > txid_current() then
            raise exception 'Invalid PgQ state: old xmin=%, old xmax=%, cur txid=%',
                            state.sxmin, state.sxmax, txid_current();
        end if;
        if state.new_events < 0 then
            raise warning 'Negative new_events? old=% cur=%',
                          state.tick_event_seq, q.event_seq;
        end if;
        if state.sxmax > txid_current() then
            raise warning 'Dubious PgQ state: old xmax=%, cur txid=%',
                          state.sxmax, txid_current();
        end if;

        if state.new_events > 0 then
            -- there are new events, should we wait a bit?
            if state.new_events < q.queue_ticker_max_count
                and state.lag < q.queue_ticker_max_lag
            then
                return NULL;
            end if;
        else
            -- no new events (zero, or negative after the warning above),
            -- should we apply idle period?
            -- check previous tick from the last one: last2.lag is the
            -- time distance between the two most recent ticks.
            select state.tick_time - tick_time as lag
                into last2
                from pgq.tick
                where tick_queue = q.queue_id
                    and tick_id < state.tick_id
                order by tick_queue desc, tick_id desc
                limit 1;
            if found then
                -- gradually decrease the tick frequency: skip while the
                -- last tick is younger than half of max_lag, or younger
                -- than both twice the previous tick gap and the idle
                -- period.
                if (state.lag < q.queue_ticker_max_lag / 2)
                    or
                   (state.lag < last2.lag * 2
                    and state.lag < q.queue_ticker_idle_period)
                then
                    return NULL;
                end if;
            end if;
        end if;
    end if;

    -- Insert the tick and hand back the id of the row actually written,
    -- instead of re-reading the sequence state afterwards.
    insert into pgq.tick (tick_queue, tick_id, tick_event_seq)
        values (q.queue_id, nextval(q.queue_tick_seq), q.event_seq)
        returning tick_id into res;

    return res;
end;
$$ language plpgsql security definer; -- unsure about access

create or replace function pgq.ticker()
returns bigint as $$
-- ----------------------------------------------------------------------
-- Function: pgq.ticker(0)
--
--      Creates ticks for all unpaused queues which don't have external
--      ticker, by calling pgq.ticker(queue_name) for each of them in
--      queue_name order.
--
-- Returns:
--      Number of queues for which a tick was actually inserted
--      (queues where pgq.ticker(1) returned NULL are not counted).
-- ----------------------------------------------------------------------
declare
    res bigint;
    q   record;
begin
    res := 0;
    for q in
        select queue_name
            from pgq.queue
            where not queue_external_ticker
              and not queue_ticker_paused
            order by queue_name
    loop
        -- NULL (no tick needed) compares as not-true and is skipped
        if pgq.ticker(q.queue_name) > 0 then
            res := res + 1;
        end if;
    end loop;
    return res;
end;
$$ language plpgsql security definer;