*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/executor/nodeMaterial.c,v 1.40 2002/12/15 16:17:46 tgl Exp $
+ * $Header: /cvsroot/pgsql/src/backend/executor/nodeMaterial.c,v 1.41 2003/03/09 02:19:13 tgl Exp $
*
*-------------------------------------------------------------------------
*/
*/
#include "postgres.h"
+#include "access/heapam.h"
#include "executor/executor.h"
#include "executor/nodeMaterial.h"
#include "miscadmin.h"
/* ----------------------------------------------------------------
* ExecMaterial
*
- * The first time this is called, ExecMaterial retrieves tuples
- * from this node's outer subplan and inserts them into a tuplestore
- * (a temporary tuple storage structure). The first tuple is then
- * returned. Successive calls to ExecMaterial return successive
- * tuples from the tuplestore.
- *
- * Initial State:
- *
- * matstate->tuplestorestate is initially NULL, indicating we
- * haven't yet collected the results of the subplan.
+ * As long as we are at the end of the data collected in the tuplestore,
+ * we collect one new row from the subplan on each call, and stash it
+ * aside in the tuplestore before returning it. The tuplestore is
+ * only read if we are asked to scan backwards, rescan, or mark/restore.
*
* ----------------------------------------------------------------
*/
{
EState *estate;
ScanDirection dir;
+ bool forward;
Tuplestorestate *tuplestorestate;
- HeapTuple heapTuple;
+ HeapTuple heapTuple = NULL;
+ bool should_free = false;
+ bool eof_tuplestore;
TupleTableSlot *slot;
- bool should_free;
/*
* get state info from node
*/
estate = node->ss.ps.state;
dir = estate->es_direction;
+ forward = ScanDirectionIsForward(dir);
tuplestorestate = (Tuplestorestate *) node->tuplestorestate;
/*
- * If first time through, read all tuples from outer plan and pass
- * them to tuplestore.c. Subsequent calls just fetch tuples from
- * tuplestore.
+ * If first time through, initialize the tuplestore.
*/
-
if (tuplestorestate == NULL)
{
- PlanState *outerNode;
-
- /*
- * Want to scan subplan in the forward direction while creating
- * the stored data. (Does setting my direction actually affect
- * the subplan? I bet this is useless code...)
- */
- estate->es_direction = ForwardScanDirection;
-
- /*
- * Initialize tuplestore module.
- */
tuplestorestate = tuplestore_begin_heap(true, /* randomAccess */
SortMem);
node->tuplestorestate = (void *) tuplestorestate;
+ }
- /*
- * Scan the subplan and feed all the tuples to tuplestore.
- */
- outerNode = outerPlanState(node);
+ /*
+ * If we are not at the end of the tuplestore, or are going backwards,
+ * try to fetch a tuple from tuplestore.
+ */
+ eof_tuplestore = tuplestore_ateof(tuplestorestate);
- for (;;)
+ if (!forward && eof_tuplestore)
+ {
+ if (!node->eof_underlying)
{
- slot = ExecProcNode(outerNode);
+ /*
+ * When reversing direction at tuplestore EOF, the first
+ * getheaptuple call will fetch the last-added tuple; but
+ * we want to return the one before that, if possible.
+ * So do an extra fetch.
+ */
+ heapTuple = tuplestore_getheaptuple(tuplestorestate,
+ forward,
+ &should_free);
+ if (heapTuple == NULL)
+ return NULL; /* the tuplestore must be empty */
+ if (should_free)
+ heap_freetuple(heapTuple);
+ }
+ eof_tuplestore = false;
+ }
- if (TupIsNull(slot))
- break;
+ if (!eof_tuplestore)
+ {
+ heapTuple = tuplestore_getheaptuple(tuplestorestate,
+ forward,
+ &should_free);
+ if (heapTuple == NULL && forward)
+ eof_tuplestore = true;
+ }
- tuplestore_puttuple(tuplestorestate, (void *) slot->val);
- ExecClearTuple(slot);
- }
+ /*
+ * If necessary, try to fetch another row from the subplan.
+ *
+ * Note: the eof_underlying state variable exists to short-circuit
+ * further subplan calls. It's not optional, unfortunately, because
+ * some plan node types are not robust about being called again when
+ * they've already returned NULL.
+ */
+ if (eof_tuplestore && !node->eof_underlying)
+ {
+ PlanState *outerNode;
+ TupleTableSlot *outerslot;
/*
- * Complete the store.
+ * We can only get here with forward==true, so no need to worry
+ * about which direction the subplan will go.
*/
- tuplestore_donestoring(tuplestorestate);
-
+ outerNode = outerPlanState(node);
+ outerslot = ExecProcNode(outerNode);
+ if (TupIsNull(outerslot))
+ {
+ node->eof_underlying = true;
+ return NULL;
+ }
+ heapTuple = outerslot->val;
+ should_free = false;
/*
- * restore to user specified direction
+ * Append returned tuple to tuplestore, too. NOTE: because the
+ * tuplestore is certainly in EOF state, its read position will move
+ * forward over the added tuple. This is what we want.
*/
- estate->es_direction = dir;
+ tuplestore_puttuple(tuplestorestate, (void *) heapTuple);
}
/*
- * Get the first or next tuple from tuplestore. Returns NULL if no
- * more tuples.
+ * Return the obtained tuple.
*/
slot = (TupleTableSlot *) node->ss.ps.ps_ResultTupleSlot;
- heapTuple = tuplestore_getheaptuple(tuplestorestate,
- ScanDirectionIsForward(dir),
- &should_free);
-
return ExecStoreTuple(heapTuple, slot, InvalidBuffer, should_free);
}
matstate->ss.ps.state = estate;
matstate->tuplestorestate = NULL;
+ matstate->eof_underlying = false;
/*
* Miscellaneous initialization
* results; we have to re-read the subplan and re-store.
*
* Otherwise we can just rewind and rescan the stored output.
+ * The state of the subnode does not change.
*/
if (((PlanState *) node)->lefttree->chgParam != NULL)
{
tuplestore_end((Tuplestorestate *) node->tuplestorestate);
node->tuplestorestate = NULL;
+ node->eof_underlying = false;
}
else
+ {
tuplestore_rescan((Tuplestorestate *) node->tuplestorestate);
+ }
}
* This module handles temporary storage of tuples for purposes such
* as Materialize nodes, hashjoin batch files, etc. It is essentially
* a dumbed-down version of tuplesort.c; it does no sorting of tuples
- * but can only store a sequence of tuples and regurgitate it later.
+ * but can only store and regurgitate a sequence of tuples. However,
+ * because no sort is required, it is allowed to start reading the sequence
+ * before it has all been written. This is particularly useful for cursors,
+ * because it allows random access within the already-scanned portion of
+ * a query without having to process the underlying scan to completion.
* A temporary file is used to handle the data if it exceeds the
* space limit specified by the caller.
*
* The (approximate) amount of memory allowed to the tuplestore is specified
* in kilobytes by the caller. We absorb tuples and simply store them in an
- * in-memory array as long as we haven't exceeded maxKBytes. If we reach the
- * end of the input without exceeding maxKBytes, we just return tuples during
- * the read phase by scanning the tuple array sequentially. If we do exceed
+ * in-memory array as long as we haven't exceeded maxKBytes. If we do exceed
* maxKBytes, we dump all the tuples into a temp file and then read from that
- * during the read phase.
+ * when needed.
*
* When the caller requests random access to the data, we write the temp file
- * in a format that allows either forward or backward scan.
+ * in a format that allows either forward or backward scan. Otherwise, only
+ * forward scan is allowed. But rewind and markpos/restorepos are allowed
+ * in any case.
+ *
+ * Because we allow reading before writing is complete, there are two
+ * interesting positions in the temp file: the current read position and
+ * the current write position. At any given instant, the temp file's seek
+ * position corresponds to one of these, and the other one is remembered in
+ * the Tuplestore's state.
*
*
* Portions Copyright (c) 1996-2002, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/utils/sort/tuplestore.c,v 1.10 2002/11/13 00:39:48 momjian Exp $
+ * $Header: /cvsroot/pgsql/src/backend/utils/sort/tuplestore.c,v 1.11 2003/03/09 02:19:13 tgl Exp $
*
*-------------------------------------------------------------------------
*/
*/
typedef enum
{
- TSS_INITIAL, /* Loading tuples; still within memory
- * limit */
- TSS_WRITEFILE, /* Loading tuples; writing to temp file */
- TSS_READMEM, /* Reading tuples; entirely in memory */
- TSS_READFILE /* Reading tuples from temp file */
+ TSS_INMEM, /* Tuples still fit in memory */
+ TSS_WRITEFILE, /* Writing to temp file */
+ TSS_READFILE /* Reading from temp file */
} TupStoreStatus;
/*
/*
* This array holds pointers to tuples in memory if we are in state
- * INITIAL or READMEM. In states WRITEFILE and READFILE it's not
- * used.
+ * INMEM. In states WRITEFILE and READFILE it's not used.
*/
void **memtuples; /* array of pointers to palloc'd tuples */
int memtupcount; /* number of tuples currently present */
int memtupsize; /* allocated length of memtuples array */
/*
- * These variables are used after completion of storing to keep track
- * of the next tuple to return. (In the tape case, the tape's current
- * read position is also critical state.)
+ * These variables are used to keep track of the current position.
+ *
+ * In state WRITEFILE, the current file seek position is the write point,
+ * and the read position is remembered in readpos_xxx; in state READFILE,
+ * the current file seek position is the read point, and the write position
+ * is remembered in writepos_xxx. (The write position is the same as EOF,
+ * but since BufFileSeek doesn't currently implement SEEK_END, we have
+ * to remember it explicitly.)
+ *
+ * Special case: if we are in WRITEFILE state and eof_reached is true,
+ * then the read position is implicitly equal to the write position
+ * (and hence to the file seek position); this way we need not update
+ * the readpos_xxx variables on each write.
*/
- int current; /* array index (only used if READMEM) */
- bool eof_reached; /* reached EOF (needed for cursors) */
+ bool eof_reached; /* read reached EOF (always valid) */
+ int current; /* next array index (valid if INMEM) */
+ int readpos_file; /* file# (valid if WRITEFILE and not eof) */
+ long readpos_offset; /* offset (valid if WRITEFILE and not eof) */
+ int writepos_file; /* file# (valid if READFILE) */
+ long writepos_offset; /* offset (valid if READFILE) */
/* markpos_xxx holds marked position for mark and restore */
- int markpos_file; /* file# (only used if READFILE) */
- long markpos_offset; /* saved "current", or offset in tape file */
- bool markpos_eof; /* saved "eof_reached" */
+ int markpos_current; /* saved "current" */
+ int markpos_file; /* saved "readpos_file" */
+ long markpos_offset; /* saved "readpos_offset" */
};
#define COPYTUP(state,tup) ((*(state)->copytup) (state, tup))
* NOTES about on-tape representation of tuples:
*
* We require the first "unsigned int" of a stored tuple to be the total size
- * on-tape of the tuple, including itself (so it is never zero; an all-zero
- * unsigned int is used to delimit runs). The remainder of the stored tuple
+ * on-tape of the tuple, including itself (so it is never zero).
+ * The remainder of the stored tuple
* may or may not match the in-memory representation of the tuple ---
* any conversion needed is the job of the writetup and readtup routines.
*
int maxKBytes);
static void dumptuples(Tuplestorestate *state);
static unsigned int getlen(Tuplestorestate *state, bool eofOK);
-static void markrunend(Tuplestorestate *state);
static void *copytup_heap(Tuplestorestate *state, void *tup);
static void writetup_heap(Tuplestorestate *state, void *tup);
static void *readtup_heap(Tuplestorestate *state, unsigned int len);
* tuplestore_begin_xxx
*
* Initialize for a tuple store operation.
- *
- * After calling tuplestore_begin, the caller should call tuplestore_puttuple
- * zero or more times, then call tuplestore_donestoring when all the tuples
- * have been supplied. After donestoring, retrieve the tuples in order
- * by calling tuplestore_gettuple until it returns NULL. (If random
- * access was requested, rescan, markpos, and restorepos can also be called.)
- * Call tuplestore_end to terminate the operation and release memory/disk
- * space.
*/
static Tuplestorestate *
state = (Tuplestorestate *) palloc0(sizeof(Tuplestorestate));
- state->status = TSS_INITIAL;
+ state->status = TSS_INMEM;
state->randomAccess = randomAccess;
state->availMem = maxKBytes * 1024L;
state->myfile = NULL;
USEMEM(state, GetMemoryChunkSpace(state->memtuples));
+ state->eof_reached = false;
+ state->current = 0;
+
return state;
}
}
/*
- * Accept one tuple while collecting input data.
+ * tuplestore_ateof
+ *
+ * Returns the current eof_reached state.
+ */
+bool
+tuplestore_ateof(Tuplestorestate *state)
+{
+ return state->eof_reached;
+}
+
+/*
+ * Accept one tuple and append it to the tuplestore.
*
* Note that the input tuple is always copied; the caller need not save it.
+ *
+ * If the read status is currently "AT EOF" then it remains so (the read
+ * pointer advances along with the write pointer); otherwise the read
+ * pointer is unchanged. This is for the convenience of nodeMaterial.c.
*/
void
tuplestore_puttuple(Tuplestorestate *state, void *tuple)
switch (state->status)
{
- case TSS_INITIAL:
-
- /*
- * Stash the tuple in the in-memory array.
- */
+ case TSS_INMEM:
+ /* Grow the array as needed */
if (state->memtupcount >= state->memtupsize)
{
- /* Grow the array as needed. */
FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
state->memtupsize *= 2;
state->memtuples = (void **)
state->memtupsize * sizeof(void *));
USEMEM(state, GetMemoryChunkSpace(state->memtuples));
}
+
+ /* Stash the tuple in the in-memory array */
state->memtuples[state->memtupcount++] = tuple;
+ /* If eof_reached, keep read position in sync */
+ if (state->eof_reached)
+ state->current = state->memtupcount;
+
/*
* Done if we still fit in available memory.
*/
case TSS_WRITEFILE:
WRITETUP(state, tuple);
break;
- default:
- elog(ERROR, "tuplestore_puttuple: invalid state");
- break;
- }
-}
-
-/*
- * All tuples have been provided; finish writing.
- */
-void
-tuplestore_donestoring(Tuplestorestate *state)
-{
- switch (state->status)
- {
- case TSS_INITIAL:
-
- /*
- * We were able to accumulate all the tuples within the
- * allowed amount of memory. Just set up to scan them.
- */
- state->current = 0;
- state->eof_reached = false;
- state->markpos_offset = 0L;
- state->markpos_eof = false;
- state->status = TSS_READMEM;
- break;
- case TSS_WRITEFILE:
-
- /*
- * Write the EOF marker.
- */
- markrunend(state);
-
+ case TSS_READFILE:
/*
- * Set up for reading from tape.
+ * Switch from reading to writing.
*/
- if (BufFileSeek(state->myfile, 0, 0L, SEEK_SET) != 0)
- elog(ERROR, "tuplestore_donestoring: seek(0) failed");
- state->eof_reached = false;
- state->markpos_file = 0;
- state->markpos_offset = 0L;
- state->markpos_eof = false;
- state->status = TSS_READFILE;
+ if (!state->eof_reached)
+ BufFileTell(state->myfile,
+ &state->readpos_file, &state->readpos_offset);
+ if (BufFileSeek(state->myfile,
+ state->writepos_file, state->writepos_offset,
+ SEEK_SET) != 0)
+ elog(ERROR, "tuplestore_puttuple: seek(EOF) failed");
+ state->status = TSS_WRITEFILE;
+ WRITETUP(state, tuple);
break;
default:
- elog(ERROR, "tuplestore_donestoring: invalid state");
+ elog(ERROR, "tuplestore_puttuple: invalid state");
break;
}
}
unsigned int tuplen;
void *tup;
+ Assert(forward || state->randomAccess);
+
switch (state->status)
{
- case TSS_READMEM:
- Assert(forward || state->randomAccess);
+ case TSS_INMEM:
*should_free = false;
if (forward)
{
}
break;
+ case TSS_WRITEFILE:
+ /* Skip state change if we'll just return NULL */
+ if (state->eof_reached && forward)
+ return NULL;
+ /*
+ * Switch from writing to reading.
+ */
+ BufFileTell(state->myfile,
+ &state->writepos_file, &state->writepos_offset);
+ if (!state->eof_reached)
+ if (BufFileSeek(state->myfile,
+ state->readpos_file, state->readpos_offset,
+ SEEK_SET) != 0)
+ elog(ERROR, "tuplestore_gettuple: seek() failed");
+ state->status = TSS_READFILE;
+ /* FALL THRU into READFILE case */
+
case TSS_READFILE:
- Assert(forward || state->randomAccess);
*should_free = true;
if (forward)
{
- if (state->eof_reached)
- return NULL;
if ((tuplen = getlen(state, true)) != 0)
{
tup = READTUP(state, tuplen);
*
* if all tuples are fetched already then we return last tuple,
* else - tuple before last returned.
+ *
+ * Back up to fetch previously-returned tuple's ending
+ * length word. If seek fails, assume we are at start of
+ * file.
*/
+ if (BufFileSeek(state->myfile, 0, -(long) sizeof(unsigned int),
+ SEEK_CUR) != 0)
+ return NULL;
+ tuplen = getlen(state, false);
+
if (state->eof_reached)
{
- /*
- * Seek position is pointing just past the zero tuplen at
- * the end of file; back up to fetch last tuple's ending
- * length word. If seek fails we must have a completely
- * empty file.
- */
- if (BufFileSeek(state->myfile, 0,
- -(long) (2 * sizeof(unsigned int)),
- SEEK_CUR) != 0)
- return NULL;
state->eof_reached = false;
+ /* We will return the tuple returned before returning NULL */
}
else
{
- /*
- * Back up and fetch previously-returned tuple's ending
- * length word. If seek fails, assume we are at start of
- * file.
- */
- if (BufFileSeek(state->myfile, 0,
- -(long) sizeof(unsigned int),
- SEEK_CUR) != 0)
- return NULL;
- tuplen = getlen(state, false);
-
/*
* Back up to get ending length word of tuple before it.
*/
elog(ERROR, "tuplestore_gettuple: bogus tuple len in backward scan");
return NULL;
}
+ tuplen = getlen(state, false);
}
- tuplen = getlen(state, false);
-
/*
* Now we have the length of the prior tuple, back up and read
* it. Note: READTUP expects we are positioned after the
/*
* dumptuples - remove tuples from memory and write to tape
+ *
+ * As a side effect, we must set readpos and markpos to the value
+ * corresponding to "current"; otherwise, a dump would lose the current read
+ * position.
*/
static void
dumptuples(Tuplestorestate *state)
{
int i;
- for (i = 0; i < state->memtupcount; i++)
+ for (i = 0; ; i++)
+ {
+ if (i == state->current)
+ BufFileTell(state->myfile,
+ &state->readpos_file, &state->readpos_offset);
+ if (i == state->markpos_current)
+ BufFileTell(state->myfile,
+ &state->markpos_file, &state->markpos_offset);
+ if (i >= state->memtupcount)
+ break;
WRITETUP(state, state->memtuples[i]);
+ }
state->memtupcount = 0;
}
void
tuplestore_rescan(Tuplestorestate *state)
{
- Assert(state->randomAccess);
-
switch (state->status)
{
- case TSS_READMEM:
+ case TSS_INMEM:
+ state->eof_reached = false;
state->current = 0;
+ break;
+ case TSS_WRITEFILE:
state->eof_reached = false;
- state->markpos_offset = 0L;
- state->markpos_eof = false;
+ state->readpos_file = 0;
+ state->readpos_offset = 0L;
break;
case TSS_READFILE:
+ state->eof_reached = false;
if (BufFileSeek(state->myfile, 0, 0L, SEEK_SET) != 0)
elog(ERROR, "tuplestore_rescan: seek(0) failed");
- state->eof_reached = false;
- state->markpos_file = 0;
- state->markpos_offset = 0L;
- state->markpos_eof = false;
break;
default:
elog(ERROR, "tuplestore_rescan: invalid state");
void
tuplestore_markpos(Tuplestorestate *state)
{
- Assert(state->randomAccess);
-
switch (state->status)
{
- case TSS_READMEM:
- state->markpos_offset = state->current;
- state->markpos_eof = state->eof_reached;
+ case TSS_INMEM:
+ state->markpos_current = state->current;
+ break;
+ case TSS_WRITEFILE:
+ if (state->eof_reached)
+ {
+ /* Need to record the implicit read position */
+ BufFileTell(state->myfile,
+ &state->markpos_file,
+ &state->markpos_offset);
+ }
+ else
+ {
+ state->markpos_file = state->readpos_file;
+ state->markpos_offset = state->readpos_offset;
+ }
break;
case TSS_READFILE:
BufFileTell(state->myfile,
&state->markpos_file,
&state->markpos_offset);
- state->markpos_eof = state->eof_reached;
break;
default:
elog(ERROR, "tuplestore_markpos: invalid state");
void
tuplestore_restorepos(Tuplestorestate *state)
{
- Assert(state->randomAccess);
-
switch (state->status)
{
- case TSS_READMEM:
- state->current = (int) state->markpos_offset;
- state->eof_reached = state->markpos_eof;
+ case TSS_INMEM:
+ state->eof_reached = false;
+ state->current = state->markpos_current;
+ break;
+ case TSS_WRITEFILE:
+ state->eof_reached = false;
+ state->readpos_file = state->markpos_file;
+ state->readpos_offset = state->markpos_offset;
break;
case TSS_READFILE:
+ state->eof_reached = false;
if (BufFileSeek(state->myfile,
state->markpos_file,
state->markpos_offset,
SEEK_SET) != 0)
elog(ERROR, "tuplestore_restorepos failed");
- state->eof_reached = state->markpos_eof;
break;
default:
elog(ERROR, "tuplestore_restorepos: invalid state");
getlen(Tuplestorestate *state, bool eofOK)
{
unsigned int len;
+ size_t nbytes;
- if (BufFileRead(state->myfile, (void *) &len, sizeof(len)) != sizeof(len))
+ nbytes = BufFileRead(state->myfile, (void *) &len, sizeof(len));
+ if (nbytes == sizeof(len))
+ return len;
+ if (nbytes != 0)
elog(ERROR, "tuplestore: unexpected end of tape");
- if (len == 0 && !eofOK)
+ if (!eofOK)
elog(ERROR, "tuplestore: unexpected end of data");
- return len;
-}
-
-static void
-markrunend(Tuplestorestate *state)
-{
- unsigned int len = 0;
-
- if (BufFileWrite(state->myfile, (void *) &len, sizeof(len)) != sizeof(len))
- elog(ERROR, "tuplestore: write failed");
+ return 0;
}