-- PgQ tick creation: the three pgq.ticker() overloads defined below.
create or replace function pgq.ticker(i_queue_name text, i_tick_id bigint, i_orig_timestamp timestamptz, i_event_seq bigint)
returns bigint as $$
-- ----------------------------------------------------------------------
-- Function: pgq.ticker(4)
--
--      External ticker entry point: stores a tick whose id, time and
--      event sequence position are all supplied by the caller.
--
-- Parameters:
--      i_queue_name        - Name of the queue
--      i_tick_id           - Id to store for the new tick
--      i_orig_timestamp    - Timestamp stored as the tick's tick_time
--      i_event_seq         - Value stored as the tick's tick_event_seq
--
-- Returns:
--      The i_tick_id that was passed in.
--
-- Raises:
--      Exception when no pgq.queue row matches i_queue_name with
--      queue_external_ticker set and queue_ticker_paused not set.
-- ----------------------------------------------------------------------
begin
    -- Queue lookup and insert are a single statement: a queue that is
    -- missing, not externally ticked, or paused yields zero inserted rows.
    insert into pgq.tick (tick_queue, tick_id, tick_time, tick_event_seq)
    select src.queue_id, i_tick_id, i_orig_timestamp, i_event_seq
      from pgq.queue src
     where src.queue_name = i_queue_name
       and src.queue_external_ticker
       and not src.queue_ticker_paused;

    if not found then
        raise exception 'queue not found or ticker disabled: %', i_queue_name;
    end if;

    -- Keep the queue's tick and event sequences current with the
    -- values that were just imported.
    perform pgq.seq_setval(src.queue_tick_seq, i_tick_id),
            pgq.seq_setval(src.queue_event_seq, i_event_seq)
       from pgq.queue src
      where src.queue_name = i_queue_name;

    return i_tick_id;
end;
$$ language plpgsql security definer; -- unsure about access
create or replace function pgq.ticker(i_queue_name text)
returns bigint as $$
-- ----------------------------------------------------------------------
-- Function: pgq.ticker(1)
--
--      Check if tick is needed for the queue and insert it.
--
--      For pgqadm usage.
--
--      A tick is skipped (NULL returned) when a previous tick exists and
--      either:
--        * there are new events, but fewer than queue_ticker_max_count
--          and the last tick is younger than queue_ticker_max_lag; or
--        * there are no new events, an earlier tick exists, and the last
--          tick is younger than half of queue_ticker_max_lag, or younger
--          than both twice the gap between the two latest ticks and
--          queue_ticker_idle_period.
--
-- Parameters:
--      i_queue_name    - Name of the queue
--
-- Returns:
--      Tick id or NULL if no tick was done.
--
-- Raises:
--      Exception if the queue does not exist, has an external tick
--      source, or has its ticker paused; also if the last tick's
--      snapshot xmin is greater than the current transaction id.
--
-- NOTE(review): declared security definer without a function-level
-- search_path while calling now(), nextval() and the txid_* functions
-- unqualified -- confirm this against the PostgreSQL guidance on
-- writing security definer functions safely.
-- ----------------------------------------------------------------------
declare
    res     bigint;     -- id of the tick inserted by this call
    q       record;     -- queue configuration + current event seq value
    state   record;     -- figures derived from the most recent tick
    last2   record;     -- gap between the two most recent ticks
begin
    select queue_id, queue_tick_seq, queue_external_ticker,
           queue_ticker_max_count, queue_ticker_max_lag,
           queue_ticker_idle_period, queue_event_seq,
           pgq.seq_getval(queue_event_seq) as event_seq,
           queue_ticker_paused
      into q
      from pgq.queue where queue_name = i_queue_name;
    if not found then
        raise exception 'no such queue';
    end if;

    if q.queue_external_ticker then
        raise exception 'This queue has external tick source.';
    end if;

    if q.queue_ticker_paused then
        raise exception 'Ticker has been paused for this queue';
    end if;

    -- load state from last tick
    select now() - tick_time as lag,
           q.event_seq - tick_event_seq as new_events,
           tick_id, tick_time, tick_event_seq,
           txid_snapshot_xmax(tick_snapshot) as sxmax,
           txid_snapshot_xmin(tick_snapshot) as sxmin
      into state
      from pgq.tick
     where tick_queue = q.queue_id
     order by tick_queue desc, tick_id desc
     limit 1;

    if found then
        -- sanity checks against the previous tick's snapshot
        if state.sxmin > txid_current() then
            raise exception 'Invalid PgQ state: old xmin=%, old xmax=%, cur txid=%',
                            state.sxmin, state.sxmax, txid_current();
        end if;
        if state.new_events < 0 then
            raise warning 'Negative new_events?  old=% cur=%', state.tick_event_seq, q.event_seq;
        end if;
        if state.sxmax > txid_current() then
            raise warning 'Dubious PgQ state: old xmax=%, cur txid=%', state.sxmax, txid_current();
        end if;

        if state.new_events > 0 then
            -- there are new events, should we wait a bit?
            if state.new_events < q.queue_ticker_max_count
                and state.lag < q.queue_ticker_max_lag
            then
                return NULL;
            end if;
        else
            -- no new events, should we apply idle period?
            -- check the tick preceding the last one.
            select state.tick_time - tick_time as lag
              into last2
              from pgq.tick
             where tick_queue = q.queue_id
               and tick_id < state.tick_id
             order by tick_queue desc, tick_id desc
             limit 1;
            if found then
                -- gradually decrease the tick frequency
                if (state.lag < q.queue_ticker_max_lag / 2)
                    or
                   (state.lag < last2.lag * 2
                    and state.lag < q.queue_ticker_idle_period)
                then
                    return NULL;
                end if;
            end if;
        end if;
    end if;

    -- Draw the tick id once and return that same value, instead of
    -- reading the sequence a second time after the insert.
    res := nextval(q.queue_tick_seq);

    insert into pgq.tick (tick_queue, tick_id, tick_event_seq)
    values (q.queue_id, res, q.event_seq);

    return res;
end;
$$ language plpgsql security definer; -- unsure about access
create or replace function pgq.ticker() returns bigint as $$
-- ----------------------------------------------------------------------
-- Function: pgq.ticker(0)
--
--      Runs pgq.ticker(queue_name) for every queue that is neither
--      externally ticked nor paused, visiting queues in name order.
--
-- Returns:
--      Number of queues for which that call returned a value greater
--      than zero.
-- ----------------------------------------------------------------------
declare
    ticked_count    bigint := 0;
    queue_row       record;
    new_tick        bigint;
begin
    for queue_row in
        select queue_name
          from pgq.queue
         where not queue_external_ticker
           and not queue_ticker_paused
         order by queue_name
    loop
        -- a NULL result fails the comparison below and is not counted
        new_tick := pgq.ticker(queue_row.queue_name);
        if new_tick > 0 then
            ticked_count := ticked_count + 1;
        end if;
    end loop;

    return ticked_count;
end;
$$ language plpgsql security definer;
|