long curBlockNumber; /* this block's logical blk# within tape */
int pos; /* next read/write position in buffer */
int nbytes; /* total # of valid bytes in buffer */
-
- /*
- * Desired buffer size to use when reading. To keep things simple, we use
- * a single-block buffer when writing, or when reading a frozen tape. But
- * when we are reading and will only read forwards, we allocate a larger
- * buffer, determined by read_buffer_size.
- */
- int read_buffer_size;
} LogicalTape;
/*
lt->lastBlockBytes = 0;
lt->buffer = NULL;
lt->buffer_size = 0;
- lt->read_buffer_size = BLCKSZ;
lt->curBlockNumber = 0L;
lt->pos = 0;
lt->nbytes = 0;
}
/*
- * Rewind logical tape and switch from writing to reading or vice versa.
+ * Rewind logical tape and switch from writing to reading.
*
- * Unless the tape has been "frozen" in read state, forWrite must be the
- * opposite of the previous tape state.
+ * The tape must currently be in writing state, or "frozen" in read state.
+ *
+ * 'buffer_size' specifies how much memory to use for the read buffer. It
+ * does not include the memory needed for the indirect blocks. Regardless
+ * of the argument, the actual amount of memory used is between BLCKSZ and
+ * MaxAllocSize, and is a multiple of BLCKSZ. The given value is rounded
+ * down and truncated to fit those constraints, if necessary. If the tape
+ * is frozen, the 'buffer_size' argument is ignored, and a small BLCKSZ byte
+ * buffer is used.
*/
void
-LogicalTapeRewind(LogicalTapeSet *lts, int tapenum, bool forWrite)
+LogicalTapeRewindForRead(LogicalTapeSet *lts, int tapenum, size_t buffer_size)
{
LogicalTape *lt;
long datablocknum;
Assert(tapenum >= 0 && tapenum < lts->nTapes);
lt = <s->tapes[tapenum];
- if (!forWrite)
+ /*
+ * Round and cap buffer_size if needed.
+ */
+ if (lt->frozen)
+ buffer_size = BLCKSZ;
+ else
{
- if (lt->writing)
- {
- /*
- * Completion of a write phase. Flush last partial data block,
- * flush any partial indirect blocks, rewind for normal
- * (destructive) read.
- */
- if (lt->dirty)
- ltsDumpBuffer(lts, lt);
- lt->lastBlockBytes = lt->nbytes;
- lt->writing = false;
- datablocknum = ltsRewindIndirectBlock(lts, lt->indirect, false);
- }
- else
- {
- /*
- * This is only OK if tape is frozen; we rewind for (another) read
- * pass.
- */
- Assert(lt->frozen);
- datablocknum = ltsRewindFrozenIndirectBlock(lts, lt->indirect);
- }
+ /* need at least one block */
+ if (buffer_size < BLCKSZ)
+ buffer_size = BLCKSZ;
- /* Allocate a read buffer (unless the tape is empty) */
- if (lt->buffer)
- pfree(lt->buffer);
- lt->buffer = NULL;
- lt->buffer_size = 0;
- if (datablocknum != -1L)
- {
- lt->buffer = palloc(lt->read_buffer_size);
- lt->buffer_size = lt->read_buffer_size;
- }
+ /*
+ * palloc() larger than MaxAllocSize would fail (a multi-gigabyte
+ * buffer is unlikely to be helpful, anyway)
+ */
+ if (buffer_size > MaxAllocSize)
+ buffer_size = MaxAllocSize;
- /* Read the first block, or reset if tape is empty */
- lt->curBlockNumber = 0L;
- lt->pos = 0;
- lt->nbytes = 0;
- if (datablocknum != -1L)
- ltsReadFillBuffer(lts, lt, datablocknum);
+ /* round down to BLCKSZ boundary */
+ buffer_size -= buffer_size % BLCKSZ;
+ }
+
+ if (lt->writing)
+ {
+ /*
+ * Completion of a write phase. Flush last partial data block, flush
+ * any partial indirect blocks, rewind for normal (destructive) read.
+ */
+ if (lt->dirty)
+ ltsDumpBuffer(lts, lt);
+ lt->lastBlockBytes = lt->nbytes;
+ lt->writing = false;
+ datablocknum = ltsRewindIndirectBlock(lts, lt->indirect, false);
}
else
{
/*
- * Completion of a read phase. Rewind and prepare for write.
- *
- * NOTE: we assume the caller has read the tape to the end; otherwise
- * untouched data and indirect blocks will not have been freed. We
- * could add more code to free any unread blocks, but in current usage
- * of this module it'd be useless code.
+ * This is only OK if tape is frozen; we rewind for (another) read
+ * pass.
*/
- IndirectBlock *ib,
- *nextib;
+ Assert(lt->frozen);
+ datablocknum = ltsRewindFrozenIndirectBlock(lts, lt->indirect);
+ }
- Assert(!lt->writing && !lt->frozen);
- /* Must truncate the indirect-block hierarchy down to one level. */
- if (lt->indirect)
- {
- for (ib = lt->indirect->nextup; ib != NULL; ib = nextib)
- {
- nextib = ib->nextup;
- pfree(ib);
- }
- lt->indirect->nextSlot = 0;
- lt->indirect->nextup = NULL;
- }
- lt->writing = true;
- lt->dirty = false;
- lt->numFullBlocks = 0L;
- lt->lastBlockBytes = 0;
- lt->curBlockNumber = 0L;
- lt->pos = 0;
- lt->nbytes = 0;
+ /* Allocate a read buffer (unless the tape is empty) */
+ if (lt->buffer)
+ pfree(lt->buffer);
+ lt->buffer = NULL;
+ lt->buffer_size = 0;
+ if (datablocknum != -1L)
+ {
+ lt->buffer = palloc(buffer_size);
+ lt->buffer_size = buffer_size;
+ }
- if (lt->buffer)
+ /* Read the first block, or reset if tape is empty */
+ lt->curBlockNumber = 0L;
+ lt->pos = 0;
+ lt->nbytes = 0;
+ if (datablocknum != -1L)
+ ltsReadFillBuffer(lts, lt, datablocknum);
+}
+
+/*
+ * Rewind logical tape and switch from reading to writing.
+ *
+ * NOTE: we assume the caller has read the tape to the end; otherwise
+ * untouched data and indirect blocks will not have been freed. We
+ * could add more code to free any unread blocks, but in current usage
+ * of this module it'd be useless code.
+ */
+void
+LogicalTapeRewindForWrite(LogicalTapeSet *lts, int tapenum)
+{
+ LogicalTape *lt;
+ IndirectBlock *ib,
+ *nextib;
+
+ Assert(tapenum >= 0 && tapenum < lts->nTapes);
+ lt = <s->tapes[tapenum];
+
+ Assert(!lt->writing && !lt->frozen);
+ /* Must truncate the indirect-block hierarchy down to one level. */
+ if (lt->indirect)
+ {
+ for (ib = lt->indirect->nextup; ib != NULL; ib = nextib)
{
- pfree(lt->buffer);
- lt->buffer = NULL;
- lt->buffer_size = 0;
+ nextib = ib->nextup;
+ pfree(ib);
}
+ lt->indirect->nextSlot = 0;
+ lt->indirect->nextup = NULL;
+ }
+ lt->writing = true;
+ lt->dirty = false;
+ lt->numFullBlocks = 0L;
+ lt->lastBlockBytes = 0;
+ lt->curBlockNumber = 0L;
+ lt->pos = 0;
+ lt->nbytes = 0;
+
+ if (lt->buffer)
+ {
+ pfree(lt->buffer);
+ lt->buffer = NULL;
+ lt->buffer_size = 0;
}
}
{
return lts->nFileBlocks;
}
-
-/*
- * Set buffer size to use, when reading from given tape.
- */
-void
-LogicalTapeAssignReadBufferSize(LogicalTapeSet *lts, int tapenum, size_t avail_mem)
-{
- LogicalTape *lt;
-
- Assert(tapenum >= 0 && tapenum < lts->nTapes);
- lt = <s->tapes[tapenum];
-
- /*
- * The buffer size must be a multiple of BLCKSZ in size, so round the
- * given value down to nearest BLCKSZ. Make sure we have at least one
- * page. Also, don't go above MaxAllocSize, to avoid erroring out. A
- * multi-gigabyte buffer is unlikely to be helpful, anyway.
- */
- if (avail_mem < BLCKSZ)
- avail_mem = BLCKSZ;
- if (avail_mem > MaxAllocSize)
- avail_mem = MaxAllocSize;
- avail_mem -= avail_mem % BLCKSZ;
- lt->read_buffer_size = avail_mem;
-}
char *slabMemoryEnd; /* end of slab memory arena */
SlabSlot *slabFreeHead; /* head of free list */
+ /* Buffer size to use for reading input tapes, during merge. */
+ size_t read_buffer_size;
+
/*
* When we return a tuple to the caller in tuplesort_gettuple_XXX, that
* came from a tape (that is, in TSS_SORTEDONTAPE or TSS_FINALMERGE
static void inittapes(Tuplesortstate *state);
static void selectnewtape(Tuplesortstate *state);
static void init_slab_allocator(Tuplesortstate *state, int numSlots);
-static void init_tape_buffers(Tuplesortstate *state, int numInputTapes);
static void mergeruns(Tuplesortstate *state);
static void mergeonerun(Tuplesortstate *state);
static void beginmerge(Tuplesortstate *state);
* end of the sort anyway, but better to release the
* memory early.
*/
- LogicalTapeRewind(state->tapeset, srcTape, true);
+ LogicalTapeRewindForWrite(state->tapeset, srcTape);
return true;
}
newtup.tupindex = srcTape;
state->slabAllocatorUsed = true;
}
-/*
- * Divide all remaining work memory (availMem) as read buffers, for all
- * the tapes that will be used during the merge.
- *
- * We use the number of possible *input* tapes here, rather than maxTapes,
- * for the calculation. At all times, we'll be reading from at most
- * numInputTapes tapes, and one tape is used for output (unless we do an
- * on-the-fly final merge, in which case we don't have an output tape).
- */
-static void
-init_tape_buffers(Tuplesortstate *state, int numInputTapes)
-{
- int64 availBlocks;
- int64 blocksPerTape;
- int remainder;
- int tapenum;
-
- /*
- * Divide availMem evenly among the number of input tapes.
- */
- availBlocks = state->availMem / BLCKSZ;
- blocksPerTape = availBlocks / numInputTapes;
- remainder = availBlocks % numInputTapes;
- USEMEM(state, availBlocks * BLCKSZ);
-
-#ifdef TRACE_SORT
- if (trace_sort)
- elog(LOG, "using " INT64_FORMAT " KB of memory for read buffers among %d input tapes",
- (availBlocks * BLCKSZ) / 1024, numInputTapes);
-#endif
-
- /*
- * Use one page per tape, even if we are out of memory.
- * tuplesort_merge_order() should've chosen the number of tapes so that
- * this can't happen, but better safe than sorry. (This also protects
- * from a negative availMem.)
- */
- if (blocksPerTape < 1)
- {
- blocksPerTape = 1;
- remainder = 0;
- }
-
- /*
- * Set the buffers for the tapes.
- *
- * In a multi-phase merge, the tape that is initially used as an output
- * tape, will later be rewound and read from, and should also use a large
- * buffer at that point. So we must loop up to maxTapes, not just
- * numInputTapes!
- *
- * If there are fewer runs than tapes, we will set the buffer size also
- * for tapes that will go completely unused, but that's harmless.
- * LogicalTapeAssignReadBufferSize() doesn't allocate the buffer
- * immediately, it just sets the size that will be used, when the tape is
- * rewound for read, and the tape isn't empty.
- */
- for (tapenum = 0; tapenum < state->maxTapes; tapenum++)
- {
- int64 numBlocks = blocksPerTape + (tapenum < remainder ? 1 : 0);
-
- LogicalTapeAssignReadBufferSize(state->tapeset, tapenum,
- numBlocks * BLCKSZ);
- }
-}
-
/*
* mergeruns -- merge all the completed initial runs.
*
}
/*
- * Use all the spare memory we have available for read buffers for the
- * tapes.
+ * Use all the spare memory we have available for read buffers among the
+ * input tapes.
*
* We do this only after checking for the case that we produced only one
* initial run, because there is no need to use a large read buffer when
* we're reading from a single tape. With one tape, the I/O pattern will
* be the same regardless of the buffer size.
*
- * We don't try to "rebalance" the amount of memory among tapes, when we
- * start a new merge phase, even if some tapes can be inactive in the
- * phase. That would be hard, because logtape.c doesn't know where one
- * run ends and another begins. When a new merge phase begins, and a tape
- * doesn't participate in it, its buffer nevertheless already contains
- * tuples from the next run on same tape, so we cannot release the buffer.
- * That's OK in practice, merge performance isn't that sensitive to the
- * amount of buffers used, and most merge phases use all or almost all
- * tapes, anyway.
+ * We don't try to "rebalance" the memory among tapes, when we start a new
+ * merge phase, even if some tapes are inactive in the new phase. That
+ * would be hard, because logtape.c doesn't know where one run ends and
+ * another begins. When a new merge phase begins, and a tape doesn't
+ * participate in it, its buffer nevertheless already contains tuples from
+ * the next run on same tape, so we cannot release the buffer. That's OK
+ * in practice, merge performance isn't that sensitive to the amount of
+ * buffers used, and most merge phases use all or almost all tapes,
+ * anyway.
*/
- init_tape_buffers(state, numInputTapes);
+#ifdef TRACE_SORT
+ if (trace_sort)
+ elog(LOG, "using " INT64_FORMAT " KB of memory for read buffers among %d input tapes",
+ (state->availMem) / 1024, numInputTapes);
+#endif
+
+ state->read_buffer_size = state->availMem / numInputTapes;
+ USEMEM(state, state->availMem);
/*
* Allocate a new 'memtuples' array, for the heap. It will hold one tuple
/* End of step D2: rewind all output tapes to prepare for merging */
for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
- LogicalTapeRewind(state->tapeset, tapenum, false);
+ LogicalTapeRewindForRead(state->tapeset, tapenum, state->read_buffer_size);
for (;;)
{
if (--state->Level == 0)
break;
/* rewind output tape T to use as new input */
- LogicalTapeRewind(state->tapeset, state->tp_tapenum[state->tapeRange],
- false);
+ LogicalTapeRewindForRead(state->tapeset, state->tp_tapenum[state->tapeRange],
+ state->read_buffer_size);
/* rewind used-up input tape P, and prepare it for write pass */
- LogicalTapeRewind(state->tapeset, state->tp_tapenum[state->tapeRange - 1],
- true);
+ LogicalTapeRewindForWrite(state->tapeset, state->tp_tapenum[state->tapeRange - 1]);
state->tp_runs[state->tapeRange - 1] = 0;
/*
for (tapenum = 0; tapenum < state->maxTapes; tapenum++)
{
if (tapenum != state->result_tape)
- LogicalTapeRewind(state->tapeset, tapenum, true);
+ LogicalTapeRewindForWrite(state->tapeset, tapenum);
}
}
state->markpos_eof = false;
break;
case TSS_SORTEDONTAPE:
- LogicalTapeRewind(state->tapeset,
- state->result_tape,
- false);
+ LogicalTapeRewindForRead(state->tapeset,
+ state->result_tape,
+ 0);
state->eof_reached = false;
state->markpos_block = 0L;
state->markpos_offset = 0;
void *ptr, size_t size);
extern void LogicalTapeWrite(LogicalTapeSet *lts, int tapenum,
void *ptr, size_t size);
-extern void LogicalTapeRewind(LogicalTapeSet *lts, int tapenum, bool forWrite);
+extern void LogicalTapeRewindForRead(LogicalTapeSet *lts, int tapenum,
+ size_t buffer_size);
+extern void LogicalTapeRewindForWrite(LogicalTapeSet *lts, int tapenum);
extern void LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum);
extern bool LogicalTapeBackspace(LogicalTapeSet *lts, int tapenum,
size_t size);
long blocknum, int offset);
extern void LogicalTapeTell(LogicalTapeSet *lts, int tapenum,
long *blocknum, int *offset);
-extern void LogicalTapeAssignReadBufferSize(LogicalTapeSet *lts, int tapenum,
- size_t bufsize);
extern long LogicalTapeSetBlocks(LogicalTapeSet *lts);
#endif /* LOGTAPE_H */