summaryrefslogtreecommitdiff
path: root/sql/londiste/functions/londiste.create_trigger.sql
blob: 863bcadca2f8a9e7f08477653b24d05c251c0cc9 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
create or replace function londiste.create_trigger(
    in i_queue_name     text,
    in i_table_name     text,
    in i_trg_args       text[],
    in i_dest_table     text,
    in i_node_type      text,
    out ret_code        int4,
    out ret_note        text,
    out trigger_name    text)
as $$
------------------------------------------------------------------------
-- Function: londiste.create_trigger(5)
--
--     Create or replace londiste trigger(s)
--
--     Builds the row-level data trigger '_londiste_<queue>' (prefixed
--     with 'zzz_' when it becomes the SKIP trigger) and, on servers
--     reporting server_version_num >= 80400, the statement-level
--     '_londiste_<queue>_truncate' trigger.  An existing data trigger
--     is dropped and re-created when its stored arguments differ from
--     the requested ones.
--
-- Parameters:
--      i_queue_name - queue name
--      i_table_name - table name
--      i_trg_args   - args to trigger
--      i_dest_table - actual name of destination table (NULL if same as src)
--      i_node_type  - l3 node type
--
-- Trigger args:
--      See documentation for pgq triggers.
--
--      Handled here and not passed to the trigger function:
--          tgflags=<flags>, no_triggers, virtual_table, skip,
--          expect_sync, skip_truncate, merge_all, no_merge
--
-- Trigger creation flags (default: AIUDL):
--      I - ON INSERT
--      U - ON UPDATE
--      D - ON DELETE
--      Q - use pgq.sqltriga() as trigger function
--      L - use pgq.logutriga() as trigger function
--      B - BEFORE
--      A - AFTER
--      S - SKIP
--
-- Returns:
--      ret_code / ret_note:
--          200 - Ok
--          201 - Trigger not created
--          403 - AFTER trigger cannot work with SKIP trigger(s)
--          405 - Multiple SKIP triggers
--      trigger_name:
--          name computed for the row-level data trigger; it is set as
--          soon as the arguments are parsed, so it is also filled in
--          when ret_code is not 200.
--
------------------------------------------------------------------------
declare
    -- NB: trigger_name is the OUT parameter; it must not be re-declared
    -- here, a local of the same name would shadow it and the caller
    -- would always receive NULL.
    lg_func text;
    lg_pos text;
    lg_event text;
    lg_args text[];
    _old_tgargs bytea;
    _new_tgargs bytea;
    trunctrg_name text;
    pgversion int;
    sql text;
    arg text;
    _extra_args text[] := '{}';
    -- table the triggers are attached to
    _dest_table text;
    -- skip trigger
    _skip_prefix text := 'zzz_';
    _skip_trg_count integer;
    _skip_trg_name text;
    -- given tgflags array
    _tgflags char[];
    -- ordinary argument array
    _args text[];
    -- array with all valid tgflags values
    _valid_flags char[] := array['B','A','Q','L','I','U','D','S'];
    -- argument flags
    _skip boolean := false;
    _no_triggers boolean := false;
    _got_extra1 boolean := false;
begin
    -- NULL destination means "same as source table"
    _dest_table := coalesce(i_dest_table, i_table_name);

    -- parse trigger args
    if array_lower(i_trg_args, 1) is not null then
        for i in array_lower(i_trg_args, 1) .. array_upper(i_trg_args, 1) loop
            arg := i_trg_args[i];
            if arg like 'tgflags=%' then
                -- special flag handling: keep every known flag letter
                -- that occurs after 'tgflags=' (case-insensitive)
                arg := upper(substr(arg, 9));
                for j in array_lower(_valid_flags, 1) .. array_upper(_valid_flags, 1) loop
                    if position(_valid_flags[j] in arg) > 0 then
                        _tgflags := array_append(_tgflags, _valid_flags[j]);
                    end if;
                end loop;
            elsif arg = 'no_triggers' then
                _no_triggers := true;
            elsif lower(arg) = 'skip' then
                _skip := true;
            elsif arg = 'virtual_table' then
                _no_triggers := true;   -- do not create triggers
            elsif arg not in ('expect_sync', 'skip_truncate', 'merge_all', 'no_merge') then -- ignore add-table args
                if arg like 'ev_extra1=%' then
                    _got_extra1 := true;
                end if;
                -- ordinary arg, stored already quoted for the CREATE TRIGGER text
                _args = array_append(_args, quote_literal(arg));
            end if;
        end loop;
    end if;

    if _dest_table <> i_table_name and not _got_extra1 then
        -- if renamed table, enforce trigger to put
        -- global table name into extra1
        arg := 'ev_extra1=' || quote_literal(i_table_name);
        _args := array_append(_args, quote_literal(arg));
    end if;

    -- defaults: AFTER trigger using pgq.logutriga, events decided below
    trigger_name := '_londiste_' || i_queue_name;
    lg_func := 'pgq.logutriga';
    lg_event := '';
    lg_args := array[quote_literal(i_queue_name)];
    lg_pos := 'after';

    if array_lower(_args, 1) is not null then
        lg_args := lg_args || _args;
    end if;

    -- apply tgflags; when both letters of a pair are given the later
    -- check wins (A over B, L over Q)
    if 'B' = any(_tgflags) then
        lg_pos := 'before';
    end if;
    if 'A' = any(_tgflags)  then
        lg_pos := 'after';
    end if;
    if 'Q' = any(_tgflags) then
        lg_func := 'pgq.sqltriga';
    end if;
    if 'L' = any(_tgflags) then
        lg_func := 'pgq.logutriga';
    end if;
    if 'I' = any(_tgflags) then
        lg_event := lg_event || ' or insert';
    end if;
    if 'U' = any(_tgflags) then
        lg_event := lg_event || ' or update';
    end if;
    if 'D' = any(_tgflags) then
        lg_event := lg_event || ' or delete';
    end if;
    if 'S' = any(_tgflags) then
        _skip := true;
    end if;

    if i_node_type = 'leaf' then
        -- on weird leafs the trigger funcs may not exist
        perform 1 from pg_proc p join pg_namespace n on (n.oid = p.pronamespace)
            where n.nspname = 'pgq' and p.proname in ('logutriga', 'sqltriga');
        if not found then
            select 201, 'Trigger not created' into ret_code, ret_note;
            return;
        end if;
        -- on regular leaf, install deny trigger
        _extra_args := array_append(_extra_args, quote_literal('deny'));
    end if;

    if _skip or lg_pos = 'after' then
        -- get count and name of existing skip triggers
        select count(*), min(t.tgname)
            into _skip_trg_count, _skip_trg_name
            from pg_catalog.pg_trigger t
            where t.tgrelid = londiste.find_table_oid(_dest_table)
                and position(E'\\000SKIP\\000'::bytea in tgargs) > 0;
    end if;

    -- make sure new trigger won't be effectively inactive
    if lg_pos = 'after' and _skip_trg_count > 0 then
        select 403, 'AFTER trigger cannot work with SKIP trigger(s)'
        into ret_code, ret_note;
        return;
    end if;

    -- if skip param given, rename previous skip triggers and prefix current
    if _skip then
        -- if no previous skip triggers, prefix name and add SKIP to args
        if _skip_trg_count = 0 then
            trigger_name := _skip_prefix || trigger_name;
            lg_args := array_append(lg_args, quote_literal('SKIP'));
        -- if one previous skip trigger, check it's prefix and
        -- do not use SKIP on current trigger
        elsif _skip_trg_count = 1 then
            -- if not prefixed then rename
            if position(_skip_prefix in _skip_trg_name) != 1 then
                -- both names are identifiers taken from the catalog and
                -- must be quoted before going into dynamic SQL
                sql := 'alter trigger ' || quote_ident(_skip_trg_name)
                    || ' on ' || londiste.quote_fqname(_dest_table)
                    || ' rename to ' || quote_ident(_skip_prefix || _skip_trg_name);
                execute sql;
            end if;
        else
            select 405, 'Multiple SKIP triggers'
            into ret_code, ret_note;
            return;
        end if;
    end if;

    -- create Ins/Upd/Del trigger if it does not exists already
    select t.tgargs
        from pg_catalog.pg_trigger t
        where t.tgrelid = londiste.find_table_oid(_dest_table)
            and t.tgname = trigger_name
        into _old_tgargs;

    if found then
        -- NOTE(review): lg_args holds quote_literal()-ed values at this
        -- point and _extra_args is not part of it yet, so this image
        -- probably never equals the stored tgargs and the trigger gets
        -- re-created on every call - confirm before relying on it.
        _new_tgargs := decode(lg_args[1], 'escape');
        for i in 2 .. array_upper(lg_args, 1) loop
            _new_tgargs := _new_tgargs || E'\\000'::bytea || decode(lg_args[i], 'escape');
        end loop;

        if _old_tgargs is distinct from _new_tgargs then
            sql := 'drop trigger if exists ' || quote_ident(trigger_name)
                || ' on ' || londiste.quote_fqname(_dest_table);
            execute sql;
        end if;
    end if;

    -- "found" still reflects the pg_trigger lookup above
    if not found or _old_tgargs is distinct from _new_tgargs then
        if _no_triggers then
            select 201, 'Trigger not created'
            into ret_code, ret_note;
            return;
        end if;

        -- finalize event
        lg_event := substr(lg_event, 5); -- remove leading ' or '
        if lg_event = '' then
            lg_event := 'insert or update or delete';
        end if;

        -- create trigger
        lg_args := lg_args || _extra_args;
        sql := 'create trigger ' || quote_ident(trigger_name)
            || ' ' || lg_pos || ' ' || lg_event
            || ' on ' || londiste.quote_fqname(_dest_table)
            || ' for each row execute procedure '
            || lg_func || '(' || array_to_string(lg_args, ', ') || ')';
        execute sql;
    end if;

    -- create truncate trigger if it does not exists already
    show server_version_num into pgversion;
    if pgversion >= 80400 then
        trunctrg_name  := '_londiste_' || i_queue_name || '_truncate';
        perform 1 from pg_catalog.pg_trigger
          where tgrelid = londiste.find_table_oid(_dest_table)
            and tgname = trunctrg_name;
        if not found then
            -- queue name goes first, followed by 'deny' on leaf nodes
            _extra_args := quote_literal(i_queue_name) || _extra_args;
            sql := 'create trigger ' || quote_ident(trunctrg_name)
                || ' after truncate on ' || londiste.quote_fqname(_dest_table)
                || ' for each statement execute procedure pgq.sqltriga('
                || array_to_string(_extra_args, ', ') || ')';
            execute sql;
        end if;
    end if;

    select 200, 'OK'
    into ret_code, ret_note;
    return;
end;
$$ language plpgsql;