Skip to content

Commit a6097e1

Browse files
committed
Add queued workers implementation
1 parent 0aff489 commit a6097e1

File tree

7 files changed

+515
-18
lines changed

7 files changed

+515
-18
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,3 +5,4 @@
55
/results
66
regression.diffs
77
regression.out
8+
*.plist

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
# contrib/jsonbc/Makefile
22

33
MODULE_big = jsonbc
4-
OBJS= jsonbc.o $(WIN32RES)
4+
OBJS= jsonbc.o jsonbc_worker.o $(WIN32RES)
55

66
EXTENSION = jsonbc
77
DATA = jsonbc--0.1.sql

jsonbc--0.1.sql

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@ CREATE TABLE jsonbc_dictionary(
33
id INT4 NOT NULL, /* key id, related to compression options */
44
key TEXT NOT NULL /* jsonb key */
55
);
6-
CREATE UNIQUE INDEX jsonbc_dict_cmopt_idx ON jsonbc_dictionary(cmopt, id);
7-
CREATE INDEX jsonbc_dict_ids_idx ON jsonbc_dictionary (cmopt, id);
6+
CREATE UNIQUE INDEX jsonbc_dict_ids ON jsonbc_dictionary(cmopt, id);
7+
CREATE UNIQUE INDEX jsonbc_dict_keys ON jsonbc_dictionary (cmopt, key);
88

99
CREATE OR REPLACE FUNCTION jsonbc_compression_handler(INTERNAL)
1010
RETURNS COMPRESSION_HANDLER AS 'MODULE_PATHNAME', 'jsonbc_compression_handler'

jsonbc.c

Lines changed: 255 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -7,22 +7,25 @@
77
#include "access/htup_details.h"
88
#include "access/sysattr.h"
99
#include "access/xact.h"
10+
#include "catalog/indexing.h"
1011
#include "catalog/pg_attribute.h"
1112
#include "catalog/pg_extension.h"
1213
#include "catalog/pg_type.h"
13-
#include "catalog/indexing.h"
1414
#include "commands/extension.h"
1515
#include "executor/spi.h"
16+
#include "miscadmin.h"
17+
#include "storage/ipc.h"
18+
#include "storage/shm_toc.h"
1619
#include "utils/builtins.h"
1720
#include "utils/fmgroids.h"
21+
#include "utils/guc.h"
1822
#include "utils/jsonb.h"
1923
#include "utils/lsyscache.h"
2024
#include "utils/memutils.h"
2125
#include "utils/pg_crc.h"
2226

2327
PG_MODULE_MAGIC;
2428
PG_FUNCTION_INFO_V1(jsonbc_compression_handler);
25-
PG_FUNCTION_INFO_V1(int4_to_char);
2629

2730
/* we use one buffer for whole transaction to avoid extra allocations */
2831
typedef struct
@@ -35,21 +38,177 @@ typedef struct
3538
MemoryContext item_mcxt;
3639
} CompressionThroughBuffers;
3740

41+
/* local */
3842
static MemoryContext compression_mcxt = NULL;
3943
static CompressionThroughBuffers *compression_buffers = NULL;
44+
static shmem_startup_hook_type prev_shmem_startup_hook = NULL;
45+
static shm_toc *toc = NULL;
46+
47+
/* global */
48+
void *workers_data = NULL;
49+
int jsonbc_nworkers = -1;
50+
int jsonbc_cache_size = 0;
51+
int jsonbc_queue_size = 0;
4052

4153
static void init_memory_context(bool);
4254
static void memory_reset_callback(void *arg);
43-
static void get_key_ids(Oid cmoptoid, char *buf, uint32 *idsbuf, int nkeys);
44-
static char *get_key_by_id(Oid cmoptoid, int32 key_id);
4555
static void encode_varbyte(uint32 val, unsigned char *ptr, int *len);
4656
static uint32 decode_varbyte(unsigned char *ptr);
4757
static char *packJsonbValue(JsonbValue *val, int header_size, int *len);
4858
static Oid get_extension_schema(void);
59+
static void setup_guc_variables(void);
60+
61+
static inline Size
62+
jsonbc_get_queue_size(void)
63+
{
64+
return (Size) (jsonbc_queue_size * 1024);
65+
}
66+
67+
static size_t
68+
jsonbc_shmem_size(void)
69+
{
70+
int i;
71+
shm_toc_estimator e;
72+
Size size;
73+
74+
Assert(jsonbc_nworkers != -1);
75+
shm_toc_initialize_estimator(&e);
76+
77+
shm_toc_estimate_chunk(&e, sizeof(jsonbc_shm_hdr));
78+
for (i = 0; i < jsonbc_nworkers; i++)
79+
{
80+
shm_toc_estimate_chunk(&e, sizeof(jsonbc_shm_worker));
81+
shm_toc_estimate_chunk(&e, jsonbc_get_queue_size());
82+
shm_toc_estimate_chunk(&e, jsonbc_get_queue_size());
83+
}
84+
shm_toc_estimate_keys(&e, jsonbc_nworkers * 3 + 1);
85+
size = shm_toc_estimate(&e);
86+
return size;
87+
}
4988

50-
/* TODO: change to worker, add caches and other stuff */
5189
static void
52-
get_key_ids(Oid cmoptoid, char *buf, uint32 *idsbuf, int nkeys)
90+
jsonbc_shmem_startup_hook(void)
91+
{
92+
int mqkey;
93+
bool found;
94+
Size size = jsonbc_shmem_size();
95+
jsonbc_shm_hdr *hdr;
96+
97+
/* Invoke original hook if needed */
98+
if (prev_shmem_startup_hook)
99+
prev_shmem_startup_hook();
100+
101+
LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
102+
workers_data = ShmemInitStruct("jsonbc workers shmem", size, &found);
103+
104+
if (!found)
105+
{
106+
int i;
107+
108+
toc = shm_toc_create(JSONBC_SHM_MQ_MAGIC, workers_data, size);
109+
hdr = shm_toc_allocate(toc, sizeof(jsonbc_shm_hdr));
110+
hdr->workers_ready = 0;
111+
shm_toc_insert(toc, 0, hdr);
112+
mqkey = jsonbc_nworkers + 1;
113+
114+
for (i = 0; i < jsonbc_nworkers; i++)
115+
{
116+
jsonbc_shm_worker *wd = shm_toc_allocate(toc, sizeof(jsonbc_shm_worker));
117+
118+
/* each worker will have two mq, for input and output */
119+
wd->mqin = shm_mq_create(shm_toc_allocate(toc, jsonbc_get_queue_size()),
120+
jsonbc_get_queue_size());
121+
wd->mqout = shm_mq_create(shm_toc_allocate(toc, jsonbc_get_queue_size()),
122+
jsonbc_get_queue_size());
123+
124+
/* init worker context */
125+
pg_atomic_init_flag(&wd->busy);
126+
wd->proc = NULL;
127+
128+
shm_toc_insert(toc, i + 1, wd);
129+
shm_toc_insert(toc, mqkey++, wd->mqin);
130+
shm_toc_insert(toc, mqkey++, wd->mqout);
131+
}
132+
}
133+
else toc = shm_toc_attach(JSONBC_SHM_MQ_MAGIC, workers_data);
134+
135+
LWLockRelease(AddinShmemInitLock);
136+
}
137+
138+
void
139+
_PG_init(void)
140+
{
141+
if (!process_shared_preload_libraries_in_progress)
142+
{
143+
ereport(ERROR,
144+
(errmsg("jsonbc module must be initialized in postmaster."),
145+
errhint("add 'jsonbc' to shared_preload_libraries parameter in postgresql.conf")));
146+
}
147+
148+
setup_guc_variables();
149+
150+
prev_shmem_startup_hook = shmem_startup_hook;
151+
shmem_startup_hook = jsonbc_shmem_startup_hook;
152+
153+
if (jsonbc_nworkers)
154+
{
155+
int i;
156+
RequestAddinShmemSpace(jsonbc_shmem_size());
157+
for (i = 0; i < jsonbc_nworkers; i++)
158+
jsonbc_register_worker(i);
159+
}
160+
else elog(LOG, "jsonbc: workers are disabled");
161+
}
162+
163+
static void
164+
setup_guc_variables(void)
165+
{
166+
DefineCustomIntVariable("jsonbc.workers_count",
167+
"Count of workers for jsonbc compresssion",
168+
NULL,
169+
&jsonbc_nworkers,
170+
1, /* default */
171+
0, /* if zero then no workers */
172+
MAX_JSONBC_WORKERS,
173+
PGC_SUSET,
174+
0,
175+
NULL,
176+
NULL,
177+
NULL);
178+
179+
DefineCustomIntVariable("jsonbc.cache_size",
180+
"Cache size for each compression options (kilobytes)",
181+
NULL,
182+
&jsonbc_cache_size,
183+
1, /* 1kb by default */
184+
0, /* no cache */
185+
1024, /* 1 mb */
186+
PGC_SUSET,
187+
GUC_UNIT_KB,
188+
NULL,
189+
NULL,
190+
NULL);
191+
192+
DefineCustomIntVariable("jsonbc.queue_size",
193+
"Size of queue used for communication with workers (kilobytes)",
194+
NULL,
195+
&jsonbc_queue_size,
196+
1, /* 1kb by default */
197+
1, /* 1 kb is minimum too */
198+
1024, /* 1 mb */
199+
PGC_SUSET,
200+
GUC_UNIT_KB,
201+
NULL,
202+
NULL,
203+
NULL);
204+
}
205+
206+
/*
207+
* Get key IDs using relation
208+
* TODO: change to direct access
209+
*/
210+
void
211+
jsonbc_get_key_ids_slow(Oid cmoptoid, char *buf, uint32 *idsbuf, int nkeys)
53212
{
54213
int i;
55214
char *nspc = get_namespace_name(get_extension_schema());
@@ -99,15 +258,94 @@ get_key_ids(Oid cmoptoid, char *buf, uint32 *idsbuf, int nkeys)
99258
SPI_finish();
100259
}
101260

102-
static char *
103-
get_key_by_id(Oid cmoptoid, int32 key_id)
261+
/* Get key IDs using workers */
262+
void
263+
jsonbc_get_key_ids(Oid cmoptoid, char *buf, int buflen, uint32 *idsbuf, int nkeys)
264+
{
265+
int i;
266+
267+
if (jsonbc_nworkers <= 0)
268+
{
269+
/* if workers are disabled then backend will do all the work itself */
270+
jsonbc_get_key_ids_slow(cmoptoid, buf, idsbuf, nkeys);
271+
return;
272+
}
273+
274+
again:
275+
for (i = 0; i < jsonbc_nworkers; i++)
276+
{
277+
shm_mq_result resmq;
278+
shm_mq_iovec iov[3];
279+
shm_mq_handle *mqh;
280+
281+
char *res;
282+
Size reslen;
283+
284+
jsonbc_shm_worker *wd = shm_toc_lookup(toc, i + 1, false);
285+
286+
if (!pg_atomic_test_set_flag(&wd->busy))
287+
continue;
288+
289+
iov[0].data = (void *) &nkeys;
290+
iov[0].len = sizeof(int);
291+
292+
iov[1].data = (void *) &cmoptoid;
293+
iov[1].len = sizeof(Oid);
294+
295+
iov[2].data = buf;
296+
iov[2].len = buflen;
297+
298+
elog(LOG, "sending data, %d keys, oid %d, bytes: %ld",
299+
nkeys, cmoptoid, iov[2].len);
300+
301+
/* send keys */
302+
shm_mq_set_sender(wd->mqin, MyProc);
303+
mqh = shm_mq_attach(wd->mqin, NULL, NULL);
304+
resmq = shm_mq_sendv(mqh, iov, 3, false);
305+
if (resmq != SHM_MQ_SUCCESS)
306+
elog(ERROR, "jsonbc: worker has detached");
307+
shm_mq_detach(mqh);
308+
309+
/* get IDs */
310+
shm_mq_set_receiver(wd->mqout, MyProc);
311+
mqh = shm_mq_attach(wd->mqout, NULL, NULL);
312+
resmq = shm_mq_receive(mqh, &reslen, (void **) &res, false);
313+
if (resmq != SHM_MQ_SUCCESS)
314+
elog(ERROR, "jsonbc: worker has detached");
315+
316+
if (reslen == 1)
317+
elog(ERROR, "jsonbc: worker couldn't encode keys");
318+
319+
/* size of the received data should be equal to key array size */
320+
Assert(reslen == sizeof(uint32) * nkeys);
321+
memcpy((void *) idsbuf, res, reslen);
322+
shm_mq_detach(mqh);
323+
324+
shm_mq_clean_sender(wd->mqin);
325+
shm_mq_clean_receiver(wd->mqout);
326+
327+
pg_atomic_clear_flag(&wd->busy);
328+
329+
/* we're done here */
330+
return;
331+
}
332+
333+
CHECK_FOR_INTERRUPTS();
334+
pg_usleep(100);
335+
336+
/* TODO: add attempts count check */
337+
goto again;
338+
}
339+
340+
char *
341+
jsonbc_get_key_by_id(Oid cmoptoid, int32 key_id)
104342
{
105343
MemoryContext old_mcxt;
106344

107345
char *nspc = get_namespace_name(get_extension_schema());
108346
char *res = NULL;
109347
char *sql = psprintf("SELECT key FROM %s.jsonbc_dictionary WHERE cmopt = %d"
110-
" AND id = %d", nspc, cmoptoid, key_id);
348+
" AND id = %d", nspc, cmoptoid, key_id);
111349

112350
if (SPI_connect() != SPI_OK_CONNECT)
113351
elog(ERROR, "SPI_connect failed");
@@ -339,23 +577,25 @@ jsonbc_compress(AttributeCompression *ac, const struct varlena *data)
339577
Assert(offset == len);
340578

341579
/* retrieve or generate ids */
342-
get_key_ids(ac->cmoptoid, buf, idsbuf, nkeys);
580+
jsonbc_get_key_ids(ac->cmoptoid, buf, len, idsbuf, nkeys);
343581

344582
/* replace the old keys with encoded ids */
345583
for (i = 0; i < nkeys; i++)
346584
{
585+
int keylen;
586+
347587
JsonbValue *v = &jbv->val.object.pairs[i].key;
348588

349-
encode_varbyte(idsbuf[i], (unsigned char *) keyptr, &len);
589+
encode_varbyte(idsbuf[i], (unsigned char *) keyptr, &keylen);
350590
v->val.string.val = keyptr;
351-
v->val.string.len = len;
352-
keyptr += len;
591+
v->val.string.len = keylen;
592+
keyptr += keylen;
353593
}
354594
}
355595
}
356596

357597
/* don't compress scalar values */
358-
if (IsAJsonbScalar(jbv))
598+
if (jbv == NULL || IsAJsonbScalar(jbv))
359599
return NULL;
360600

361601
res = (struct varlena *) packJsonbValue(jbv, VARHDRSZ_CUSTOM_COMPRESSED, &size);
@@ -399,7 +639,7 @@ jsonbc_decompress(AttributeCompression *ac, const struct varlena *data)
399639
key_id = decode_varbyte((unsigned char *) v.val.string.val);
400640

401641
v.type = jbvString;
402-
v.val.string.val = get_key_by_id(ac->cmoptoid, key_id);
642+
v.val.string.val = jsonbc_get_key_by_id(ac->cmoptoid, key_id);
403643
v.val.string.len = strlen(v.val.string.val);
404644
}
405645
jbv = pushJsonbValue(&state, r, r < WJB_BEGIN_ARRAY ? &v : NULL);

0 commit comments

Comments
 (0)