summaryrefslogtreecommitdiff
path: root/sql/pgq/old/pgq.logutriga.sql
blob: d4cb01457cfca74e325771fec53a14b923e4efd5 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103

create or replace function pgq.logutriga()
returns trigger as $$
# -- ----------------------------------------------------------------------
# -- Function: pgq.logutriga()
# --
# --      Trigger function that puts row data urlencoded into queue.
# --
# -- Trigger parameters:
# --      arg1 - queue name (required)
# --      arg2 - optional trigger return code; defaults to 'OK',
# --             pass 'SKIP' to suppress the original operation
# --
# -- Queue event fields:
# --   ev_type      - I/U/D
# --   ev_data      - column values urlencoded
# --   ev_extra1    - schema-qualified table name
# --   ev_extra2    - primary key columns, comma-separated
# --
# -- Regular listen trigger example:
# -- >  CREATE TRIGGER triga_nimi AFTER INSERT OR UPDATE ON customer
# -- >  FOR EACH ROW EXECUTE PROCEDURE pgq.logutriga('qname');
# --
# -- Redirect trigger example ('SKIP' only takes effect in a BEFORE
# -- trigger; the return value of an AFTER row trigger is ignored):
# -- >   CREATE TRIGGER triga_nimi BEFORE INSERT OR UPDATE ON customer
# -- >   FOR EACH ROW EXECUTE PROCEDURE pgq.logutriga('qname', 'SKIP');
# -- ----------------------------------------------------------------------

# this trigger takes 1 or 2 args:
#   queue_name - destination queue
#   optional return code (OK, SKIP); SKIP means the op won't happen
# copy-paste of db_urlencode from skytools.quoting
from urllib import quote_plus
def db_urlencode(row):
    # Encode a column-name -> value mapping as 'k1=v1&k2=v2'.
    # A None (SQL NULL) value is written as the bare key without '=',
    # which keeps it distinguishable from an empty string ('key=').
    elem_list = []
    for k, v in row.items():
        if v is None:
            elem = quote_plus(str(k))
        else:
            elem = quote_plus(str(k)) + '=' + quote_plus(str(v))
        elem_list.append(elem)
    return '&'.join(elem_list)

# load args
trigger_args = TD['args']
if not trigger_args:
    # covers both a missing (None) and an empty argument list; fail with
    # a readable message instead of an indexing error on the next line
    plpy.error("pgq.logutriga: queue name argument is required")
queue_name = trigger_args[0]
if len(trigger_args) > 1:
    ret_code = trigger_args[1]
else:
    ret_code = 'OK'
table_oid = TD['relid']

# on first call init plans; SD keeps them for later invocations
if 'init_done' not in SD:
    # find table name
    q = "SELECT n.nspname || '.' || c.relname AS table_name"\
        " FROM pg_class c"\
        " JOIN pg_namespace n ON n.oid = c.relnamespace"\
        " WHERE c.oid = $1"
    SD['name_plan'] = plpy.prepare(q, ['oid'])

    # find key columns: attributes of the table's primary key index
    q = "SELECT k.attname FROM pg_index i"\
        " JOIN pg_attribute k ON k.attrelid = i.indexrelid"\
        " WHERE i.indrelid = $1"\
        "   AND i.indisprimary AND k.attnum > 0 AND NOT k.attisdropped"\
        " ORDER BY k.attnum"
    SD['key_plan'] = plpy.prepare(q, ['oid'])

    # insert data
    q = "SELECT pgq.insert_event($1, $2, $3, $4, $5, null, null)"
    SD['ins_plan'] = plpy.prepare(q, ['text', 'text', 'text', 'text', 'text'])

    # shorter tags
    SD['op_map'] = {'INSERT': 'I', 'UPDATE': 'U', 'DELETE': 'D'}

    # remember init
    SD['init_done'] = 1

# load & cache table data, keyed by table oid
if table_oid in SD:
    tbl_name, tbl_keys = SD[table_oid]
else:
    res = plpy.execute(SD['name_plan'], [table_oid])
    tbl_name = res[0]['table_name']
    res = plpy.execute(SD['key_plan'], [table_oid])
    tbl_keys = ",".join([r['attname'] for r in res])

    SD[table_oid] = (tbl_name, tbl_keys)

# prepare args: DELETE logs the old row, INSERT/UPDATE log the new row
if TD['event'] == 'DELETE':
    data = db_urlencode(TD['old'])
else:
    data = db_urlencode(TD['new'])

# insert event
plpy.execute(SD['ins_plan'], [
    queue_name,
    SD['op_map'][TD['event']],
    data, tbl_name, tbl_keys])

# done
return ret_code

$$ language plpythonu;