/*
* Reset per-tuple memory context to free any expression evaluation
* storage allocated in the previous tuple cycle. Note we can't do this
- * until we're done projecting.
+ * until we're done projecting. This will also clear any previous tuple
+ * returned by a TupleQueueReader; to make sure we don't leave a dangling
+ * pointer around, clear the working slot first.
*/
+ ExecClearTuple(node->funnel_slot);
econtext = node->ps.ps_ExprContext;
ResetExprContext(econtext);
PlanState *outerPlan = outerPlanState(gatherstate);
TupleTableSlot *outerTupleSlot;
TupleTableSlot *fslot = gatherstate->funnel_slot;
+ MemoryContext tupleContext = gatherstate->ps.ps_ExprContext->ecxt_per_tuple_memory;
HeapTuple tup;
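+ /*
+ * Loop until we obtain a tuple from a worker's tuple queue or from the
+ * local copy of the plan, or until neither source can supply one.
+ */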
while (gatherstate->reader != NULL || gatherstate->need_to_scan_locally)
{
if (gatherstate->reader != NULL)
{
+ MemoryContext oldContext;
+
+ /* Run TupleQueueReaders in per-tuple context */
+ oldContext = MemoryContextSwitchTo(tupleContext);
tup = gather_readnext(gatherstate);
+ MemoryContextSwitchTo(oldContext);
if (HeapTupleIsValid(tup))
{
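+ /*
+ * The tuple was palloc'd in the per-tuple context switched to above, so
+ * the slot must not assume ownership of it (hence "false" below).
+ */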
ExecStoreTuple(tup, /* tuple to store */
fslot, /* slot in which to store the tuple */
InvalidBuffer, /* buffer associated with this
* tuple */
- true); /* pfree this pointer if not from heap */
-
+ false); /* slot should not pfree tuple */
return fslot;
}
}
static HeapTuple
gather_readnext(GatherState *gatherstate)
{
- int waitpos = gatherstate->nextreader;
+ int nvisited = 0;
for (;;)
{
*/
if (readerdone)
{
+ Assert(!tup);
DestroyTupleQueueReader(reader);
--gatherstate->nreaders;
if (gatherstate->nreaders == 0)
{
ExecShutdownGatherWorkers(gatherstate);
return NULL;
}
- else
- {
- memmove(&gatherstate->reader[gatherstate->nextreader],
- &gatherstate->reader[gatherstate->nextreader + 1],
- sizeof(TupleQueueReader *)
- * (gatherstate->nreaders - gatherstate->nextreader));
- if (gatherstate->nextreader >= gatherstate->nreaders)
- gatherstate->nextreader = 0;
- if (gatherstate->nextreader < waitpos)
- --waitpos;
- }
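+ /* Collapse the reader array to close the gap left by the destroyed reader. */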
+ memmove(&gatherstate->reader[gatherstate->nextreader],
+ &gatherstate->reader[gatherstate->nextreader + 1],
+ sizeof(TupleQueueReader *)
+ * (gatherstate->nreaders - gatherstate->nextreader));
+ if (gatherstate->nextreader >= gatherstate->nreaders)
+ gatherstate->nextreader = 0;
continue;
}
* every tuple, but it turns out to be much more efficient to keep
* reading from the same queue until that would require blocking.
*/
- gatherstate->nextreader =
- (gatherstate->nextreader + 1) % gatherstate->nreaders;
+ gatherstate->nextreader++;
+ if (gatherstate->nextreader >= gatherstate->nreaders)
+ gatherstate->nextreader = 0;
- /* Have we visited every TupleQueueReader? */
- if (gatherstate->nextreader == waitpos)
+ /* Have we visited every (surviving) TupleQueueReader? */
+ nvisited++;
+ if (nvisited >= gatherstate->nreaders)
{
/*
* If (still) running plan locally, return NULL so caller can
WaitLatch(MyLatch, WL_LATCH_SET, 0);
CHECK_FOR_INTERRUPTS();
ResetLatch(MyLatch);
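+ /* After waking up, start a fresh pass over the surviving readers. */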
+ nvisited = 0;
}
}
}
/*
* Fetch a tuple from a tuple queue reader.
*
+ * The return value is NULL if there are no remaining tuples or if
+ * nowait = true and no tuple is ready to return. *done, if not NULL,
+ * is set to true when there are no remaining tuples and otherwise to false.
+ *
+ * The returned tuple, if any, is allocated in CurrentMemoryContext.
+ * That should be a short-lived (tuple-lifespan) context, because we are
+ * pretty cavalier about leaking memory in that context if we have to do
+ * tuple remapping.
+ *
* Even when shm_mq_receive() returns SHM_MQ_WOULD_BLOCK, this can still
* accumulate bytes from a partially-read message, so it's useful to call
* this with nowait = true even if nothing is returned.
- *
- * The return value is NULL if there are no remaining queues or if
- * nowait = true and no tuple is ready to return. *done, if not NULL,
- * is set to true when queue is detached and otherwise to false.
*/
HeapTuple
TupleQueueReaderNext(TupleQueueReader *reader, bool nowait, bool *done)
* OK, we got a message. Process it.
*
* One-byte messages are mode switch messages, so that we can switch
- * between "control" and "data" mode. When in "data" mode, each
- * message (unless exactly one byte) is a tuple. When in "control"
- * mode, each message provides a transient-typmod-to-tupledesc mapping
- * so we can interpret future tuples.
+ * between "control" and "data" mode. Otherwise, when in "data" mode,
+ * each message is a tuple. When in "control" mode, each message
+ * provides a transient-typmod-to-tupledesc mapping to let us
+ * interpret future tuples. Both of those cases certainly require
+ * more than one byte, so no confusion is possible.
*/
if (nbytes == 1)
{
#include "storage/shm_mq.h"
#include "tcop/dest.h"
+/* Opaque struct, only known inside tqueue.c. */
+typedef struct TupleQueueReader TupleQueueReader;
+
/* Use this to send tuples to a shm_mq. */
extern DestReceiver *CreateTupleQueueDestReceiver(shm_mq_handle *handle);
/* Use these to receive tuples from a shm_mq. */
-typedef struct TupleQueueReader TupleQueueReader;
extern TupleQueueReader *CreateTupleQueueReader(shm_mq_handle *handle,
TupleDesc tupledesc);
-extern void DestroyTupleQueueReader(TupleQueueReader *funnel);
-extern HeapTuple TupleQueueReaderNext(TupleQueueReader *,
+extern void DestroyTupleQueueReader(TupleQueueReader *reader);
+extern HeapTuple TupleQueueReaderNext(TupleQueueReader *reader,
bool nowait, bool *done);
#endif /* TQUEUE_H */