diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/backend/executor/producerReceiver.c | 6 | ||||
-rw-r--r-- | src/backend/pgxc/squeue/squeue.c | 44 |
2 files changed, 41 insertions, 9 deletions
diff --git a/src/backend/executor/producerReceiver.c b/src/backend/executor/producerReceiver.c index 145788a11b..8dec452b6a 100644 --- a/src/backend/executor/producerReceiver.c +++ b/src/backend/executor/producerReceiver.c @@ -156,11 +156,13 @@ producerDestroyReceiver(DestReceiver *self) if (SharedQueueFinish(myState->squeue, myState->typeinfo, myState->tstores) == 0) { + elog(DEBUG3, "SharedQueueFinish returned 0 - freeing tstores"); pfree(myState->tstores); myState->tstores = NULL; } else { + elog(DEBUG2, "producerDestroyReceiver - sleeping for 10 seconds waiting for consumers to connect"); pg_usleep(10*1000*1000l); /* * Do not wait for consumers that was not even connected after 10 @@ -279,11 +281,15 @@ ProducerReceiverPushBuffers(DestReceiver *self) if (SharedQueueFinish(myState->squeue, myState->typeinfo, myState->tstores) == 0) { + elog(DEBUG3, "SharedQueueFinish returned 0, freeing tstores"); pfree(myState->tstores); myState->tstores = NULL; } else + { + elog(DEBUG3, "SharedQueueFinish returned non-zero value"); return false; + } } return true; } diff --git a/src/backend/pgxc/squeue/squeue.c b/src/backend/pgxc/squeue/squeue.c index 8c97557294..5ede9d5373 100644 --- a/src/backend/pgxc/squeue/squeue.c +++ b/src/backend/pgxc/squeue/squeue.c @@ -265,6 +265,7 @@ SharedQueueAcquire(const char *sqname, int ncons) { bool found; SharedQueue sq; + int trycount = 0; Assert(IsConnFromDatanode()); Assert(ncons > 0); @@ -394,9 +395,13 @@ tryagain: if (old_squeue) { LWLockRelease(SQueuesLock); - pg_usleep(1L); + pg_usleep(1000000L); elog(DEBUG1, "SQueue race condition, give the old producer to " "finish the work and retry again"); + trycount++; + if (trycount >= 10) + elog(ERROR, "Couldn't resolve SQueue race condition after" + " %d tries", trycount); goto tryagain; } @@ -1472,16 +1477,14 @@ SharedQueueRelease(const char *sqname) sqname, sq->sq_nodeid, sq->sq_pid); /* - * Case if the shared queue was never bound. - * Just remove it from the hash table. 
+ * If the SharedQ is not bound, we can't just remove it because + * somebody might have just created a fresh entry and is going to bind + * to it soon. We assume that the future producer will eventually + * release the SharedQ */ if (sq->sq_nodeid == -1) { - sq->sq_sync = NULL; - sqsync->queue = NULL; - if (hash_search(SharedQueues, sqname, HASH_REMOVE, NULL) != sq) - elog(PANIC, "Shared queue data corruption"); - elog(DEBUG1, "SQueue %s, producer not bound - released SQueue", sqname); + elog(DEBUG1, "SQueue %s, producer not bound ", sqname); LWLockRelease(SQueuesLock); return; } @@ -1505,7 +1508,30 @@ SharedQueueRelease(const char *sqname) elog(DEBUG1, "SQueue %s, consumer node %d, pid %d, " "status %d", sq->sq_key, cstate->cs_node, cstate->cs_pid, cstate->cs_status); - if (cstate->cs_status != CONSUMER_DONE) + + /* + * If the consumer pid is not set, we are looking at a race + * condition where the old producer (which supplied the + * tuples to this remote datanode) may have finished and + * marked all consumers as CONSUMER_EOF, the consumers + * themselves consumed all the tuples and marked + * themselves as CONSUMER_DONE. The old producer in that + * case may have actually removed the SharedQ from shared + * memory. But if a new execution for this same portal + * comes before the consumer sends a "Close Portal" message + * (which subsequently calls this function), we may end up + * corrupting state for the upcoming consumer for this new + * execution of the portal. + * + * It seems best to just ignore the release call in such + * cases. + */ + if (cstate->cs_pid == 0) + { + elog(DEBUG1, "SQueue %s, consumer node %d, already released", + sq->sq_key, cstate->cs_node); + } + else if (cstate->cs_status != CONSUMER_DONE) { /* Inform producer the consumer have done the job */ cstate->cs_status = CONSUMER_DONE; |