1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
|
create or replace function pgq.logutriga()
returns trigger as $$
# -- ----------------------------------------------------------------------
# -- Function: pgq.logutriga()
# --
# --      Trigger function that puts row data urlencoded into queue.
# --
# -- Trigger parameters:
# --      arg1 - queue name (required)
# --      arg2 - optionally 'SKIP'
# --
# -- Queue event fields:
# --      ev_type   - I/U/D
# --      ev_data   - column values urlencoded
# --      ev_extra1 - table name
# --      ev_extra2 - primary key columns
# --
# -- Regular listen trigger example:
# -- >  CREATE TRIGGER triga_nimi AFTER INSERT OR UPDATE ON customer
# -- >  FOR EACH ROW EXECUTE PROCEDURE pgq.logutriga('qname');
# --
# -- Redirect trigger example (must be a BEFORE trigger, the return
# -- code of an AFTER trigger is ignored):
# -- >  CREATE TRIGGER triga_nimi BEFORE INSERT OR UPDATE ON customer
# -- >  FOR EACH ROW EXECUTE PROCEDURE pgq.logutriga('qname', 'SKIP');
# -- ----------------------------------------------------------------------

# this trigger takes 1 or 2 args:
#   queue_name - destination queue
#   optional return code (OK, SKIP); SKIP means the op won't happen

# copy-paste of db_urlencode from skytools.quoting
try:
    from urllib import quote_plus
except ImportError:
    # Python 3 moved quote_plus into urllib.parse
    from urllib.parse import quote_plus

def db_urlencode(row):
    # Encode a column->value mapping as 'k1=v1&k2=v2'.
    # A None value (SQL NULL) is emitted as the bare key without '='
    # so that it stays distinguishable from an empty string.
    elem_list = []
    for k, v in row.items():
        if v is None:
            elem = quote_plus(str(k))
        else:
            elem = quote_plus(str(k)) + '=' + quote_plus(str(v))
        elem_list.append(elem)
    return '&'.join(elem_list)

# load args
args = TD['args']
if not args:
    # fail with a clear message instead of an opaque TypeError/IndexError
    plpy.error("pgq.logutriga: queue name argument is required")
queue_name = args[0]
if len(args) > 1:
    ret_code = args[1]
else:
    ret_code = 'OK'
table_oid = TD['relid']

# on first call init plans
if 'init_done' not in SD:
    # find table name
    q = ("SELECT n.nspname || '.' || c.relname AS table_name"
         " FROM pg_class c"
         " INNER JOIN pg_namespace n ON n.oid = c.relnamespace"
         " WHERE c.oid = $1")
    SD['name_plan'] = plpy.prepare(q, ['oid'])

    # find key columns
    q = ("SELECT k.attname"
         " FROM pg_index i"
         " INNER JOIN pg_attribute k ON k.attrelid = i.indexrelid"
         " WHERE i.indrelid = $1"
         " AND i.indisprimary AND k.attnum > 0 AND NOT k.attisdropped"
         " ORDER BY k.attnum")
    SD['key_plan'] = plpy.prepare(q, ['oid'])

    # insert data
    q = "SELECT pgq.insert_event($1, $2, $3, $4, $5, null, null)"
    SD['ins_plan'] = plpy.prepare(q, ['text', 'text', 'text', 'text', 'text'])

    # shorter tags
    SD['op_map'] = {'INSERT': 'I', 'UPDATE': 'U', 'DELETE': 'D'}

    # remember init
    SD['init_done'] = 1

# load & cache table data (SD persists between calls of this function)
if table_oid in SD:
    tbl_name, tbl_keys = SD[table_oid]
else:
    res = plpy.execute(SD['name_plan'], [table_oid])
    tbl_name = res[0]['table_name']
    res = plpy.execute(SD['key_plan'], [table_oid])
    tbl_keys = ",".join([r['attname'] for r in res])
    SD[table_oid] = (tbl_name, tbl_keys)

# prepare args: DELETE has only the old row, INSERT/UPDATE log the new one
if TD['event'] == 'DELETE':
    data = db_urlencode(TD['old'])
else:
    data = db_urlencode(TD['new'])

# insert event
plpy.execute(SD['ins_plan'], [
    queue_name,
    SD['op_map'][TD['event']],
    data, tbl_name, tbl_keys])

# done
return ret_code

$$ language plpythonu;
|