Revert "Fix race in Parallel Hash Join batch cleanup."
authorThomas Munro <tmunro@postgresql.org>
Wed, 17 Mar 2021 12:09:35 +0000 (01:09 +1300)
committerThomas Munro <tmunro@postgresql.org>
Wed, 17 Mar 2021 12:09:35 +0000 (01:09 +1300)
This reverts commit 0129c56fbe5c26bfec91bfc2c8a3b8818f441d6e.

Discussion: https://postgr.es/m/CA%2BhUKGJmcqAE3MZeDCLLXa62cWM0AJbKmp2JrJYaJ86bz36LFA%40mail.gmail.com

src/backend/executor/nodeHash.c
src/backend/executor/nodeHashjoin.c
src/include/executor/hashjoin.h

index a061c25a7943a1bc5e2e958a327280808cb227c8..982270dd72abe49653387bc0148e459b208c312a 100644 (file)
@@ -331,21 +331,14 @@ MultiExecParallelHash(HashState *node)
    hashtable->nbuckets = pstate->nbuckets;
    hashtable->log2_nbuckets = my_log2(hashtable->nbuckets);
    hashtable->totalTuples = pstate->total_tuples;
-
-   /*
-    * Unless we're completely done and the batch state has been freed, make
-    * sure we have accessors.
-    */
-   if (BarrierPhase(build_barrier) < PHJ_BUILD_DONE)
-       ExecParallelHashEnsureBatchAccessors(hashtable);
+   ExecParallelHashEnsureBatchAccessors(hashtable);
 
    /*
     * The next synchronization point is in ExecHashJoin's HJ_BUILD_HASHTABLE
-    * case, which will bring the build phase to PHJ_BUILD_RUNNING (if it isn't
+    * case, which will bring the build phase to PHJ_BUILD_DONE (if it isn't
     * there already).
     */
    Assert(BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER ||
-          BarrierPhase(build_barrier) == PHJ_BUILD_RUNNING ||
           BarrierPhase(build_barrier) == PHJ_BUILD_DONE);
 }
 
@@ -625,7 +618,7 @@ ExecHashTableCreate(HashState *state, List *hashOperators, bool keepNulls)
        /*
         * The next Parallel Hash synchronization point is in
         * MultiExecParallelHash(), which will progress it all the way to
-        * PHJ_BUILD_RUNNING.  The caller must not return control from this
+        * PHJ_BUILD_DONE.  The caller must not return control from this
         * executor node between now and then.
         */
    }
@@ -3008,11 +3001,14 @@ ExecParallelHashEnsureBatchAccessors(HashJoinTable hashtable)
    }
 
    /*
-    * We should never see a state where the batch-tracking array is freed,
-    * because we should have given up sooner if we join when the build barrier
-    * has reached the PHJ_BUILD_DONE phase.
+    * It's possible for a backend to start up very late so that the whole
+    * join is finished and the shm state for tracking batches has already
+    * been freed by ExecHashTableDetach().  In that case we'll just leave
+    * hashtable->batches as NULL so that ExecParallelHashJoinNewBatch() gives
+    * up early.
     */
-   Assert(DsaPointerIsValid(pstate->batches));
+   if (!DsaPointerIsValid(pstate->batches))
+       return;
 
    /* Use hash join memory context. */
    oldcxt = MemoryContextSwitchTo(hashtable->hashCxt);
@@ -3132,17 +3128,9 @@ ExecHashTableDetachBatch(HashJoinTable hashtable)
 void
 ExecHashTableDetach(HashJoinTable hashtable)
 {
-   ParallelHashJoinState *pstate = hashtable->parallel_state;
-
-   /*
-    * If we're involved in a parallel query, we must either have got all the
-    * way to PHJ_BUILD_RUNNING, or joined too late and be in PHJ_BUILD_DONE.
-    */
-   Assert(!pstate ||
-          BarrierPhase(&pstate->build_barrier) >= PHJ_BUILD_RUNNING);
-
-   if (pstate && BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_RUNNING)
+   if (hashtable->parallel_state)
    {
+       ParallelHashJoinState *pstate = hashtable->parallel_state;
        int         i;
 
        /* Make sure any temporary files are closed. */
@@ -3158,22 +3146,17 @@ ExecHashTableDetach(HashJoinTable hashtable)
        }
 
        /* If we're last to detach, clean up shared memory. */
-       if (BarrierArriveAndDetach(&pstate->build_barrier))
+       if (BarrierDetach(&pstate->build_barrier))
        {
-           /*
-            * Late joining processes will see this state and give up
-            * immediately.
-            */
-           Assert(BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_DONE);
-
            if (DsaPointerIsValid(pstate->batches))
            {
                dsa_free(hashtable->area, pstate->batches);
                pstate->batches = InvalidDsaPointer;
            }
        }
+
+       hashtable->parallel_state = NULL;
    }
-   hashtable->parallel_state = NULL;
 }
 
 /*
index 338243c4d112bacc89feeee9e8e919180816aad0..1d336afd80401ee92f261354dd7ed612a0a5c737 100644 (file)
@@ -45,8 +45,7 @@
  *   PHJ_BUILD_ALLOCATING            -- one sets up the batches and table 0
  *   PHJ_BUILD_HASHING_INNER         -- all hash the inner rel
  *   PHJ_BUILD_HASHING_OUTER         -- (multi-batch only) all hash the outer
- *   PHJ_BUILD_RUNNING               -- building done, probing can begin
- *   PHJ_BUILD_DONE                  -- all work complete, one frees batches
+ *   PHJ_BUILD_DONE                  -- building done, probing can begin
  *
  * While in the phase PHJ_BUILD_HASHING_INNER a separate pair of barriers may
  * be used repeatedly as required to coordinate expansions in the number of
@@ -74,7 +73,7 @@
  * batches whenever it encounters them while scanning and probing, which it
  * can do because it processes batches in serial order.
  *
- * Once PHJ_BUILD_RUNNING is reached, backends then split up and process
+ * Once PHJ_BUILD_DONE is reached, backends then split up and process
  * different batches, or gang up and work together on probing batches if there
  * aren't enough to go around.  For each batch there is a separate barrier
  * with the following phases:
  *
  * To avoid deadlocks, we never wait for any barrier unless it is known that
  * all other backends attached to it are actively executing the node or have
- * finished.  Practically, that means that we never emit a tuple while attached
- * to a barrier, unless the barrier has reached a phase that means that no
- * process will wait on it again.  We emit tuples while attached to the build
- * barrier in phase PHJ_BUILD_RUNNING, and to a per-batch barrier in phase
- * PHJ_BATCH_PROBING.  These are advanced to PHJ_BUILD_DONE and PHJ_BATCH_DONE
- * respectively without waiting, using BarrierArriveAndDetach().  The last to
- * detach receives a different return value so that it knows that it's safe to
- * clean up.  Any straggler process that attaches after that phase is reached
- * will see that it's too late to participate or access the relevant shared
- * memory objects.
+ * already arrived.  Practically, that means that we never return a tuple
+ * while attached to a barrier, unless the barrier has reached its final
+ * state.  In the slightly special case of the per-batch barrier, we return
+ * tuples while in PHJ_BATCH_PROBING phase, but that's OK because we use
+ * BarrierArriveAndDetach() to advance it to PHJ_BATCH_DONE without waiting.
  *
  *-------------------------------------------------------------------------
  */
@@ -322,7 +316,6 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
 
                    build_barrier = &parallel_state->build_barrier;
                    Assert(BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER ||
-                          BarrierPhase(build_barrier) == PHJ_BUILD_RUNNING ||
                           BarrierPhase(build_barrier) == PHJ_BUILD_DONE);
                    if (BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER)
                    {
@@ -335,18 +328,9 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
                        BarrierArriveAndWait(build_barrier,
                                             WAIT_EVENT_HASH_BUILD_HASHING_OUTER);
                    }
-                   else if (BarrierPhase(build_barrier) == PHJ_BUILD_DONE)
-                   {
-                       /*
-                        * If we attached so late that the job is finished and
-                        * the batch state has been freed, we can return
-                        * immediately.
-                        */
-                       return NULL;
-                   }
+                   Assert(BarrierPhase(build_barrier) == PHJ_BUILD_DONE);
 
                    /* Each backend should now select a batch to work on. */
-                   Assert(BarrierPhase(build_barrier) == PHJ_BUILD_RUNNING);
                    hashtable->curbatch = -1;
                    node->hj_JoinState = HJ_NEED_NEW_BATCH;
 
@@ -1119,6 +1103,14 @@ ExecParallelHashJoinNewBatch(HashJoinState *hjstate)
    int         start_batchno;
    int         batchno;
 
+   /*
+    * If we started up so late that the batch tracking array has been freed
+    * already by ExecHashTableDetach(), then we are finished.  See also
+    * ExecParallelHashEnsureBatchAccessors().
+    */
+   if (hashtable->batches == NULL)
+       return false;
+
    /*
     * If we were already attached to a batch, remember not to bother checking
     * it again, and detach from it (possibly freeing the hash table if we are
index e000b0efb3756b8adcd00a8ef22986cf6abee56c..a9f9872a78c99a658b443671d0967cb8168ef7b4 100644 (file)
@@ -258,8 +258,7 @@ typedef struct ParallelHashJoinState
 #define PHJ_BUILD_ALLOCATING           1
 #define PHJ_BUILD_HASHING_INNER            2
 #define PHJ_BUILD_HASHING_OUTER            3
-#define PHJ_BUILD_RUNNING              4
-#define PHJ_BUILD_DONE                 5
+#define PHJ_BUILD_DONE                 4
 
 /* The phases for probing each batch, used by for batch_barrier. */
 #define PHJ_BATCH_ELECTING             0