diff options
| author | Pavan Deolasee | 2016-01-31 08:25:01 +0000 |
|---|---|---|
| committer | Pavan Deolasee | 2016-10-18 09:48:13 +0000 |
| commit | b302e17d1e5725441c5ca44b97ec127255bd580c (patch) | |
| tree | 4ff17cdceebe13f2954c7db5610341e70a243163 /src/backend/pgxc | |
| parent | d191fddc5b23e05d2ccf5bff7b36ccc838516470 (diff) | |
Do not throw a FATAL error when SharedQ producer times out while waiting for
one or more consumers to finish.
We have seen bunch of cases where a consumer may never bind to the SharedQ and
rightfully so. For example, in a join between 3 tables which requires
redistribution of tuples, a consumer may not at all bind to the SharedQ because
it the top level outer side did not produce any tuples to join against the
redistributed inner node.
This patch avoids the unnecessary FATAL errors, but what we still do not do
nicely is to avoid the 10s timeout (as currently set for producer). So while
queries, as included in the test case, will finally return success, it will
unnecessarily add a 10s delay in the response time. This is a TODO.
Diffstat (limited to 'src/backend/pgxc')
| -rw-r--r-- | src/backend/pgxc/squeue/squeue.c | 13 |
1 files changed, 9 insertions, 4 deletions
diff --git a/src/backend/pgxc/squeue/squeue.c b/src/backend/pgxc/squeue/squeue.c index 3d9b9100cf..56c91782e5 100644 --- a/src/backend/pgxc/squeue/squeue.c +++ b/src/backend/pgxc/squeue/squeue.c @@ -1200,7 +1200,7 @@ SharedQueueFinish(SharedQueue squeue, TupleDesc tupDesc, * set the tuplestore parameter to NULL. */ void -SharedQueueUnBind(SharedQueue squeue) +SharedQueueUnBind(SharedQueue squeue, bool failed) { SQueueSync *sqsync = squeue->sq_sync; int wait_result = 0; @@ -1216,6 +1216,7 @@ CHECK: { int i; int c_count = 0; + int unbound_count = 0; /* check queue states */ for (i = 0; i < squeue->sq_nconsumers; i++) @@ -1223,7 +1224,7 @@ CHECK: ConsState *cstate = &squeue->sq_consumers[i]; LWLockAcquire(sqsync->sqs_consumer_sync[i].cs_lwlock, LW_EXCLUSIVE); /* is consumer working yet ? */ - if (cstate->cs_status == CONSUMER_ACTIVE) + if (cstate->cs_status == CONSUMER_ACTIVE && failed) cstate->cs_status = CONSUMER_ERROR; if (cstate->cs_status != CONSUMER_DONE) { @@ -1232,13 +1233,17 @@ CHECK: SetLatch(&sqsync->sqs_consumer_sync[i].cs_latch); /* producer will continue waiting */ ResetLatch(&sqsync->sqs_producer_latch); + + if (cstate->cs_pid == 0) + unbound_count++; } LWLockRelease(sqsync->sqs_consumer_sync[i].cs_lwlock); } if (c_count == 0) break; - elog(DEBUG1, "Wait while %d squeue readers finishing", c_count); + elog(DEBUG1, "Wait while %d squeue readers finish, %d squeue readers " + "not yet bound", c_count, unbound_count); /* wait for a notification */ wait_result = WaitLatch(&sqsync->sqs_producer_latch, WL_LATCH_SET | WL_POSTMASTER_DEATH | WL_TIMEOUT, @@ -1294,7 +1299,7 @@ CHECK: LWLockRelease(SQueuesLock); elog(DEBUG1, "Finalized squeue"); if (wait_result & WL_TIMEOUT) - elog(FATAL, "Timeout while waiting for Consumers finishing"); + elog(WARNING, "Timeout while waiting for Consumers finishing"); } |
