From b302e17d1e5725441c5ca44b97ec127255bd580c Mon Sep 17 00:00:00 2001 From: Pavan Deolasee Date: Sun, 31 Jan 2016 13:55:01 +0530 Subject: Do not throw a FATAL error when SharedQ producer times out while waiting for one or more consumers to finish. We have seen a bunch of cases where a consumer may never bind to the SharedQ, and rightfully so. For example, in a join between 3 tables which requires redistribution of tuples, a consumer may not bind to the SharedQ at all because the top-level outer side did not produce any tuples to join against the redistributed inner node. This patch avoids the unnecessary FATAL errors, but what we still do not do nicely is avoid the 10s timeout (as currently set for the producer). So while queries such as those included in the test case will eventually return success, they will unnecessarily incur a 10s delay in the response time. This is a TODO. --- src/backend/pgxc/squeue/squeue.c | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) (limited to 'src/backend/pgxc') diff --git a/src/backend/pgxc/squeue/squeue.c b/src/backend/pgxc/squeue/squeue.c index 3d9b9100cf..56c91782e5 100644 --- a/src/backend/pgxc/squeue/squeue.c +++ b/src/backend/pgxc/squeue/squeue.c @@ -1200,7 +1200,7 @@ SharedQueueFinish(SharedQueue squeue, TupleDesc tupDesc, * set the tuplestore parameter to NULL. */ void -SharedQueueUnBind(SharedQueue squeue) +SharedQueueUnBind(SharedQueue squeue, bool failed) { SQueueSync *sqsync = squeue->sq_sync; int wait_result = 0; @@ -1216,6 +1216,7 @@ CHECK: { int i; int c_count = 0; + int unbound_count = 0; /* check queue states */ for (i = 0; i < squeue->sq_nconsumers; i++) @@ -1223,7 +1224,7 @@ CHECK: ConsState *cstate = &squeue->sq_consumers[i]; LWLockAcquire(sqsync->sqs_consumer_sync[i].cs_lwlock, LW_EXCLUSIVE); /* is consumer working yet ? 
*/ - if (cstate->cs_status == CONSUMER_ACTIVE) + if (cstate->cs_status == CONSUMER_ACTIVE && failed) cstate->cs_status = CONSUMER_ERROR; if (cstate->cs_status != CONSUMER_DONE) { @@ -1232,13 +1233,17 @@ CHECK: SetLatch(&sqsync->sqs_consumer_sync[i].cs_latch); /* producer will continue waiting */ ResetLatch(&sqsync->sqs_producer_latch); + + if (cstate->cs_pid == 0) + unbound_count++; } LWLockRelease(sqsync->sqs_consumer_sync[i].cs_lwlock); } if (c_count == 0) break; - elog(DEBUG1, "Wait while %d squeue readers finishing", c_count); + elog(DEBUG1, "Wait while %d squeue readers finish, %d squeue readers " + "not yet bound", c_count, unbound_count); /* wait for a notification */ wait_result = WaitLatch(&sqsync->sqs_producer_latch, WL_LATCH_SET | WL_POSTMASTER_DEATH | WL_TIMEOUT, @@ -1294,7 +1299,7 @@ CHECK: LWLockRelease(SQueuesLock); elog(DEBUG1, "Finalized squeue"); if (wait_result & WL_TIMEOUT) - elog(FATAL, "Timeout while waiting for Consumers finishing"); + elog(WARNING, "Timeout while waiting for Consumers finishing"); } -- cgit v1.2.3