diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/backend/commands/prepare.c | 1 | ||||
-rw-r--r-- | src/backend/executor/producerReceiver.c | 21 | ||||
-rw-r--r-- | src/backend/pgxc/squeue/squeue.c | 229 | ||||
-rw-r--r-- | src/include/pgxc/squeue.h | 2 |
4 files changed, 221 insertions, 32 deletions
diff --git a/src/backend/commands/prepare.c b/src/backend/commands/prepare.c index 19d9f6cf0f..d69b60259e 100644 --- a/src/backend/commands/prepare.c +++ b/src/backend/commands/prepare.c @@ -735,6 +735,7 @@ DropPreparedStatement(const char *stmt_name, bool showError) if (entry->use_resowner) ResourceOwnerForgetPreparedStmt(CurTransactionResourceOwner, entry->stmt_name); + SharedQueueDisconnectConsumer(entry->stmt_name); #endif } } diff --git a/src/backend/executor/producerReceiver.c b/src/backend/executor/producerReceiver.c index d3f3bc8968..62a8657d02 100644 --- a/src/backend/executor/producerReceiver.c +++ b/src/backend/executor/producerReceiver.c @@ -14,6 +14,7 @@ */ #include "postgres.h" +#include "miscadmin.h" #include "executor/producerReceiver.h" #include "pgxc/nodemgr.h" @@ -157,6 +158,8 @@ producerDestroyReceiver(DestReceiver *self) /* Make sure all data are in the squeue */ while (myState->tstores) { + CHECK_FOR_INTERRUPTS(); + if (SharedQueueFinish(myState->squeue, myState->typeinfo, myState->tstores) == 0) { @@ -166,15 +169,15 @@ producerDestroyReceiver(DestReceiver *self) } else { - elog(DEBUG2, "producerDestroyReceiver - sleeping for 10 seconds waiting for consumers to connect"); - pg_usleep(10*1000*1000l); - /* - * Do not wait for consumers that was not even connected after 10 - * seconds after start waiting for their disconnection. - * That should help to break the loop which would otherwise endless. - * The error will be emitted later in SharedQueueUnBind - */ - SharedQueueResetNotConnected(myState->squeue); + if (SharedQueueWaitOnProducerLatch(myState->squeue, 10000L)) + /* + * Do not wait for consumers that was not even connected after + * 10 seconds after start waiting for their disconnection. + * That should help to break the loop which would otherwise + * endless. 
The error will be emitted later in + * SharedQueueUnBind + */ + SharedQueueResetNotConnected(myState->squeue); } } diff --git a/src/backend/pgxc/squeue/squeue.c b/src/backend/pgxc/squeue/squeue.c index a9741f33f6..2f782b92e8 100644 --- a/src/backend/pgxc/squeue/squeue.c +++ b/src/backend/pgxc/squeue/squeue.c @@ -57,6 +57,7 @@ typedef struct ConsumerSync typedef struct SQueueSync { void *queue; /* NULL if not assigned to any queue */ + LWLock *sqs_producer_lwlock; /* Synchronize access to the queue */ Latch sqs_producer_latch; /* the latch producer is waiting on */ ConsumerSync sqs_consumer_sync[0]; /* actual length is MaxDataNodes-1 is * not known on compile time */ @@ -186,7 +187,7 @@ static void *SQueueSyncs; static bool sq_push_long_tuple(ConsState *cstate, RemoteDataRow datarow); static void sq_pull_long_tuple(ConsState *cstate, RemoteDataRow datarow, - ConsumerSync *sync); + int consumerIdx, SQueueSync *sqsync); /* * SharedQueuesInit @@ -224,7 +225,10 @@ SharedQueuesInit(void) if (!found) { int i, l; - int nlocks = (NUM_SQUEUES * (MaxDataNodes-1)); + int nlocks = (NUM_SQUEUES * (MaxDataNodes)); /* + * (MaxDataNodes - 1) + * consumers + 1 producer + */ bool foundLocks; /* Initialize LWLocks for queues */ @@ -244,7 +248,10 @@ SharedQueuesInit(void) int j; sqs->queue = NULL; + LWLockInitialize(&(SQueueLocks[l]).lock, LWTRANCHE_SHARED_QUEUES); + sqs->sqs_producer_lwlock = &(SQueueLocks[l++]).lock; InitSharedLatch(&sqs->sqs_producer_latch); + for (j = 0; j < MaxDataNodes-1; j++) { InitSharedLatch(&sqs->sqs_consumer_sync[j].cs_latch); @@ -289,6 +296,13 @@ SharedQueueAcquire(const char *sqname, int ncons) tryagain: LWLockAcquire(SQueuesLock, LW_EXCLUSIVE); + /* + * Setup PGXC_PARENT_NODE_ID right now to ensure that the cleanup happens + * correctly even if the consumer never really binds to the shared queue. 
+ */ + PGXC_PARENT_NODE_ID = PGXCNodeGetNodeIdFromName(PGXC_PARENT_NODE, + &PGXC_PARENT_NODE_TYPE); + sq = (SharedQueue) hash_search(SharedQueues, sqname, HASH_ENTER, &found); if (!sq) ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), @@ -389,9 +403,6 @@ tryagain: { int i; bool old_squeue = true; - - PGXC_PARENT_NODE_ID = PGXCNodeGetNodeIdFromName(PGXC_PARENT_NODE, - &PGXC_PARENT_NODE_TYPE); for (i = 0; i < sq->sq_nconsumers; i++) { ConsState *cstate = &(sq->sq_consumers[i]); @@ -457,8 +468,24 @@ SharedQueueBind(const char *sqname, List *consNodes, PGXC_PARENT_NODE_ID = PGXCNodeGetNodeIdFromName(PGXC_PARENT_NODE, &PGXC_PARENT_NODE_TYPE); sq = (SharedQueue) hash_search(SharedQueues, sqname, HASH_FIND, &found); + + /* + * It's not clear but it seems that if the producer fails even before a + * consumer binds to the shared queue, the producer may remove the shared + * queue (or would the refcount mechanism fully protect us against that?). So + * instead of panicking, just throw a soft error. + */ if (!found) - elog(PANIC, "Shared queue %s not found", sqname); + elog(ERROR, "Shared queue %s not found", sqname); + + /* + * Now acquire the queue-specific lock and then release the top level lock. + * We must follow a strict ordering between SQueuesLock, + * sqs_producer_lwlock and the consumer cs_lwlock to avoid a deadlock. 
+ */ + LWLockAcquire(sq->sq_sync->sqs_producer_lwlock, LW_EXCLUSIVE); + LWLockRelease(SQueuesLock); + if (sq->sq_pid == 0) { /* Producer */ @@ -624,7 +651,7 @@ SharedQueueBind(const char *sqname, List *consNodes, /* Producer may be waiting for status change */ SetLatch(&sqsync->sqs_producer_latch); LWLockRelease(sqsync->sqs_consumer_sync[i].cs_lwlock); - LWLockRelease(SQueuesLock); + LWLockRelease(sqsync->sqs_producer_lwlock); ereport(ERROR, (errcode(ERRCODE_PRODUCER_ERROR), errmsg("Producer failed while we were waiting - status was %d", status))); @@ -661,7 +688,7 @@ SharedQueueBind(const char *sqname, List *consNodes, Assert(*myindex != -1); Assert(sq->sq_nconsumers == nconsumers); } - LWLockRelease(SQueuesLock); + LWLockRelease(sq->sq_sync->sqs_producer_lwlock); return sq; } @@ -948,6 +975,18 @@ SharedQueueRead(SharedQueue squeue, int consumerIdx, Assert(cstate->cs_qlength > 0); + /* + * If we run out of produced data while reading, we would like to wake up + * and tell the producer to produce more. But in order to ensure that the + * producer does not miss the signal, we must obtain sufficient lock on the + * queue. In order to allow multiple consumers to read from their + * respective queues at the same time, we obtain a SHARED lock on the + * queue. But the producer must obtain an EXCLUSIVE lock to ensure it does + * not miss the signal. + * + * Again, important to follow strict lock ordering. + */ + LWLockAcquire(sqsync->sqs_producer_lwlock, LW_SHARED); LWLockAcquire(sqsync->sqs_consumer_sync[consumerIdx].cs_lwlock, LW_EXCLUSIVE); Assert(cstate->cs_status != CONSUMER_DONE); @@ -976,6 +1015,7 @@ SharedQueueRead(SharedQueue squeue, int consumerIdx, * are finishing */ SetLatch(&sqsync->sqs_producer_latch); + LWLockRelease(sqsync->sqs_producer_lwlock); return true; } else if (cstate->cs_status == CONSUMER_ERROR) @@ -989,6 +1029,8 @@ SharedQueueRead(SharedQueue squeue, int consumerIdx, * Release all the locks and report problem to the caller. 
*/ LWLockRelease(sqsync->sqs_consumer_sync[consumerIdx].cs_lwlock); + LWLockRelease(sqsync->sqs_producer_lwlock); + /* * Reporting error will cause transaction rollback and clean up of * all portals. We can not mark the portal so it does not access @@ -1015,16 +1057,23 @@ SharedQueueRead(SharedQueue squeue, int consumerIdx, squeue->sq_key, cstate->cs_node, cstate->cs_pid, cstate->cs_status); + /* Inform the producer to produce more while we wait for it */ + SetLatch(&sqsync->sqs_producer_latch); + LWLockRelease(sqsync->sqs_producer_lwlock); + /* Wait for notification about available info */ WaitLatch(&sqsync->sqs_consumer_sync[consumerIdx].cs_latch, WL_LATCH_SET | WL_POSTMASTER_DEATH, -1, WAIT_EVENT_MQ_INTERNAL); + /* got the notification, restore lock and try again */ + LWLockAcquire(sqsync->sqs_producer_lwlock, LW_SHARED); LWLockAcquire(sqsync->sqs_consumer_sync[consumerIdx].cs_lwlock, LW_EXCLUSIVE); } else { LWLockRelease(sqsync->sqs_consumer_sync[consumerIdx].cs_lwlock); + LWLockRelease(sqsync->sqs_producer_lwlock); elog(DEBUG3, "SQueue %s, consumer (node %d, pid %d, status %d) - " "no queued tuples to read, caller can't wait ", @@ -1047,8 +1096,7 @@ SharedQueueRead(SharedQueue squeue, int consumerIdx, datarow->msgnode = InvalidOid; datarow->msglen = datalen; if (datalen > cstate->cs_qlength - sizeof(int)) - sq_pull_long_tuple(cstate, datarow, - &sqsync->sqs_consumer_sync[consumerIdx]); + sq_pull_long_tuple(cstate, datarow, consumerIdx, sqsync); else QUEUE_READ(cstate, datalen, datarow->msg); ExecStoreDataRowTuple(datarow, slot, true); @@ -1059,6 +1107,7 @@ SharedQueueRead(SharedQueue squeue, int consumerIdx, /* sanity check */ Assert((cstate->cs_ntuples == 0) == (cstate->cs_qreadpos == cstate->cs_qwritepos)); LWLockRelease(sqsync->sqs_consumer_sync[consumerIdx].cs_lwlock); + LWLockRelease(sqsync->sqs_producer_lwlock); return false; } @@ -1083,6 +1132,8 @@ SharedQueueReset(SharedQueue squeue, int consumerIdx) if (!sqsync) return; + 
LWLockAcquire(sqsync->sqs_producer_lwlock, LW_SHARED); + if (consumerIdx == -1) { int i; @@ -1120,6 +1171,9 @@ SharedQueueReset(SharedQueue squeue, int consumerIdx) /* wake up consumer if it is sleeping */ SetLatch(&sqsync->sqs_consumer_sync[i].cs_latch); + + /* Tell producer about change in the state */ + SetLatch(&sqsync->sqs_producer_latch); } LWLockRelease(sqsync->sqs_consumer_sync[i].cs_lwlock); } @@ -1160,10 +1214,77 @@ SharedQueueReset(SharedQueue squeue, int consumerIdx) LWLockRelease(sqsync->sqs_consumer_sync[consumerIdx].cs_lwlock); } + LWLockRelease(sqsync->sqs_producer_lwlock); } /* + * Disconnect a remote consumer for the given shared queue. + * + * A node may not join a shared queue in certain circumstances such as when the + * other side of the join has not produced any rows and the RemoteSubplan is + * not at all executed on the node. Even in that case, we should receive a + * 'statement close' message from the remote node and mark that specific + * consumer as DONE. + */ +void +SharedQueueDisconnectConsumer(const char *sqname) +{ + bool found; + SharedQueue squeue; + int i; + SQueueSync *sqsync; + + /* + * Be prepared to be called even when there are no shared queues setup. + */ + if (!SharedQueues) + return; + + LWLockAcquire(SQueuesLock, LW_EXCLUSIVE); + + squeue = (SharedQueue) hash_search(SharedQueues, sqname, HASH_FIND, &found); + if (!found || squeue->sq_pid == 0) + { + /* + * If the shared queue with the given name is not found or if the + * producer has not yet bound, nothing is done. + * + * XXX Is it possible that the producer binds after this remote + * consumer has closed the statement? If that happens, the prodcuer + * will not know that this consumer is not going to connect. We + * need to study this further and make adjustments if necessary. 
+ */ + LWLockRelease(SQueuesLock); + return; + } + + sqsync = squeue->sq_sync; + + LWLockAcquire(sqsync->sqs_producer_lwlock, LW_EXCLUSIVE); + LWLockRelease(SQueuesLock); + + /* check queue states */ + for (i = 0; i < squeue->sq_nconsumers; i++) + { + ConsState *cstate = &squeue->sq_consumers[i]; + LWLockAcquire(sqsync->sqs_consumer_sync[i].cs_lwlock, LW_EXCLUSIVE); + + if (cstate->cs_node == PGXC_PARENT_NODE_ID) + { + cstate->cs_status = CONSUMER_DONE; + /* discard tuples which may already be in the queue */ + cstate->cs_ntuples = 0; + /* keep consistent with cs_ntuples*/ + cstate->cs_qreadpos = cstate->cs_qwritepos = 0; + } + LWLockRelease(sqsync->sqs_consumer_sync[i].cs_lwlock); + } + SetLatch(&sqsync->sqs_producer_latch); + LWLockRelease(sqsync->sqs_producer_lwlock); +} + +/* * Assume that not yet connected consumers won't connect and reset them. * That should allow to Finish/UnBind the queue gracefully and prevent * producer hanging. @@ -1178,6 +1299,8 @@ SharedQueueResetNotConnected(SharedQueue squeue) elog(DEBUG1, "SQueue %s, resetting all unconnected consumers", squeue->sq_key); + LWLockAcquire(squeue->sq_sync->sqs_producer_lwlock, LW_EXCLUSIVE); + /* check queue states */ for (i = 0; i < squeue->sq_nconsumers; i++) { @@ -1185,14 +1308,13 @@ SharedQueueResetNotConnected(SharedQueue squeue) LWLockAcquire(sqsync->sqs_consumer_sync[i].cs_lwlock, LW_EXCLUSIVE); if (cstate->cs_pid == 0 && - cstate->cs_status != CONSUMER_EOF && cstate->cs_status != CONSUMER_DONE) { result++; elog(DEBUG1, "SQueue %s, consumer at %d, consumer node %d, pid %d, " "status %d is cancelled - marking CONSUMER_ERROR", squeue->sq_key, i, cstate->cs_node, cstate->cs_pid, cstate->cs_status); - cstate->cs_status = CONSUMER_ERROR; + cstate->cs_status = CONSUMER_DONE; /* discard tuples which may already be in the queue */ cstate->cs_ntuples = 0; /* keep consistent with cs_ntuples*/ @@ -1203,8 +1325,24 @@ SharedQueueResetNotConnected(SharedQueue squeue) } 
LWLockRelease(sqsync->sqs_consumer_sync[i].cs_lwlock); } + + LWLockRelease(sqsync->sqs_producer_lwlock); } +/* + * Wait on the producer latch, for timeout msec. If timeout occurs, return + * true, else return false. + */ +bool +SharedQueueWaitOnProducerLatch(SharedQueue squeue, long timeout) +{ + SQueueSync *sqsync = squeue->sq_sync; + int rc = WaitLatch(&sqsync->sqs_producer_latch, + WL_LATCH_SET | WL_POSTMASTER_DEATH | WL_TIMEOUT, + timeout, WAIT_EVENT_MQ_INTERNAL); + ResetLatch(&sqsync->sqs_producer_latch); + return (rc & WL_TIMEOUT); +} /* * Determine if producer can safely pause work. @@ -1260,7 +1398,6 @@ SharedQueueCanPause(SharedQueue squeue) return result; } - int SharedQueueFinish(SharedQueue squeue, TupleDesc tupDesc, Tuplestorestate **tuplestore) @@ -1320,6 +1457,11 @@ SharedQueueFinish(SharedQueue squeue, TupleDesc tupDesc, } /* Consumer may be sleeping, wake it up */ SetLatch(&sqsync->sqs_consumer_sync[i].cs_latch); + + /* + * XXX This can only be called by the producer. So no need + * to set producer latch. + */ } } } @@ -1330,6 +1472,10 @@ SharedQueueFinish(SharedQueue squeue, TupleDesc tupDesc, { cstate->cs_status = CONSUMER_EOF; SetLatch(&sqsync->sqs_consumer_sync[i].cs_latch); + /* + * XXX This can only be called by the producer. So no need to + * set producer latch. 
+ */ } } LWLockRelease(sqsync->sqs_consumer_sync[i].cs_lwlock); @@ -1376,6 +1522,7 @@ CHECK: int c_count = 0; int unbound_count = 0; + LWLockAcquire(sqsync->sqs_producer_lwlock, LW_EXCLUSIVE); /* check queue states */ for (i = 0; i < squeue->sq_nconsumers; i++) { @@ -1395,8 +1542,7 @@ CHECK: cstate->cs_status = CONSUMER_ERROR; } - - if (cstate->cs_status != CONSUMER_DONE) + else if (cstate->cs_status != CONSUMER_DONE && !failed) { elog(DEBUG1, "SQueue %s, consumer not yet done, wake it up and " "wait for it to finish reading", squeue->sq_key); @@ -1412,6 +1558,9 @@ CHECK: LWLockRelease(sqsync->sqs_consumer_sync[i].cs_lwlock); } + + LWLockRelease(sqsync->sqs_producer_lwlock); + if (c_count == 0) break; elog(DEBUG1, "SQueue %s, wait while %d consumers finish, %d consumers" @@ -1420,13 +1569,19 @@ CHECK: wait_result = WaitLatch(&sqsync->sqs_producer_latch, WL_LATCH_SET | WL_POSTMASTER_DEATH | WL_TIMEOUT, 10000L, WAIT_EVENT_MQ_INTERNAL); + + /* + * If we hit a timeout, reset the consumers which still haven't + * connected. We already assume that consumers that don't connect + * in time will never connect, and we drop those consumers. + * + * XXX Unfortunately, while this is not the best way to handle the + * problem, we have not found a reliable way to tell whether a specific + * consumer will ever connect or not. So this kludge at least avoids an + * infinite hang. 
+ */ if (wait_result & WL_TIMEOUT) - { - elog(WARNING, "SQueue %s, timeout while waiting for Consumers " - "finishing", squeue->sq_key); - break; - } - /* got notification, continue loop */ + SharedQueueResetNotConnected(squeue); } #ifdef SQUEUE_STAT elog(DEBUG1, "Producer %s is done, there were %ld pauses", squeue->sq_key, squeue->stat_paused); @@ -1435,6 +1590,7 @@ CHECK: squeue->sq_key, squeue->sq_nodeid, squeue->sq_pid); LWLockAcquire(SQueuesLock, LW_EXCLUSIVE); + LWLockAcquire(sqsync->sqs_producer_lwlock, LW_EXCLUSIVE); /* * In rear situation, after consumers just bind to the shared queue, the producer timeout and remove the shared queue. @@ -1464,6 +1620,7 @@ CHECK: { elog(DEBUG1, "SQueue %s have %d consumers started running after we " "unbound, recheck now", squeue->sq_key, consumer_running); + LWLockRelease(sqsync->sqs_producer_lwlock); LWLockRelease(SQueuesLock); goto CHECK; } @@ -1480,6 +1637,7 @@ CHECK: elog(PANIC, "Shared queue data corruption"); } + LWLockRelease(sqsync->sqs_producer_lwlock); LWLockRelease(SQueuesLock); } @@ -1509,6 +1667,8 @@ SharedQueueRelease(const char *sqname) elog(DEBUG1, "SQueue %s producer node %d, pid %d - requested to release", sqname, sq->sq_nodeid, sq->sq_pid); + LWLockAcquire(sqsync->sqs_producer_lwlock, LW_EXCLUSIVE); + /* * If the SharedQ is not bound, we can't just remove it because * somebody might have just created a fresh entry and is going to bind @@ -1518,6 +1678,7 @@ SharedQueueRelease(const char *sqname) if (sq->sq_nodeid == -1) { elog(DEBUG1, "SQueue %s, producer not bound ", sqname); + LWLockRelease(sqsync->sqs_producer_lwlock); goto done; } @@ -1584,6 +1745,7 @@ SharedQueueRelease(const char *sqname) cstate->cs_status); } LWLockRelease(sqsync->sqs_consumer_sync[i].cs_lwlock); + LWLockRelease(sqsync->sqs_producer_lwlock); /* exit */ goto done; } @@ -1612,6 +1774,7 @@ SharedQueueRelease(const char *sqname) } } } + LWLockRelease(sqsync->sqs_producer_lwlock); } done: /* @@ -1738,13 +1901,18 @@ 
sq_push_long_tuple(ConsState *cstate, RemoteDataRow datarow) * sq_pull_long_tuple * Read in from the queue data of a long tuple which does not the queue. * See sq_push_long_tuple for more details + * + * The function is entered with LWLocks held on the consumer as well as + * producer sync. The function exits with both of those locks held, even + * though internally it may release those locks before going to sleep. */ static void sq_pull_long_tuple(ConsState *cstate, RemoteDataRow datarow, - ConsumerSync *sync) + int consumerIdx, SQueueSync *sqsync) { int offset = 0; int len = datarow->msglen; + ConsumerSync *sync = &sqsync->sqs_consumer_sync[consumerIdx]; for (;;) { @@ -1770,13 +1938,28 @@ sq_pull_long_tuple(ConsState *cstate, RemoteDataRow datarow, /* Release locks and wait until producer supply more data */ while (cstate->cs_ntuples == LONG_TUPLE) { - /* prepare wait */ + /* + * First, wake up the producer + */ + SetLatch(&sqsync->sqs_producer_latch); + + /* + * We must reset the consumer latch while holding the lock to + * ensure the producer can't change the state in between. 
+ */ ResetLatch(&sync->cs_latch); + + /* + * Now release all locks before going into a wait state + */ LWLockRelease(sync->cs_lwlock); + LWLockRelease(sqsync->sqs_producer_lwlock); + /* Wait for notification about available info */ WaitLatch(&sync->cs_latch, WL_LATCH_SET | WL_POSTMASTER_DEATH, -1, WAIT_EVENT_MQ_INTERNAL); /* got the notification, restore lock and try again */ + LWLockAcquire(sqsync->sqs_producer_lwlock, LW_SHARED); LWLockAcquire(sync->cs_lwlock, LW_EXCLUSIVE); } /* Read length of remaining data */ diff --git a/src/include/pgxc/squeue.h b/src/include/pgxc/squeue.h index 5d5e7136bd..641ee81d8c 100644 --- a/src/include/pgxc/squeue.h +++ b/src/include/pgxc/squeue.h @@ -53,8 +53,10 @@ extern void SharedQueueWrite(SharedQueue squeue, int consumerIdx, MemoryContext tmpcxt); extern bool SharedQueueRead(SharedQueue squeue, int consumerIdx, TupleTableSlot *slot, bool canwait); +extern void SharedQueueDisconnectConsumer(const char *sqname); extern void SharedQueueReset(SharedQueue squeue, int consumerIdx); extern void SharedQueueResetNotConnected(SharedQueue squeue); extern bool SharedQueueCanPause(SharedQueue squeue); +extern bool SharedQueueWaitOnProducerLatch(SharedQueue squeue, long timeout); #endif |