Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * localbuf.c
4 : * local buffer manager. Fast buffer manager for temporary tables,
5 : * which never need to be WAL-logged or checkpointed, etc.
6 : *
7 : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
8 : * Portions Copyright (c) 1994-5, Regents of the University of California
9 : *
10 : *
11 : * IDENTIFICATION
12 : * src/backend/storage/buffer/localbuf.c
13 : *
14 : *-------------------------------------------------------------------------
15 : */
16 : #include "postgres.h"
17 :
18 : #include "access/parallel.h"
19 : #include "executor/instrument.h"
20 : #include "pgstat.h"
21 : #include "storage/aio.h"
22 : #include "storage/buf_internals.h"
23 : #include "storage/bufmgr.h"
24 : #include "storage/fd.h"
25 : #include "utils/guc_hooks.h"
26 : #include "utils/memdebug.h"
27 : #include "utils/memutils.h"
28 : #include "utils/resowner.h"
29 :
30 :
31 : /*#define LBDEBUG*/
32 :
/*
 * Entry for the buffer lookup hashtable: maps a page's identity (BufferTag)
 * to the index of the local buffer currently holding that page.
 */
typedef struct
{
	BufferTag	key;			/* Tag of a disk page */
	int			id;				/* Associated local buffer's index */
} LocalBufferLookupEnt;
39 :
/*
 * Fetch the Block pointer for a local buffer's storage.  Local buffer ids
 * are encoded as buf_id = -i - 2 (see InitLocalBuffers()), hence the
 * -(buf_id + 2) index computation.
 *
 * Note: this macro only works on local buffers, not shared ones!
 */
#define LocalBufHdrGetBlock(bufHdr) \
	LocalBufferBlockPointers[-((bufHdr)->buf_id + 2)]

/* Number of usable local buffers; stays 0 until InitLocalBuffers() runs */
int			NLocBuffer = 0;		/* until buffers are initialized */

/* Parallel arrays indexed by local buffer index (0 .. NLocBuffer-1) */
BufferDesc *LocalBufferDescriptors = NULL;	/* buffer headers */
Block	   *LocalBufferBlockPointers = NULL;	/* lazily-allocated storage */
int32	   *LocalRefCount = NULL;	/* per-buffer backend-local pin counts */

/* clock-sweep hand used by GetLocalVictimBuffer() */
static int	nextFreeLocalBufId = 0;

/* BufferTag -> buffer index map; entries are LocalBufferLookupEnt */
static HTAB *LocalBufHash = NULL;

/*
 * Number of distinct local buffers currently pinned by this backend (each
 * buffer counted once no matter how many pins it holds); maintained by
 * PinLocalBuffer()/UnpinLocalBufferNoOwner().
 */
static int	NLocalPinnedBuffers = 0;


static void InitLocalBuffers(void);
static Block GetLocalBufferStorage(void);
static Buffer GetLocalVictimBuffer(void);
61 :
62 :
63 : /*
64 : * PrefetchLocalBuffer -
65 : * initiate asynchronous read of a block of a relation
66 : *
67 : * Do PrefetchBuffer's work for temporary relations.
68 : * No-op if prefetching isn't compiled in.
69 : */
70 : PrefetchBufferResult
71 1566 : PrefetchLocalBuffer(SMgrRelation smgr, ForkNumber forkNum,
72 : BlockNumber blockNum)
73 : {
74 1566 : PrefetchBufferResult result = {InvalidBuffer, false};
75 : BufferTag newTag; /* identity of requested block */
76 : LocalBufferLookupEnt *hresult;
77 :
78 1566 : InitBufferTag(&newTag, &smgr->smgr_rlocator.locator, forkNum, blockNum);
79 :
80 : /* Initialize local buffers if first request in this session */
81 1566 : if (LocalBufHash == NULL)
82 0 : InitLocalBuffers();
83 :
84 : /* See if the desired buffer already exists */
85 : hresult = (LocalBufferLookupEnt *)
86 1566 : hash_search(LocalBufHash, &newTag, HASH_FIND, NULL);
87 :
88 1566 : if (hresult)
89 : {
90 : /* Yes, so nothing to do */
91 1566 : result.recent_buffer = -hresult->id - 1;
92 : }
93 : else
94 : {
95 : #ifdef USE_PREFETCH
96 : /* Not in buffers, so initiate prefetch */
97 0 : if ((io_direct_flags & IO_DIRECT_DATA) == 0 &&
98 0 : smgrprefetch(smgr, forkNum, blockNum, 1))
99 : {
100 0 : result.initiated_io = true;
101 : }
102 : #endif /* USE_PREFETCH */
103 : }
104 :
105 1566 : return result;
106 : }
107 :
108 :
/*
 * LocalBufferAlloc -
 *	  Find or create a local buffer for the given page of the given relation.
 *
 * API is similar to bufmgr.c's BufferAlloc, except that we do not need to do
 * any locking since this is all local.  We support only default access
 * strategy (hence, usage_count is always advanced).
 *
 * Returns the buffer descriptor, pinned.  *foundPtr is set true if the block
 * was already present and valid; false means the caller must read the page
 * into the (freshly assigned) buffer.
 */
BufferDesc *
LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum,
				 bool *foundPtr)
{
	BufferTag	newTag;			/* identity of requested block */
	LocalBufferLookupEnt *hresult;
	BufferDesc *bufHdr;
	Buffer		victim_buffer;
	int			bufid;
	bool		found;

	InitBufferTag(&newTag, &smgr->smgr_rlocator.locator, forkNum, blockNum);

	/* Initialize local buffers if first request in this session */
	if (LocalBufHash == NULL)
		InitLocalBuffers();

	/* make room to remember the pin taken below */
	ResourceOwnerEnlarge(CurrentResourceOwner);

	/* See if the desired buffer already exists */
	hresult = (LocalBufferLookupEnt *)
		hash_search(LocalBufHash, &newTag, HASH_FIND, NULL);

	if (hresult)
	{
		bufid = hresult->id;
		bufHdr = GetLocalBufferDescriptor(bufid);
		Assert(BufferTagsEqual(&bufHdr->tag, &newTag));

		/* PinLocalBuffer() reports whether the contents are BM_VALID */
		*foundPtr = PinLocalBuffer(bufHdr, true);
	}
	else
	{
		uint32		buf_state;

		/* evict some buffer; it comes back pinned, flushed, tag-invalid */
		victim_buffer = GetLocalVictimBuffer();
		bufid = -victim_buffer - 1;
		bufHdr = GetLocalBufferDescriptor(bufid);

		hresult = (LocalBufferLookupEnt *)
			hash_search(LocalBufHash, &newTag, HASH_ENTER, &found);
		if (found)				/* shouldn't happen */
			elog(ERROR, "local buffer hash table corrupted");
		hresult->id = bufid;

		/*
		 * it's all ours now.
		 */
		bufHdr->tag = newTag;

		/*
		 * Reset flags and usage count; tag becomes valid with usage count
		 * one.  Unlocked atomic writes suffice: no other process can see a
		 * local buffer.
		 */
		buf_state = pg_atomic_read_u32(&bufHdr->state);
		buf_state &= ~(BUF_FLAG_MASK | BUF_USAGECOUNT_MASK);
		buf_state |= BM_TAG_VALID | BUF_USAGECOUNT_ONE;
		pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);

		*foundPtr = false;
	}

	return bufHdr;
}
177 :
/*
 * Like FlushBuffer(), just for local buffers.
 *
 * Writes the buffer's page to its temp-relation file and clears BM_DIRTY
 * via TerminateLocalBufferIO().  No locking or WAL work is needed since
 * only this backend can touch the buffer.
 *
 * "reln" may be NULL, in which case the relation is looked up from the
 * buffer tag via smgropen().  Caller must hold a pin on the buffer.
 */
void
FlushLocalBuffer(BufferDesc *bufHdr, SMgrRelation reln)
{
	instr_time	io_start;
	Page		localpage = (char *) LocalBufHdrGetBlock(bufHdr);

	Assert(LocalRefCount[-BufferDescriptorGetBuffer(bufHdr) - 1] > 0);

	/*
	 * Try to start an I/O operation.  There currently are no reasons for
	 * StartLocalBufferIO to return false, so we raise an error in that case.
	 */
	if (!StartLocalBufferIO(bufHdr, false, false))
		elog(ERROR, "failed to start write IO on local buffer");

	/* Find smgr relation for buffer */
	if (reln == NULL)
		reln = smgropen(BufTagGetRelFileLocator(&bufHdr->tag),
						MyProcNumber);

	PageSetChecksumInplace(localpage, bufHdr->tag.blockNum);

	io_start = pgstat_prepare_io_time(track_io_timing);

	/* And write... */
	smgrwrite(reln,
			  BufTagGetForkNum(&bufHdr->tag),
			  bufHdr->tag.blockNum,
			  localpage,
			  false);

	/* Temporary table I/O does not use Buffer Access Strategies */
	pgstat_count_io_op_time(IOOBJECT_TEMP_RELATION, IOCONTEXT_NORMAL,
							IOOP_WRITE, io_start, 1, BLCKSZ);

	/* Mark not-dirty */
	TerminateLocalBufferIO(bufHdr, true, 0, false);

	pgBufferUsage.local_blks_written++;
}
221 :
/*
 * GetLocalVictimBuffer - select and prepare a local buffer for reuse.
 *
 * Runs a clock sweep over the local buffer array to find an unpinned buffer
 * with zero usage count.  The chosen buffer is returned pinned, with backing
 * storage allocated, flushed to disk if it was dirty, and (if it held a
 * valid page) removed from the lookup hashtable with its tag cleared.
 * Errors out if every local buffer is pinned.
 */
static Buffer
GetLocalVictimBuffer(void)
{
	int			victim_bufid;
	int			trycounter;
	BufferDesc *bufHdr;

	/* make room to remember the pin taken below */
	ResourceOwnerEnlarge(CurrentResourceOwner);

	/*
	 * Need to get a new buffer.  We use a clock sweep algorithm (essentially
	 * the same as what freelist.c does now...)
	 */
	trycounter = NLocBuffer;
	for (;;)
	{
		victim_bufid = nextFreeLocalBufId;

		/* advance the clock hand, wrapping around at the end of the array */
		if (++nextFreeLocalBufId >= NLocBuffer)
			nextFreeLocalBufId = 0;

		bufHdr = GetLocalBufferDescriptor(victim_bufid);

		if (LocalRefCount[victim_bufid] == 0)
		{
			uint32		buf_state = pg_atomic_read_u32(&bufHdr->state);

			if (BUF_STATE_GET_USAGECOUNT(buf_state) > 0)
			{
				/* recently used: age it by one and keep sweeping */
				buf_state -= BUF_USAGECOUNT_ONE;
				pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
				trycounter = NLocBuffer;
			}
			else if (BUF_STATE_GET_REFCOUNT(buf_state) > 0)
			{
				/*
				 * This can be reached if the backend initiated AIO for this
				 * buffer and then errored out.
				 */
			}
			else
			{
				/* Found a usable buffer */
				PinLocalBuffer(bufHdr, false);
				break;
			}
		}
		else if (--trycounter == 0)
			ereport(ERROR,
					(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
					 errmsg("no empty local buffer available")));
	}

	/*
	 * lazy memory allocation: allocate space on first use of a buffer.
	 */
	if (LocalBufHdrGetBlock(bufHdr) == NULL)
	{
		/* Set pointer for use by BufferGetBlock() macro */
		LocalBufHdrGetBlock(bufHdr) = GetLocalBufferStorage();
	}

	/*
	 * this buffer is not referenced but it might still be dirty. if that's
	 * the case, write it out before reusing it!
	 */
	if (pg_atomic_read_u32(&bufHdr->state) & BM_DIRTY)
		FlushLocalBuffer(bufHdr, NULL);

	/*
	 * Remove the victim buffer from the hashtable and mark as invalid.
	 */
	if (pg_atomic_read_u32(&bufHdr->state) & BM_TAG_VALID)
	{
		InvalidateLocalBuffer(bufHdr, false);

		pgstat_count_io_op(IOOBJECT_TEMP_RELATION, IOCONTEXT_NORMAL, IOOP_EVICT, 1, 0);
	}

	return BufferDescriptorGetBuffer(bufHdr);
}
303 :
304 : /* see GetPinLimit() */
305 : uint32
306 13478 : GetLocalPinLimit(void)
307 : {
308 : /* Every backend has its own temporary buffers, and can pin them all. */
309 13478 : return num_temp_buffers;
310 : }
311 :
312 : /* see GetAdditionalPinLimit() */
313 : uint32
314 47348 : GetAdditionalLocalPinLimit(void)
315 : {
316 : Assert(NLocalPinnedBuffers <= num_temp_buffers);
317 47348 : return num_temp_buffers - NLocalPinnedBuffers;
318 : }
319 :
320 : /* see LimitAdditionalPins() */
321 : void
322 22714 : LimitAdditionalLocalPins(uint32 *additional_pins)
323 : {
324 : uint32 max_pins;
325 :
326 22714 : if (*additional_pins <= 1)
327 22068 : return;
328 :
329 : /*
330 : * In contrast to LimitAdditionalPins() other backends don't play a role
331 : * here. We can allow up to NLocBuffer pins in total, but it might not be
332 : * initialized yet so read num_temp_buffers.
333 : */
334 646 : max_pins = (num_temp_buffers - NLocalPinnedBuffers);
335 :
336 646 : if (*additional_pins >= max_pins)
337 0 : *additional_pins = max_pins;
338 : }
339 :
/*
 * Implementation of ExtendBufferedRelBy() and ExtendBufferedRelTo() for
 * temporary buffers.
 *
 * Pins and zero-fills "extend_by" victim buffers (the count may first be
 * clamped by LimitAdditionalLocalPins()), physically extends the relation
 * fork with zeroed blocks, enters the new blocks into the lookup table, and
 * finally marks them BM_VALID.  Returns the block number of the first newly
 * added block; *extended_by receives the number of blocks actually added.
 */
BlockNumber
ExtendBufferedRelLocal(BufferManagerRelation bmr,
					   ForkNumber fork,
					   uint32 flags,
					   uint32 extend_by,
					   BlockNumber extend_upto,
					   Buffer *buffers,
					   uint32 *extended_by)
{
	BlockNumber first_block;
	instr_time	io_start;

	/* Initialize local buffers if first request in this session */
	if (LocalBufHash == NULL)
		InitLocalBuffers();

	/* clamp extend_by to the number of pins we may still acquire */
	LimitAdditionalLocalPins(&extend_by);

	/* acquire one pinned, zero-filled victim buffer per new block */
	for (uint32 i = 0; i < extend_by; i++)
	{
		BufferDesc *buf_hdr;
		Block		buf_block;

		buffers[i] = GetLocalVictimBuffer();
		buf_hdr = GetLocalBufferDescriptor(-buffers[i] - 1);
		buf_block = LocalBufHdrGetBlock(buf_hdr);

		/* new buffers are zero-filled */
		MemSet(buf_block, 0, BLCKSZ);
	}

	first_block = smgrnblocks(bmr.smgr, fork);

	if (extend_upto != InvalidBlockNumber)
	{
		/*
		 * In contrast to shared relations, nothing could change the relation
		 * size concurrently. Thus we shouldn't end up finding that we don't
		 * need to do anything.
		 */
		Assert(first_block <= extend_upto);

		Assert((uint64) first_block + extend_by <= extend_upto);
	}

	/* Fail if relation is already at maximum possible length */
	if ((uint64) first_block + extend_by >= MaxBlockNumber)
		ereport(ERROR,
				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
				 errmsg("cannot extend relation %s beyond %u blocks",
						relpath(bmr.smgr->smgr_rlocator, fork).str,
						MaxBlockNumber)));

	/* assign each new block's identity to a buffer */
	for (uint32 i = 0; i < extend_by; i++)
	{
		int			victim_buf_id;
		BufferDesc *victim_buf_hdr;
		BufferTag	tag;
		LocalBufferLookupEnt *hresult;
		bool		found;

		victim_buf_id = -buffers[i] - 1;
		victim_buf_hdr = GetLocalBufferDescriptor(victim_buf_id);

		/* in case we need to pin an existing buffer below */
		ResourceOwnerEnlarge(CurrentResourceOwner);

		InitBufferTag(&tag, &bmr.smgr->smgr_rlocator.locator, fork, first_block + i);

		hresult = (LocalBufferLookupEnt *)
			hash_search(LocalBufHash, &tag, HASH_ENTER, &found);
		if (found)
		{
			BufferDesc *existing_hdr;
			uint32		buf_state;

			/*
			 * A buffer for this block already exists; give back the victim
			 * buffer and reuse the existing one instead.  (NOTE(review):
			 * presumably left over from an earlier failed or rolled-back
			 * extension — confirm against callers.)
			 */
			UnpinLocalBuffer(BufferDescriptorGetBuffer(victim_buf_hdr));

			existing_hdr = GetLocalBufferDescriptor(hresult->id);
			PinLocalBuffer(existing_hdr, false);
			buffers[i] = BufferDescriptorGetBuffer(existing_hdr);

			/*
			 * Clear the BM_VALID bit, do StartLocalBufferIO() and proceed.
			 */
			buf_state = pg_atomic_read_u32(&existing_hdr->state);
			Assert(buf_state & BM_TAG_VALID);
			Assert(!(buf_state & BM_DIRTY));
			buf_state &= ~BM_VALID;
			pg_atomic_unlocked_write_u32(&existing_hdr->state, buf_state);

			/* no need to loop for local buffers */
			StartLocalBufferIO(existing_hdr, true, false);
		}
		else
		{
			uint32		buf_state = pg_atomic_read_u32(&victim_buf_hdr->state);

			Assert(!(buf_state & (BM_VALID | BM_TAG_VALID | BM_DIRTY | BM_JUST_DIRTIED)));

			victim_buf_hdr->tag = tag;

			buf_state |= BM_TAG_VALID | BUF_USAGECOUNT_ONE;

			pg_atomic_unlocked_write_u32(&victim_buf_hdr->state, buf_state);

			hresult->id = victim_buf_id;

			StartLocalBufferIO(victim_buf_hdr, true, false);
		}
	}

	io_start = pgstat_prepare_io_time(track_io_timing);

	/* actually extend relation */
	smgrzeroextend(bmr.smgr, fork, first_block, extend_by, false);

	pgstat_count_io_op_time(IOOBJECT_TEMP_RELATION, IOCONTEXT_NORMAL, IOOP_EXTEND,
							io_start, 1, extend_by * BLCKSZ);

	/* the zero-extended blocks now have known contents: mark them valid */
	for (uint32 i = 0; i < extend_by; i++)
	{
		Buffer		buf = buffers[i];
		BufferDesc *buf_hdr;
		uint32		buf_state;

		buf_hdr = GetLocalBufferDescriptor(-buf - 1);

		buf_state = pg_atomic_read_u32(&buf_hdr->state);
		buf_state |= BM_VALID;
		pg_atomic_unlocked_write_u32(&buf_hdr->state, buf_state);
	}

	*extended_by = extend_by;

	pgBufferUsage.local_blks_written += extend_by;

	return first_block;
}
483 :
484 : /*
485 : * MarkLocalBufferDirty -
486 : * mark a local buffer dirty
487 : */
488 : void
489 3697614 : MarkLocalBufferDirty(Buffer buffer)
490 : {
491 : int bufid;
492 : BufferDesc *bufHdr;
493 : uint32 buf_state;
494 :
495 : Assert(BufferIsLocal(buffer));
496 :
497 : #ifdef LBDEBUG
498 : fprintf(stderr, "LB DIRTY %d\n", buffer);
499 : #endif
500 :
501 3697614 : bufid = -buffer - 1;
502 :
503 : Assert(LocalRefCount[bufid] > 0);
504 :
505 3697614 : bufHdr = GetLocalBufferDescriptor(bufid);
506 :
507 3697614 : buf_state = pg_atomic_read_u32(&bufHdr->state);
508 :
509 3697614 : if (!(buf_state & BM_DIRTY))
510 29626 : pgBufferUsage.local_blks_dirtied++;
511 :
512 3697614 : buf_state |= BM_DIRTY;
513 :
514 3697614 : pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
515 3697614 : }
516 :
/*
 * Like StartBufferIO, but for local buffers
 *
 * Returns true if the caller should go ahead with the I/O (a read when
 * "forInput", a write otherwise).  Returns false when the I/O turns out to
 * be unnecessary (page already valid, or already clean), or — with "nowait"
 * set — when asynchronous I/O is still in flight on the buffer.
 */
bool
StartLocalBufferIO(BufferDesc *bufHdr, bool forInput, bool nowait)
{
	uint32		buf_state;

	/*
	 * With AIO the buffer could have IO in progress, e.g. when there are two
	 * scans of the same relation. Either wait for the other IO or return
	 * false.
	 */
	if (pgaio_wref_valid(&bufHdr->io_wref))
	{
		/* copy the wait ref: completion may clear bufHdr->io_wref */
		PgAioWaitRef iow = bufHdr->io_wref;

		if (nowait)
			return false;

		pgaio_wref_wait(&iow);
	}

	/* Once we get here, there is definitely no I/O active on this buffer */

	/* Check if someone else already did the I/O */
	buf_state = pg_atomic_read_u32(&bufHdr->state);
	if (forInput ? (buf_state & BM_VALID) : !(buf_state & BM_DIRTY))
	{
		return false;
	}

	/* BM_IO_IN_PROGRESS isn't currently used for local buffers */

	/* local buffers don't track IO using resowners */

	return true;
}
556 : /*
557 : * Like TerminateBufferIO, but for local buffers
558 : */
559 : void
560 24118 : TerminateLocalBufferIO(BufferDesc *bufHdr, bool clear_dirty, uint32 set_flag_bits,
561 : bool release_aio)
562 : {
563 : /* Only need to adjust flags */
564 24118 : uint32 buf_state = pg_atomic_read_u32(&bufHdr->state);
565 :
566 : /* BM_IO_IN_PROGRESS isn't currently used for local buffers */
567 :
568 : /* Clear earlier errors, if this IO failed, it'll be marked again */
569 24118 : buf_state &= ~BM_IO_ERROR;
570 :
571 24118 : if (clear_dirty)
572 7268 : buf_state &= ~BM_DIRTY;
573 :
574 24118 : if (release_aio)
575 : {
576 : /* release pin held by IO subsystem, see also buffer_stage_common() */
577 : Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0);
578 16794 : buf_state -= BUF_REFCOUNT_ONE;
579 16794 : pgaio_wref_clear(&bufHdr->io_wref);
580 : }
581 :
582 24118 : buf_state |= set_flag_bits;
583 24118 : pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
584 :
585 : /* local buffers don't track IO using resowners */
586 :
587 : /* local buffers don't use the IO CV, as no other process can see buffer */
588 :
589 : /* local buffers don't use BM_PIN_COUNT_WAITER, so no need to wake */
590 24118 : }
591 :
/*
 * InvalidateLocalBuffer -- mark a local buffer invalid.
 *
 * Waits out any in-flight AIO on the buffer, removes its hashtable entry,
 * and clears its tag, flags, and usage count.
 *
 * If check_unreferenced is true, error out if the buffer is still
 * pinned. Passing false is appropriate when calling InvalidateLocalBuffer()
 * as part of changing the identity of a buffer, instead of just dropping the
 * buffer.
 *
 * See also InvalidateBuffer().
 */
void
InvalidateLocalBuffer(BufferDesc *bufHdr, bool check_unreferenced)
{
	Buffer		buffer = BufferDescriptorGetBuffer(bufHdr);
	int			bufid = -buffer - 1;
	uint32		buf_state;
	LocalBufferLookupEnt *hresult;

	/*
	 * It's possible that we started IO on this buffer before e.g. aborting
	 * the transaction that created a table. We need to wait for that IO to
	 * complete before removing / reusing the buffer.
	 */
	if (pgaio_wref_valid(&bufHdr->io_wref))
	{
		/* copy the wait ref: completion may clear bufHdr->io_wref */
		PgAioWaitRef iow = bufHdr->io_wref;

		pgaio_wref_wait(&iow);
		Assert(!pgaio_wref_valid(&bufHdr->io_wref));
	}

	buf_state = pg_atomic_read_u32(&bufHdr->state);

	/*
	 * We need to test not just LocalRefCount[bufid] but also the BufferDesc
	 * itself, as the latter is used to represent a pin by the AIO subsystem.
	 * This can happen if AIO is initiated and then the query errors out.
	 */
	if (check_unreferenced &&
		(LocalRefCount[bufid] != 0 || BUF_STATE_GET_REFCOUNT(buf_state) != 0))
		elog(ERROR, "block %u of %s is still referenced (local %d)",
			 bufHdr->tag.blockNum,
			 relpathbackend(BufTagGetRelFileLocator(&bufHdr->tag),
							MyProcNumber,
							BufTagGetForkNum(&bufHdr->tag)).str,
			 LocalRefCount[bufid]);

	/* Remove entry from hashtable */
	hresult = (LocalBufferLookupEnt *)
		hash_search(LocalBufHash, &bufHdr->tag, HASH_REMOVE, NULL);
	if (!hresult)				/* shouldn't happen */
		elog(ERROR, "local buffer hash table corrupted");
	/* Mark buffer invalid: clear tag, all flag bits, and usage count */
	ClearBufferTag(&bufHdr->tag);
	buf_state &= ~BUF_FLAG_MASK;
	buf_state &= ~BUF_USAGECOUNT_MASK;
	pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
}
650 :
651 : /*
652 : * DropRelationLocalBuffers
653 : * This function removes from the buffer pool all the pages of the
654 : * specified relation that have block numbers >= firstDelBlock.
655 : * (In particular, with firstDelBlock = 0, all pages are removed.)
656 : * Dirty pages are simply dropped, without bothering to write them
657 : * out first. Therefore, this is NOT rollback-able, and so should be
658 : * used only with extreme caution!
659 : *
660 : * See DropRelationBuffers in bufmgr.c for more notes.
661 : */
662 : void
663 748 : DropRelationLocalBuffers(RelFileLocator rlocator, ForkNumber *forkNum,
664 : int nforks, BlockNumber *firstDelBlock)
665 : {
666 : int i;
667 : int j;
668 :
669 617196 : for (i = 0; i < NLocBuffer; i++)
670 : {
671 616448 : BufferDesc *bufHdr = GetLocalBufferDescriptor(i);
672 : uint32 buf_state;
673 :
674 616448 : buf_state = pg_atomic_read_u32(&bufHdr->state);
675 :
676 616448 : if (!(buf_state & BM_TAG_VALID) ||
677 56606 : !BufTagMatchesRelFileLocator(&bufHdr->tag, &rlocator))
678 614776 : continue;
679 :
680 1912 : for (j = 0; j < nforks; j++)
681 : {
682 1842 : if (BufTagGetForkNum(&bufHdr->tag) == forkNum[j] &&
683 1666 : bufHdr->tag.blockNum >= firstDelBlock[j])
684 : {
685 1602 : InvalidateLocalBuffer(bufHdr, true);
686 1602 : break;
687 : }
688 : }
689 : }
690 748 : }
691 :
692 : /*
693 : * DropRelationAllLocalBuffers
694 : * This function removes from the buffer pool all pages of all forks
695 : * of the specified relation.
696 : *
697 : * See DropRelationsAllBuffers in bufmgr.c for more notes.
698 : */
699 : void
700 6224 : DropRelationAllLocalBuffers(RelFileLocator rlocator)
701 : {
702 : int i;
703 :
704 5923008 : for (i = 0; i < NLocBuffer; i++)
705 : {
706 5916784 : BufferDesc *bufHdr = GetLocalBufferDescriptor(i);
707 : uint32 buf_state;
708 :
709 5916784 : buf_state = pg_atomic_read_u32(&bufHdr->state);
710 :
711 6350552 : if ((buf_state & BM_TAG_VALID) &&
712 433768 : BufTagMatchesRelFileLocator(&bufHdr->tag, &rlocator))
713 : {
714 31492 : InvalidateLocalBuffer(bufHdr, true);
715 : }
716 : }
717 6224 : }
718 :
/*
 * InitLocalBuffers -
 *	  init the local buffer cache. Since most queries (esp. multi-user ones)
 *	  don't involve local buffers, we delay allocating actual memory for the
 *	  buffers until we need them; just make the buffer headers here.
 *
 * On success, NLocBuffer, the descriptor/block-pointer/refcount arrays, and
 * LocalBufHash are all set up.  Raises FATAL on out-of-memory.
 */
static void
InitLocalBuffers(void)
{
	int			nbufs = num_temp_buffers;
	HASHCTL		info;
	int			i;

	/*
	 * Parallel workers can't access data in temporary tables, because they
	 * have no visibility into the local buffers of their leader.  This is a
	 * convenient, low-cost place to provide a backstop check for that.  Note
	 * that we don't wish to prevent a parallel worker from accessing catalog
	 * metadata about a temp table, so checks at higher levels would be
	 * inappropriate.
	 */
	if (IsParallelWorker())
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
				 errmsg("cannot access temporary tables during a parallel operation")));

	/* Allocate and zero buffer headers and auxiliary arrays */
	LocalBufferDescriptors = (BufferDesc *) calloc(nbufs, sizeof(BufferDesc));
	LocalBufferBlockPointers = (Block *) calloc(nbufs, sizeof(Block));
	LocalRefCount = (int32 *) calloc(nbufs, sizeof(int32));
	if (!LocalBufferDescriptors || !LocalBufferBlockPointers || !LocalRefCount)
		ereport(FATAL,
				(errcode(ERRCODE_OUT_OF_MEMORY),
				 errmsg("out of memory")));

	nextFreeLocalBufId = 0;

	/* initialize fields that need to start off nonzero */
	for (i = 0; i < nbufs; i++)
	{
		BufferDesc *buf = GetLocalBufferDescriptor(i);

		/*
		 * negative to indicate local buffer. This is tricky: shared buffers
		 * start with 0. We have to start with -2. (Note that the routine
		 * BufferDescriptorGetBuffer adds 1 to buf_id so our first buffer id
		 * is -1.)
		 */
		buf->buf_id = -i - 2;

		pgaio_wref_clear(&buf->io_wref);

		/*
		 * Intentionally do not initialize the buffer's atomic variable
		 * (besides zeroing the underlying memory above). That way we get
		 * errors on platforms without atomics, if somebody (re-)introduces
		 * atomic operations for local buffers.
		 */
	}

	/* Create the lookup hash table */
	info.keysize = sizeof(BufferTag);
	info.entrysize = sizeof(LocalBufferLookupEnt);

	LocalBufHash = hash_create("Local Buffer Lookup Table",
							   nbufs,
							   &info,
							   HASH_ELEM | HASH_BLOBS);

	if (!LocalBufHash)
		elog(ERROR, "could not initialize local buffer hash table");

	/* Initialization done, mark buffers allocated */
	NLocBuffer = nbufs;
}
794 :
/*
 * PinLocalBuffer - acquire a pin on a local buffer.
 *
 * Increments the backend-local refcount; on the 0->1 transition the
 * header's refcount and NLocalPinnedBuffers are bumped too, and the usage
 * count is advanced if adjust_usagecount says so.  Returns whether the
 * buffer's contents are currently valid (BM_VALID).
 *
 * XXX: We could have a slightly more efficient version of PinLocalBuffer()
 * that does not support adjusting the usagecount - but so far it does not
 * seem worth the trouble.
 *
 * Note that ResourceOwnerEnlarge() must have been done already.
 */
bool
PinLocalBuffer(BufferDesc *buf_hdr, bool adjust_usagecount)
{
	uint32		buf_state;
	Buffer		buffer = BufferDescriptorGetBuffer(buf_hdr);
	int			bufid = -buffer - 1;

	buf_state = pg_atomic_read_u32(&buf_hdr->state);

	if (LocalRefCount[bufid] == 0)
	{
		/* first pin by this backend; reflect it in the header as well */
		NLocalPinnedBuffers++;
		buf_state += BUF_REFCOUNT_ONE;
		if (adjust_usagecount &&
			BUF_STATE_GET_USAGECOUNT(buf_state) < BM_MAX_USAGE_COUNT)
		{
			buf_state += BUF_USAGECOUNT_ONE;
		}
		pg_atomic_unlocked_write_u32(&buf_hdr->state, buf_state);

		/*
		 * See comment in PinBuffer().
		 *
		 * If the buffer isn't allocated yet, it'll be marked as defined in
		 * GetLocalBufferStorage().
		 */
		if (LocalBufHdrGetBlock(buf_hdr) != NULL)
			VALGRIND_MAKE_MEM_DEFINED(LocalBufHdrGetBlock(buf_hdr), BLCKSZ);
	}
	LocalRefCount[bufid]++;
	ResourceOwnerRememberBuffer(CurrentResourceOwner,
								BufferDescriptorGetBuffer(buf_hdr));

	return buf_state & BM_VALID;
}
837 :
/*
 * UnpinLocalBuffer - release a pin on a local buffer.
 *
 * Drops the pin and tells the current resource owner to forget it.
 */
void
UnpinLocalBuffer(Buffer buffer)
{
	UnpinLocalBufferNoOwner(buffer);
	ResourceOwnerForgetBuffer(CurrentResourceOwner, buffer);
}
844 :
/*
 * UnpinLocalBufferNoOwner - release a pin, without resource-owner bookkeeping.
 *
 * Decrements the backend-local refcount; on the 1->0 transition the header's
 * refcount and NLocalPinnedBuffers are decremented too, making the buffer a
 * candidate for eviction again.
 */
void
UnpinLocalBufferNoOwner(Buffer buffer)
{
	int			buffid = -buffer - 1;

	Assert(BufferIsLocal(buffer));
	Assert(LocalRefCount[buffid] > 0);
	Assert(NLocalPinnedBuffers > 0);

	if (--LocalRefCount[buffid] == 0)
	{
		BufferDesc *buf_hdr = GetLocalBufferDescriptor(buffid);
		uint32		buf_state;

		NLocalPinnedBuffers--;

		buf_state = pg_atomic_read_u32(&buf_hdr->state);
		Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0);
		buf_state -= BUF_REFCOUNT_ONE;
		pg_atomic_unlocked_write_u32(&buf_hdr->state, buf_state);

		/* see comment in UnpinBufferNoOwner */
		VALGRIND_MAKE_MEM_NOACCESS(LocalBufHdrGetBlock(buf_hdr), BLCKSZ);
	}
}
870 :
871 : /*
872 : * GUC check_hook for temp_buffers
873 : */
874 : bool
875 2226 : check_temp_buffers(int *newval, void **extra, GucSource source)
876 : {
877 : /*
878 : * Once local buffers have been initialized, it's too late to change this.
879 : * However, if this is only a test call, allow it.
880 : */
881 2226 : if (source != PGC_S_TEST && NLocBuffer && NLocBuffer != *newval)
882 : {
883 0 : GUC_check_errdetail("\"temp_buffers\" cannot be changed after any temporary tables have been accessed in the session.");
884 0 : return false;
885 : }
886 2226 : return true;
887 : }
888 :
/*
 * GetLocalBufferStorage - allocate memory for a local buffer
 *
 * The idea of this function is to aggregate our requests for storage
 * so that the memory manager doesn't see a whole lot of relatively small
 * requests.  Since we'll never give back a local buffer once it's created
 * within a particular process, no point in burdening memmgr with separately
 * managed chunks.
 *
 * Returns a BLCKSZ-sized, PG_IO_ALIGN_SIZE-aligned chunk carved out of a
 * larger allocation; the static state below tracks the current chunk pool.
 */
static Block
GetLocalBufferStorage(void)
{
	static char *cur_block = NULL;	/* current allocation being carved up */
	static int	next_buf_in_block = 0;
	static int	num_bufs_in_block = 0;
	static int	total_bufs_allocated = 0;
	static MemoryContext LocalBufferContext = NULL;

	char	   *this_buf;

	Assert(total_bufs_allocated < NLocBuffer);

	if (next_buf_in_block >= num_bufs_in_block)
	{
		/* Need to make a new request to memmgr */
		int			num_bufs;

		/*
		 * We allocate local buffers in a context of their own, so that the
		 * space eaten for them is easily recognizable in MemoryContextStats
		 * output.  Create the context on first use.
		 */
		if (LocalBufferContext == NULL)
			LocalBufferContext =
				AllocSetContextCreate(TopMemoryContext,
									  "LocalBufferContext",
									  ALLOCSET_DEFAULT_SIZES);

		/* Start with a 16-buffer request; subsequent ones double each time */
		num_bufs = Max(num_bufs_in_block * 2, 16);
		/* But not more than what we need for all remaining local bufs */
		num_bufs = Min(num_bufs, NLocBuffer - total_bufs_allocated);
		/* And don't overflow MaxAllocSize, either */
		num_bufs = Min(num_bufs, MaxAllocSize / BLCKSZ);

		/* Buffers should be I/O aligned. */
		cur_block = (char *)
			TYPEALIGN(PG_IO_ALIGN_SIZE,
					  MemoryContextAlloc(LocalBufferContext,
										 num_bufs * BLCKSZ + PG_IO_ALIGN_SIZE));
		next_buf_in_block = 0;
		num_bufs_in_block = num_bufs;
	}

	/* Allocate next buffer in current memory block */
	this_buf = cur_block + next_buf_in_block * BLCKSZ;
	next_buf_in_block++;
	total_bufs_allocated++;

	/*
	 * Caller's PinLocalBuffer() was too early for Valgrind updates, so do it
	 * here.  The block is actually undefined, but we want consistency with
	 * the regular case of not needing to allocate memory.  This is
	 * specifically needed when method_io_uring.c fills the block, because
	 * Valgrind doesn't recognize io_uring reads causing undefined memory to
	 * become defined.
	 */
	VALGRIND_MAKE_MEM_DEFINED(this_buf, BLCKSZ);

	return (Block) this_buf;
}
960 :
/*
 * CheckForLocalBufferLeaks - ensure this backend holds no local buffer pins
 *
 * This is just like CheckForBufferLeaks(), but for local buffers.
 * Only active in assert-enabled builds; each leaked pin is reported with a
 * WARNING, then the accumulated count triggers an assertion failure.
 */
static void
CheckForLocalBufferLeaks(void)
{
#ifdef USE_ASSERT_CHECKING
	if (LocalRefCount)
	{
		int			RefCountErrors = 0;
		int			i;

		for (i = 0; i < NLocBuffer; i++)
		{
			if (LocalRefCount[i] != 0)
			{
				/* reconstruct the Buffer value from the array index */
				Buffer		b = -i - 1;
				char	   *s;

				s = DebugPrintBufferRefcount(b);
				elog(WARNING, "local buffer refcount leak: %s", s);
				pfree(s);

				RefCountErrors++;
			}
		}
		Assert(RefCountErrors == 0);
	}
#endif
}
993 :
/*
 * AtEOXact_LocalBuffers - clean up at end of transaction.
 *
 * This is just like AtEOXact_Buffers, but for local buffers.
 * (isCommit is currently unused; the leak check runs either way.)
 */
void
AtEOXact_LocalBuffers(bool isCommit)
{
	CheckForLocalBufferLeaks();
}
1004 :
/*
 * AtProcExit_LocalBuffers - ensure we have dropped pins during backend exit.
 *
 * This is just like AtProcExit_Buffers, but for local buffers.
 */
void
AtProcExit_LocalBuffers(void)
{
	/*
	 * We shouldn't be holding any remaining pins; if we are, and assertions
	 * aren't enabled, we'll fail later in DropRelationBuffers while trying to
	 * drop the temp rels.
	 */
	CheckForLocalBufferLeaks();
}
|