1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
|
create or replace function londiste.create_trigger(
in i_queue_name text,
in i_table_name text,
in i_trg_args text[],
in i_dest_table text,
in i_node_type text,
out ret_code int4,
out ret_note text,
out trigger_name text)
as $$
------------------------------------------------------------------------
-- Function: londiste.create_trigger(5)
--
-- Create or replace londiste trigger(s)
--
-- Parameters:
-- i_queue_name - queue name
-- i_table_name - table name
-- i_trg_args - args to trigger
-- i_dest_table - actual name of destination table (NULL if same as src)
-- i_node_type - l3 node type
--
-- Trigger args:
-- See documentation for pgq triggers.
--
-- Trigger creation flags (default: AIUDL):
-- I - ON INSERT
-- U - ON UPDATE
-- D - ON DELETE
-- Q - use pgq.sqltriga() as trigger function
-- L - use pgq.logutriga() as trigger function
-- B - BEFORE
-- A - AFTER
-- S - SKIP
--
-- Returns:
-- 200 - Ok
-- 201 - Trigger not created
-- 405 - Multiple SKIP triggers
--
------------------------------------------------------------------------
declare
trigger_name text;
lg_func text;
lg_pos text;
lg_event text;
lg_args text[];
_old_tgargs bytea;
_new_tgargs bytea;
trunctrg_name text;
pgversion int;
sql text;
arg text;
i integer;
_extra_args text[] := '{}';
-- skip trigger
_skip_prefix text := 'zzz_';
_skip_trg_count integer;
_skip_trg_name text;
-- given tgflags array
_tgflags char[];
-- ordinary argument array
_args text[];
-- array with all valid tgflags values
_valid_flags char[] := array['B','A','Q','L','I','U','D','S'];
-- argument flags
_skip boolean := false;
_no_triggers boolean := false;
_got_extra1 boolean := false;
begin
-- parse trigger args
if array_lower(i_trg_args, 1) is not null then
for i in array_lower(i_trg_args, 1) .. array_upper(i_trg_args, 1) loop
arg := i_trg_args[i];
if arg like 'tgflags=%' then
-- special flag handling
arg := upper(substr(arg, 9));
for j in array_lower(_valid_flags, 1) .. array_upper(_valid_flags, 1) loop
if position(_valid_flags[j] in arg) > 0 then
_tgflags := array_append(_tgflags, _valid_flags[j]);
end if;
end loop;
elsif arg = 'no_triggers' then
_no_triggers := true;
elsif lower(arg) = 'skip' then
_skip := true;
elsif arg = 'virtual_table' then
_no_triggers := true; -- do not create triggers
elsif arg not in ('expect_sync', 'skip_truncate', 'merge_all', 'no_merge') then -- ignore add-table args
if arg like 'ev_extra1=%' then
_got_extra1 := true;
end if;
-- ordinary arg
_args = array_append(_args, quote_literal(arg));
end if;
end loop;
end if;
if i_dest_table <> i_table_name and not _got_extra1 then
-- if renamed table, enforce trigger to put
-- global table name into extra1
arg := 'ev_extra1=' || quote_literal(i_table_name);
_args := array_append(_args, quote_literal(arg));
end if;
trigger_name := '_londiste_' || i_queue_name;
lg_func := 'pgq.logutriga';
lg_event := '';
lg_args := array[quote_literal(i_queue_name)];
lg_pos := 'after';
if array_lower(_args, 1) is not null then
lg_args := lg_args || _args;
end if;
if 'B' = any(_tgflags) then
lg_pos := 'before';
end if;
if 'A' = any(_tgflags) then
lg_pos := 'after';
end if;
if 'Q' = any(_tgflags) then
lg_func := 'pgq.sqltriga';
end if;
if 'L' = any(_tgflags) then
lg_func := 'pgq.logutriga';
end if;
if 'I' = any(_tgflags) then
lg_event := lg_event || ' or insert';
end if;
if 'U' = any(_tgflags) then
lg_event := lg_event || ' or update';
end if;
if 'D' = any(_tgflags) then
lg_event := lg_event || ' or delete';
end if;
if 'S' = any(_tgflags) then
_skip := true;
end if;
if i_node_type = 'leaf' then
-- on weird leafs the trigger funcs may not exist
perform 1 from pg_proc p join pg_namespace n on (n.oid = p.pronamespace)
where n.nspname = 'pgq' and p.proname in ('logutriga', 'sqltriga');
if not found then
select 201, 'Trigger not created' into ret_code, ret_note;
return;
end if;
-- on regular leaf, install deny trigger
_extra_args := array_append(_extra_args, quote_literal('deny'));
end if;
if _skip or lg_pos = 'after' then
-- get count and name of existing skip triggers
select count(*), min(t.tgname)
into _skip_trg_count, _skip_trg_name
from pg_catalog.pg_trigger t
where t.tgrelid = londiste.find_table_oid(i_dest_table)
and position(E'\\000SKIP\\000'::bytea in tgargs) > 0;
end if;
-- make sure new trigger won't be effectively inactive
if lg_pos = 'after' and _skip_trg_count > 0 then
select 403, 'AFTER trigger cannot work with SKIP trigger(s)'
into ret_code, ret_note;
return;
end if;
-- if skip param given, rename previous skip triggers and prefix current
if _skip then
-- if no previous skip triggers, prefix name and add SKIP to args
if _skip_trg_count = 0 then
trigger_name := _skip_prefix || trigger_name;
lg_args := array_append(lg_args, quote_literal('SKIP'));
-- if one previous skip trigger, check it's prefix and
-- do not use SKIP on current trigger
elsif _skip_trg_count = 1 then
-- if not prefixed then rename
if position(_skip_prefix in _skip_trg_name) != 1 then
sql := 'alter trigger ' || _skip_trg_name
|| ' on ' || londiste.quote_fqname(i_dest_table)
|| ' rename to ' || _skip_prefix || _skip_trg_name;
execute sql;
end if;
else
select 405, 'Multiple SKIP triggers'
into ret_code, ret_note;
return;
end if;
end if;
-- create Ins/Upd/Del trigger if it does not exists already
select t.tgargs
from pg_catalog.pg_trigger t
where t.tgrelid = londiste.find_table_oid(i_dest_table)
and t.tgname = trigger_name
into _old_tgargs;
if found then
_new_tgargs := decode(lg_args[1], 'escape');
for i in 2 .. array_upper(lg_args, 1) loop
_new_tgargs := _new_tgargs || E'\\000'::bytea || decode(lg_args[i], 'escape');
end loop;
if _old_tgargs is distinct from _new_tgargs then
sql := 'drop trigger if exists ' || quote_ident(trigger_name)
|| ' on ' || londiste.quote_fqname(i_dest_table);
execute sql;
end if;
end if;
if not found or _old_tgargs is distinct from _new_tgargs then
if _no_triggers then
select 201, 'Trigger not created'
into ret_code, ret_note;
return;
end if;
-- finalize event
lg_event := substr(lg_event, 4); -- remove ' or '
if lg_event = '' then
lg_event := 'insert or update or delete';
end if;
-- create trigger
lg_args := lg_args || _extra_args;
sql := 'create trigger ' || quote_ident(trigger_name)
|| ' ' || lg_pos || ' ' || lg_event
|| ' on ' || londiste.quote_fqname(i_dest_table)
|| ' for each row execute procedure '
|| lg_func || '(' || array_to_string(lg_args, ', ') || ')';
execute sql;
end if;
-- create truncate trigger if it does not exists already
show server_version_num into pgversion;
if pgversion >= 80400 then
trunctrg_name := '_londiste_' || i_queue_name || '_truncate';
perform 1 from pg_catalog.pg_trigger
where tgrelid = londiste.find_table_oid(i_dest_table)
and tgname = trunctrg_name;
if not found then
_extra_args := quote_literal(i_queue_name) || _extra_args;
sql := 'create trigger ' || quote_ident(trunctrg_name)
|| ' after truncate on ' || londiste.quote_fqname(i_dest_table)
|| ' for each statement execute procedure pgq.sqltriga('
|| array_to_string(_extra_args, ', ') || ')';
execute sql;
end if;
end if;
select 200, 'OK'
into ret_code, ret_note;
return;
end;
$$ language plpgsql;
|