1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
|
-- ----------------------------------------------------------------------
-- File: Tables
--
-- Schema 'pgq_node', contains tables for cascaded pgq.
--
-- Event types for cascaded queue:
-- pgq.location-info - ev_data: node_name, extra1: queue_name, extra2: location, extra3: dead
-- It contains updated node connect string.
--
-- pgq.global-watermark - ev_data: tick_id, extra1: queue_name
-- Root node sends minimal tick_id that must be kept.
--
-- pgq.tick-id - ev_data: tick_id, extra1: queue_name
-- Partition node inserts its tick-id into combined queue.
--
-- ----------------------------------------------------------------------
create schema pgq_node;
-- ----------------------------------------------------------------------
-- Table: pgq_node.node_location
--
-- Static table that just lists all members in set.
--
-- Columns:
-- queue_name - cascaded queue name
-- node_name - node name
-- node_location - libpq connect string for connecting to node
-- dead - whether the node is offline
-- ----------------------------------------------------------------------
create table pgq_node.node_location (
queue_name text not null,
node_name text not null,
node_location text not null,
dead boolean not null default false,
primary key (queue_name, node_name)
);
-- ----------------------------------------------------------------------
-- Table: pgq_node.node_info
--
-- Local node info.
--
-- Columns:
-- queue_name - cascaded queue name
-- node_type - local node type
-- node_name - local node name
-- worker_name - consumer name that maintains this node
-- combined_queue - on 'leaf' the target combined set name
-- node_attrs - urlencoded fields for worker
--
-- Node types:
-- root - data + batches is generated here
-- branch - replicates full queue contents and maybe contains some tables
-- leaf - does not replicate queue / or uses combined queue for that
-- ----------------------------------------------------------------------
create table pgq_node.node_info (
queue_name text not null primary key,
node_type text not null,
node_name text not null,
worker_name text,
combined_queue text,
node_attrs text,
foreign key (queue_name, node_name) references pgq_node.node_location,
check (node_type in ('root', 'branch', 'leaf')),
check (case when node_type = 'root' then (worker_name is not null and combined_queue is null)
when node_type = 'branch' then (worker_name is not null and combined_queue is null)
when node_type = 'leaf' then (worker_name is not null)
else false end)
);
-- ----------------------------------------------------------------------
-- Table: pgq_node.local_state
--
-- All cascaded consumers (both worker and non-worker)
-- keep their state here.
--
-- Columns:
-- queue_name - cascaded queue name
-- consumer_name - cascaded consumer name
-- provider_node - node name the consumer reads from
-- last_tick_id - last committed tick id on this node
-- cur_error - reason why current batch failed
-- paused - whether consumer should wait
-- uptodate - if consumer has seen new state
-- ----------------------------------------------------------------------
create table pgq_node.local_state (
queue_name text not null,
consumer_name text not null,
provider_node text not null,
last_tick_id bigint not null,
cur_error text,
paused boolean not null default false,
uptodate boolean not null default false,
primary key (queue_name, consumer_name),
foreign key (queue_name) references pgq_node.node_info,
foreign key (queue_name, provider_node) references pgq_node.node_location
);
-- ----------------------------------------------------------------------
-- Table: pgq_node.subscriber_info
--
-- List of nodes that subscribe to local node.
--
-- Columns:
-- queue_name - cascaded queue name
-- subscriber_node - node name that uses this node as provider.
-- worker_name - consumer name that maintains remote node
-- ----------------------------------------------------------------------
create table pgq_node.subscriber_info (
queue_name text not null,
subscriber_node text not null,
worker_name text not null,
watermark_name text not null,
primary key (queue_name, subscriber_node),
foreign key (queue_name) references pgq_node.node_info,
foreign key (queue_name, subscriber_node) references pgq_node.node_location,
foreign key (worker_name) references pgq.consumer (co_name),
foreign key (watermark_name) references pgq.consumer (co_name)
);
|