= cube_dispatcher(1) =
== NAME ==
cube_dispatcher - PgQ consumer that writes source records into partitioned tables
== SYNOPSIS ==
cube_dispatcher.py [switches] config.ini
== DESCRIPTION ==
cube_dispatcher is a PgQ consumer that reads URL-encoded records from a source queue
and writes them into partitioned tables according to the configuration file.
It is used to prepare data for business intelligence. The name of the table is read from
the producer field of the event. Batch creation time is used for partitioning: all records
created on the same day go into the same table partition. If the partition does not exist,
cube_dispatcher creates it according to the template.
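
The naming rule described above can be sketched in a few lines of Python. This is an illustration only, not the code used by cube_dispatcher: the function `partition_name`, the token mapping and the `_` separator between parent name and date suffix are assumptions, and returning the parent name when `dateformat` is empty is one reading of "partitioning is disabled".

```python
from datetime import datetime

# Assumed mapping from the to_char()-style tokens used by `dateformat`
# to strftime directives; only the tokens shown on this page are covered.
_TOKENS = (("YYYY", "%Y"), ("MM", "%m"), ("DD", "%d"))


def partition_name(parent, batch_time, dateformat="YYYY_MM_DD"):
    """Return the partition table name for a batch created at batch_time."""
    if not dateformat:
        # Empty dateformat: no suffix, rows are assumed to go to the parent.
        return parent
    fmt = dateformat
    for token, directive in _TOKENS:
        fmt = fmt.replace(token, directive)
    # The "_" separator is an assumption made for this sketch.
    return "%s_%s" % (parent, batch_time.strftime(fmt))


if __name__ == "__main__":
    when = datetime(2024, 3, 15, 23, 59)
    print(partition_name("public.orders", when))             # per-day table
    print(partition_name("public.orders", when, "YYYY_MM"))  # per-month table
```

All batches created on 15 March 2024 map to the same per-day name, which is why records from one day end up in one partition.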
Events are usually produced by `pgq.logutriga()`. Logutriga adds all the data
of the record into the event (also in case of updates and deletes).
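
To make the URL-encoded record format more concrete, the following sketch decodes one event payload into a Python dict. The helper `decode_record` and the sample payload are hypothetical. The convention that a key without `=` stands for NULL is an assumption here; the LOGUTRIGA EVENT FORMAT section is the authoritative description.

```python
from urllib.parse import unquote_plus


def decode_record(data):
    """Decode a URL-encoded record into a dict of column -> value."""
    record = {}
    for item in data.split("&"):
        if not item:
            continue
        if "=" in item:
            key, value = item.split("=", 1)
            record[unquote_plus(key)] = unquote_plus(value)
        else:
            # Assumption: a bare key (no "=") represents SQL NULL.
            record[unquote_plus(item)] = None
    return record


if __name__ == "__main__":
    print(decode_record("id=7&name=Alice+Smith&note=&deleted_at"))
```

Because the whole row is carried in the event, a consumer does not need to query the source table again to build the row it writes.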
`cube_dispatcher` can be used in two modes:
keep_all::
keeps all the data that comes in. If a record is updated several times
during one day, the table partition for that day will contain several instances of
that record.
keep_latest::
only the last instance of each record is kept for each day. This also
means that all tables must have primary keys, so that cube_dispatcher can delete previous
versions of records before inserting new data.
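
The difference between the two modes can be illustrated with a small in-memory model. This is only a sketch: `apply_batch`, the list-of-dicts partition and the `pkey` argument are hypothetical and not part of cube_dispatcher, which does the equivalent work with SQL against the target database.

```python
def apply_batch(partition, rows, pkey, mode):
    """Apply one batch of rows (dicts) to an in-memory partition (a list)."""
    if mode == "keep_all":
        # Every incoming version of a record is stored.
        partition.extend(rows)
    elif mode == "keep_latest":
        for row in rows:
            key = tuple(row[col] for col in pkey)
            # Delete earlier versions with the same primary key, then insert.
            partition[:] = [
                old for old in partition
                if tuple(old[col] for col in pkey) != key
            ]
            partition.append(row)
    else:
        raise ValueError("unknown mode: %r" % (mode,))
    return partition


if __name__ == "__main__":
    batch = [
        {"id": 1, "status": "new"},
        {"id": 1, "status": "paid"},
        {"id": 2, "status": "new"},
    ]
    print(apply_batch([], batch, ["id"], "keep_all"))
    print(apply_batch([], batch, ["id"], "keep_latest"))
```

The delete step in `keep_latest` needs a way to identify earlier versions of a row, which is why that mode requires primary keys on all tables.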
== QUICK-START ==
Basic cube_dispatcher setup and usage can be summarized by the following
steps:
1. pgq and logutriga must be installed in the source databases.
See the pgqadm man page for details. The target database must also
have the pgq_ext schema.
2. edit a cube_dispatcher configuration file, say cube_dispatcher_sample.ini
3. create source queue
$ pgqadm.py ticker.ini create <queue>
4. create target database and parent tables in it.
5. launch cube dispatcher in daemon mode
$ cube_dispatcher.py cube_dispatcher_sample.ini -d
6. start producing events (create logutriga triggers on tables)
CREATE TRIGGER trig_cube_replica AFTER INSERT OR UPDATE ON some_table
FOR EACH ROW EXECUTE PROCEDURE pgq.logutriga('<queue>');
== CONFIG ==
include::common.config.txt[]
=== Config options specific to `cube_dispatcher` ===
src_db::
Connect string for source database where the queue resides.
dst_db::
Connect string for target database where the tables should be created.
mode::
Operation mode for cube_dispatcher. Either `keep_all` or `keep_latest`.
dateformat::
Optional parameter to specify how to suffix data tables.
Default is `YYYY_MM_DD` which creates per-day tables.
With `YYYY_MM` per-month tables can be created.
If explicitly set empty, partitioning is disabled.
part_template::
SQL fragment for table creation. Various magic replacements are done there:
_PKEY:: comma separated list of primary key columns.
_PARENT:: schema-qualified parent table name.
_DEST_TABLE:: schema-qualified partition table.
_SCHEMA_TABLE:: same as _DEST_TABLE but dots replaced with "__", to allow use as index names.
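
The replacements listed for `part_template` can be sketched as a single-pass substitution. The function `expand_part_template` and the example table names are hypothetical, and the index statement in the usage example is an illustration that is not taken from the sample config; the actual substitution code in cube_dispatcher may differ.

```python
import re

_PLACEHOLDER = re.compile(r"_PKEY|_PARENT|_DEST_TABLE|_SCHEMA_TABLE")


def expand_part_template(template, parent, dest_table, pkey_columns):
    """Fill in the part_template placeholders for one partition."""
    values = {
        "_PKEY": ", ".join(pkey_columns),
        "_PARENT": parent,
        "_DEST_TABLE": dest_table,
        # Dots become "__" so the result can be used inside an index name.
        "_SCHEMA_TABLE": dest_table.replace(".", "__"),
    }
    # One pass over the template, so inserted values are never re-expanded.
    return _PLACEHOLDER.sub(lambda match: values[match.group(0)], template)


if __name__ == "__main__":
    template = (
        "create table _DEST_TABLE (like _PARENT);\n"
        "alter table only _DEST_TABLE add primary key (_PKEY);\n"
        "create index _SCHEMA_TABLE_idx on _DEST_TABLE (_PKEY);"
    )
    print(expand_part_template(
        template, "public.orders", "public.orders_2024_03_15", ["id", "region"]))
```

`_SCHEMA_TABLE` exists because an index name cannot contain a dot, while `_DEST_TABLE` is schema-qualified.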
=== Example config file ===
[cube_dispatcher]
job_name = some_queue_to_cube
src_db = dbname=sourcedb_test
dst_db = dbname=dataminedb_test
pgq_queue_name = udata.some_queue
logfile = ~/log/%(job_name)s.log
pidfile = ~/pid/%(job_name)s.pid
# how many rows are kept: keep_latest, keep_all
mode = keep_latest
# to_char() fmt for table suffix
#dateformat = YYYY_MM_DD
# following disables table suffixes:
#dateformat =
part_template =
create table _DEST_TABLE (like _PARENT);
alter table only _DEST_TABLE add primary key (_PKEY);
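
As a rough check of how such a file is interpreted, the sketch below loads a shortened copy of the example with Python's standard `configparser` module. This is an assumption-laden illustration: cube_dispatcher reads its configuration through its own framework, `load_settings` is a hypothetical helper, and the continuation lines of `part_template` are indented here because `configparser` requires that for multi-line values.

```python
import configparser

SAMPLE = """
[cube_dispatcher]
job_name = some_queue_to_cube
logfile = ~/log/%(job_name)s.log
mode = keep_latest
dateformat =
part_template =
    create table _DEST_TABLE (like _PARENT);
    alter table only _DEST_TABLE add primary key (_PKEY);
"""


def load_settings(text):
    """Parse the [cube_dispatcher] section and validate the mode option."""
    parser = configparser.ConfigParser()
    parser.read_string(text)
    section = parser["cube_dispatcher"]
    mode = section.get("mode")
    if mode not in ("keep_all", "keep_latest"):
        raise ValueError("mode must be keep_all or keep_latest, got %r" % (mode,))
    return {
        # %(job_name)s is expanded by configparser's basic interpolation.
        "logfile": section.get("logfile"),
        "mode": mode,
        # A missing option falls back to per-day tables; an option that is
        # present but empty stays empty, which disables table suffixes.
        "dateformat": section.get("dateformat", fallback="YYYY_MM_DD"),
        "part_template": section.get("part_template").strip(),
    }


if __name__ == "__main__":
    for name, value in load_settings(SAMPLE).items():
        print(name, "=", repr(value))
```

Note the difference between omitting `dateformat`, which selects the default, and setting it to an empty value, which turns partitioning off.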
== LOGUTRIGA EVENT FORMAT ==
include::common.logutriga.txt[]
== COMMAND LINE SWITCHES ==
include::common.switches.txt[]