Fix worst memory leaks in tqueue.c.
author Tom Lane <tgl@sss.pgh.pa.us>
Fri, 29 Jul 2016 23:31:06 +0000 (19:31 -0400)
committer Tom Lane <tgl@sss.pgh.pa.us>
Fri, 29 Jul 2016 23:31:06 +0000 (19:31 -0400)
TupleQueueReaderNext() leaks like a sieve if it has to do any tuple
disassembly/reconstruction.  While we could try to clean up its allocations
piecemeal, it seems like a better idea just to insist that it should be run
in a short-lived memory context, so that any transient space goes away
automatically.  I chose to have nodeGather.c switch into its existing
per-tuple context before the call, rather than inventing a separate
context inside tqueue.c.
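
In outline, gather_getnext now brackets the read with a context switch
(condensed from the nodeGather.c hunk below; all names appear in that hunk):

    MemoryContext oldContext;

    /* Run TupleQueueReaders in per-tuple context */
    oldContext = MemoryContextSwitchTo(
        gatherstate->ps.ps_ExprContext->ecxt_per_tuple_memory);
    tup = gather_readnext(gatherstate);
    MemoryContextSwitchTo(oldContext);

Any transient space the reader allocates is then released at the next
per-tuple context reset, with no extra bookkeeping in tqueue.c.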

This is sufficient to stop all leakage in the simple case I exhibited
earlier today (see link below), but it does not deal with leaks induced
in more complex cases by tqueue.c's insistence on using TopMemoryContext
for data that it's not actually trying hard to keep track of.  That issue
is intertwined with another major source of inefficiency, namely failure
to cache lookup results across calls, so it seems best to deal with it
separately.

In passing, improve some comments, and modify gather_readnext's method for
deciding when it's visited all the readers so that it's more obviously
correct.  (I'm not actually convinced that the previous code *is*
correct in the case of a reader deletion; it certainly seems fragile.)
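
The new scheme simply counts readers polled since the last tuple or wait,
rather than comparing against a remembered position that shifts when a
reader is deleted.  Condensed from the gather_readnext hunk below (the
early return for the still-scanning-locally case is elided here):

    /* Advance to the next reader, wrapping around. */
    gatherstate->nextreader++;
    if (gatherstate->nextreader >= gatherstate->nreaders)
        gatherstate->nextreader = 0;

    /* Once every surviving reader has been polled without yielding a
     * tuple, wait for more data to arrive and start counting afresh. */
    nvisited++;
    if (nvisited >= gatherstate->nreaders)
    {
        WaitLatch(MyLatch, WL_LATCH_SET, 0);
        CHECK_FOR_INTERRUPTS();
        ResetLatch(MyLatch);
        nvisited = 0;
    }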

Discussion: <32763.1469821037@sss.pgh.pa.us>

src/backend/executor/nodeGather.c
src/backend/executor/tqueue.c
src/include/executor/tqueue.h

diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c
index 313b234454023c1c24bd92e8026a1b9f041c0c23..93a566ba629dbb2510344ca5148f0c695751b22d 100644
--- a/src/backend/executor/nodeGather.c
+++ b/src/backend/executor/nodeGather.c
@@ -214,8 +214,11 @@ ExecGather(GatherState *node)
        /*
         * Reset per-tuple memory context to free any expression evaluation
         * storage allocated in the previous tuple cycle.  Note we can't do this
-        * until we're done projecting.
+        * until we're done projecting.  This will also clear any previous tuple
+        * returned by a TupleQueueReader; to make sure we don't leave a dangling
+        * pointer around, clear the working slot first.
         */
+       ExecClearTuple(node->funnel_slot);
        econtext = node->ps.ps_ExprContext;
        ResetExprContext(econtext);
 
@@ -274,13 +277,19 @@ gather_getnext(GatherState *gatherstate)
        PlanState  *outerPlan = outerPlanState(gatherstate);
        TupleTableSlot *outerTupleSlot;
        TupleTableSlot *fslot = gatherstate->funnel_slot;
+       MemoryContext tupleContext = gatherstate->ps.ps_ExprContext->ecxt_per_tuple_memory;
        HeapTuple       tup;
 
        while (gatherstate->reader != NULL || gatherstate->need_to_scan_locally)
        {
                if (gatherstate->reader != NULL)
                {
+                       MemoryContext oldContext;
+
+                       /* Run TupleQueueReaders in per-tuple context */
+                       oldContext = MemoryContextSwitchTo(tupleContext);
                        tup = gather_readnext(gatherstate);
+                       MemoryContextSwitchTo(oldContext);
 
                        if (HeapTupleIsValid(tup))
                        {
@@ -288,8 +297,7 @@ gather_getnext(GatherState *gatherstate)
                                                           fslot,       /* slot in which to store the tuple */
                                                           InvalidBuffer,       /* buffer associated with this
                                                                                                 * tuple */
-                                                          true);       /* pfree this pointer if not from heap */
-
+                                                          false);      /* slot should not pfree tuple */
                                return fslot;
                        }
                }
@@ -314,7 +322,7 @@ gather_getnext(GatherState *gatherstate)
 static HeapTuple
 gather_readnext(GatherState *gatherstate)
 {
-       int                     waitpos = gatherstate->nextreader;
+       int                     nvisited = 0;
 
        for (;;)
        {
@@ -335,6 +343,7 @@ gather_readnext(GatherState *gatherstate)
                 */
                if (readerdone)
                {
+                       Assert(!tup);
                        DestroyTupleQueueReader(reader);
                        --gatherstate->nreaders;
                        if (gatherstate->nreaders == 0)
@@ -342,17 +351,12 @@ gather_readnext(GatherState *gatherstate)
                                ExecShutdownGatherWorkers(gatherstate);
                                return NULL;
                        }
-                       else
-                       {
-                               memmove(&gatherstate->reader[gatherstate->nextreader],
-                                               &gatherstate->reader[gatherstate->nextreader + 1],
-                                               sizeof(TupleQueueReader *)
-                                               * (gatherstate->nreaders - gatherstate->nextreader));
-                               if (gatherstate->nextreader >= gatherstate->nreaders)
-                                       gatherstate->nextreader = 0;
-                               if (gatherstate->nextreader < waitpos)
-                                       --waitpos;
-                       }
+                       memmove(&gatherstate->reader[gatherstate->nextreader],
+                                       &gatherstate->reader[gatherstate->nextreader + 1],
+                                       sizeof(TupleQueueReader *)
+                                       * (gatherstate->nreaders - gatherstate->nextreader));
+                       if (gatherstate->nextreader >= gatherstate->nreaders)
+                               gatherstate->nextreader = 0;
                        continue;
                }
 
@@ -367,11 +371,13 @@ gather_readnext(GatherState *gatherstate)
                 * every tuple, but it turns out to be much more efficient to keep
                 * reading from the same queue until that would require blocking.
                 */
-               gatherstate->nextreader =
-                       (gatherstate->nextreader + 1) % gatherstate->nreaders;
+               gatherstate->nextreader++;
+               if (gatherstate->nextreader >= gatherstate->nreaders)
+                       gatherstate->nextreader = 0;
 
-               /* Have we visited every TupleQueueReader? */
-               if (gatherstate->nextreader == waitpos)
+               /* Have we visited every (surviving) TupleQueueReader? */
+               nvisited++;
+               if (nvisited >= gatherstate->nreaders)
                {
                        /*
                         * If (still) running plan locally, return NULL so caller can
@@ -384,6 +390,7 @@ gather_readnext(GatherState *gatherstate)
                        WaitLatch(MyLatch, WL_LATCH_SET, 0);
                        CHECK_FOR_INTERRUPTS();
                        ResetLatch(MyLatch);
+                       nvisited = 0;
                }
        }
 }
diff --git a/src/backend/executor/tqueue.c b/src/backend/executor/tqueue.c
index e81c333e4cd90cc4f38f45805034ec26179b0cc6..64555599ceeb83c7aec43143d5846c0a3ae4d1cc 100644
--- a/src/backend/executor/tqueue.c
+++ b/src/backend/executor/tqueue.c
@@ -524,13 +524,18 @@ DestroyTupleQueueReader(TupleQueueReader *reader)
 /*
  * Fetch a tuple from a tuple queue reader.
  *
+ * The return value is NULL if there are no remaining tuples or if
+ * nowait = true and no tuple is ready to return.  *done, if not NULL,
+ * is set to true when there are no remaining tuples and otherwise to false.
+ *
+ * The returned tuple, if any, is allocated in CurrentMemoryContext.
+ * That should be a short-lived (tuple-lifespan) context, because we are
+ * pretty cavalier about leaking memory in that context if we have to do
+ * tuple remapping.
+ *
  * Even when shm_mq_receive() returns SHM_MQ_WOULD_BLOCK, this can still
  * accumulate bytes from a partially-read message, so it's useful to call
  * this with nowait = true even if nothing is returned.
- *
- * The return value is NULL if there are no remaining queues or if
- * nowait = true and no tuple is ready to return.  *done, if not NULL,
- * is set to true when queue is detached and otherwise to false.
  */
 HeapTuple
 TupleQueueReaderNext(TupleQueueReader *reader, bool nowait, bool *done)
@@ -565,10 +570,11 @@ TupleQueueReaderNext(TupleQueueReader *reader, bool nowait, bool *done)
                 * OK, we got a message.  Process it.
                 *
                 * One-byte messages are mode switch messages, so that we can switch
-                * between "control" and "data" mode.  When in "data" mode, each
-                * message (unless exactly one byte) is a tuple.  When in "control"
-                * mode, each message provides a transient-typmod-to-tupledesc mapping
-                * so we can interpret future tuples.
+                * between "control" and "data" mode.  Otherwise, when in "data" mode,
+                * each message is a tuple.  When in "control" mode, each message
+                * provides a transient-typmod-to-tupledesc mapping to let us
+                * interpret future tuples.  Both of those cases certainly require
+                * more than one byte, so no confusion is possible.
                 */
                if (nbytes == 1)
                {
diff --git a/src/include/executor/tqueue.h b/src/include/executor/tqueue.h
index 4f23c00feb144c6e7a048ccf3ca0f5feaabbc83e..3a0aba162d400d06c449c35bedaa528aa77e7034 100644
--- a/src/include/executor/tqueue.h
+++ b/src/include/executor/tqueue.h
 #include "storage/shm_mq.h"
 #include "tcop/dest.h"
 
+/* Opaque struct, only known inside tqueue.c. */
+typedef struct TupleQueueReader TupleQueueReader;
+
 /* Use this to send tuples to a shm_mq. */
 extern DestReceiver *CreateTupleQueueDestReceiver(shm_mq_handle *handle);
 
 /* Use these to receive tuples from a shm_mq. */
-typedef struct TupleQueueReader TupleQueueReader;
 extern TupleQueueReader *CreateTupleQueueReader(shm_mq_handle *handle,
                                           TupleDesc tupledesc);
-extern void DestroyTupleQueueReader(TupleQueueReader *funnel);
-extern HeapTuple TupleQueueReaderNext(TupleQueueReader *,
+extern void DestroyTupleQueueReader(TupleQueueReader *reader);
+extern HeapTuple TupleQueueReaderNext(TupleQueueReader *reader,
                                         bool nowait, bool *done);
 
 #endif   /* TQUEUE_H */