summary | refs | log | tree | commit | diff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r-- src/backend/executor/producerReceiver.c | 6
-rw-r--r-- src/backend/pgxc/squeue/squeue.c | 44
2 files changed, 41 insertions, 9 deletions
diff --git a/src/backend/executor/producerReceiver.c b/src/backend/executor/producerReceiver.c
index 145788a11b..8dec452b6a 100644
--- a/src/backend/executor/producerReceiver.c
+++ b/src/backend/executor/producerReceiver.c
@@ -156,11 +156,13 @@ producerDestroyReceiver(DestReceiver *self)
if (SharedQueueFinish(myState->squeue, myState->typeinfo,
myState->tstores) == 0)
{
+ elog(DEBUG3, "SharedQueueFinish returned 0 - freeing tstores");
pfree(myState->tstores);
myState->tstores = NULL;
}
else
{
+ elog(DEBUG2, "producerDestroyReceiver - sleeping for 10 seconds waiting for consumers to connect");
pg_usleep(10*1000*1000l);
/*
* Do not wait for consumers that was not even connected after 10
@@ -279,11 +281,15 @@ ProducerReceiverPushBuffers(DestReceiver *self)
if (SharedQueueFinish(myState->squeue, myState->typeinfo,
myState->tstores) == 0)
{
+ elog(DEBUG3, "SharedQueueFinish returned 0, freeing tstores");
pfree(myState->tstores);
myState->tstores = NULL;
}
else
+ {
+ elog(DEBUG3, "SharedQueueFinish returned non-zero value");
return false;
+ }
}
return true;
}
diff --git a/src/backend/pgxc/squeue/squeue.c b/src/backend/pgxc/squeue/squeue.c
index 8c97557294..5ede9d5373 100644
--- a/src/backend/pgxc/squeue/squeue.c
+++ b/src/backend/pgxc/squeue/squeue.c
@@ -265,6 +265,7 @@ SharedQueueAcquire(const char *sqname, int ncons)
{
bool found;
SharedQueue sq;
+ int trycount = 0;
Assert(IsConnFromDatanode());
Assert(ncons > 0);
@@ -394,9 +395,13 @@ tryagain:
if (old_squeue)
{
LWLockRelease(SQueuesLock);
- pg_usleep(1L);
+ pg_usleep(1000000L);
elog(DEBUG1, "SQueue race condition, give the old producer to "
"finish the work and retry again");
+ trycount++;
+ if (trycount >= 10)
+ elog(ERROR, "Couldn't resolve SQueue race condition after"
+ " %d tries", trycount);
goto tryagain;
}
@@ -1472,16 +1477,14 @@ SharedQueueRelease(const char *sqname)
sqname, sq->sq_nodeid, sq->sq_pid);
/*
- * Case if the shared queue was never bound.
- * Just remove it from the hash table.
+ * If the SharedQ is not bound, we can't just remove it because
+ * somebody might have just created a fresh entry and is going to bind
+ * to it soon. We assume that the future producer will eventually
+ * release the SharedQ
*/
if (sq->sq_nodeid == -1)
{
- sq->sq_sync = NULL;
- sqsync->queue = NULL;
- if (hash_search(SharedQueues, sqname, HASH_REMOVE, NULL) != sq)
- elog(PANIC, "Shared queue data corruption");
- elog(DEBUG1, "SQueue %s, producer not bound - released SQueue", sqname);
+ elog(DEBUG1, "SQueue %s, producer not bound ", sqname);
LWLockRelease(SQueuesLock);
return;
}
@@ -1505,7 +1508,30 @@ SharedQueueRelease(const char *sqname)
elog(DEBUG1, "SQueue %s, consumer node %d, pid %d, "
"status %d", sq->sq_key, cstate->cs_node,
cstate->cs_pid, cstate->cs_status);
- if (cstate->cs_status != CONSUMER_DONE)
+
+ /*
+ * If the consumer pid is not set, we are looking at a race
+ * condition where the old producer (which supplied the
+ * tuples to this remote datanode) may have finished and
+ * marked all consumers as CONSUMER_EOF, the consumers
+ * themselves consumed all the tuples and marked
+ * themselves as CONSUMER_DONE. The old producer in that
+ * case may have actually removed the SharedQ from shared
+ * memory. But if a new execution for this same portal
+ * comes before the consumer sends a "Close Portal" message
+ * (which subsequently calls this function), we may end up
+ * corrupting state for the upcoming consumer for this new
+ * execution of the portal.
+ *
+ * It seems best to just ignore the release call in such
+ * cases.
+ */
+ if (cstate->cs_pid == 0)
+ {
+ elog(DEBUG1, "SQueue %s, consumer node %d, already released",
+ sq->sq_key, cstate->cs_node);
+ }
+ else if (cstate->cs_status != CONSUMER_DONE)
{
/* Inform producer the consumer have done the job */
cstate->cs_status = CONSUMER_DONE;