diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/backend/commands/portalcmds.c | 2 | ||||
| -rw-r--r-- | src/backend/executor/producerReceiver.c | 2 | ||||
| -rw-r--r-- | src/backend/pgxc/squeue/squeue.c | 13 | ||||
| -rw-r--r-- | src/backend/tcop/pquery.c | 2 | ||||
| -rw-r--r-- | src/include/pgxc/squeue.h | 2 | ||||
| -rw-r--r-- | src/test/regress/expected/xl_join.out | 40 | ||||
| -rw-r--r-- | src/test/regress/parallel_schedule | 2 | ||||
| -rw-r--r-- | src/test/regress/serial_schedule | 1 | ||||
| -rw-r--r-- | src/test/regress/sql/xl_join.sql | 20 |
9 files changed, 75 insertions, 9 deletions
diff --git a/src/backend/commands/portalcmds.c b/src/backend/commands/portalcmds.c index 448e1ddce1..8c84c812ac 100644 --- a/src/backend/commands/portalcmds.c +++ b/src/backend/commands/portalcmds.c @@ -322,7 +322,7 @@ PortalCleanup(Portal portal) * consumers. */ if (queryDesc->squeue) - SharedQueueUnBind(queryDesc->squeue); + SharedQueueUnBind(queryDesc->squeue, true); FreeQueryDesc(queryDesc); } } diff --git a/src/backend/executor/producerReceiver.c b/src/backend/executor/producerReceiver.c index b2d22e07d9..145788a11b 100644 --- a/src/backend/executor/producerReceiver.c +++ b/src/backend/executor/producerReceiver.c @@ -174,7 +174,7 @@ producerDestroyReceiver(DestReceiver *self) /* wait while consumer are finishing and release shared resources */ if (myState->squeue) - SharedQueueUnBind(myState->squeue); + SharedQueueUnBind(myState->squeue, false); myState->squeue = NULL; /* Release workspace if any */ diff --git a/src/backend/pgxc/squeue/squeue.c b/src/backend/pgxc/squeue/squeue.c index 3d9b9100cf..56c91782e5 100644 --- a/src/backend/pgxc/squeue/squeue.c +++ b/src/backend/pgxc/squeue/squeue.c @@ -1200,7 +1200,7 @@ SharedQueueFinish(SharedQueue squeue, TupleDesc tupDesc, * set the tuplestore parameter to NULL. */ void -SharedQueueUnBind(SharedQueue squeue) +SharedQueueUnBind(SharedQueue squeue, bool failed) { SQueueSync *sqsync = squeue->sq_sync; int wait_result = 0; @@ -1216,6 +1216,7 @@ CHECK: { int i; int c_count = 0; + int unbound_count = 0; /* check queue states */ for (i = 0; i < squeue->sq_nconsumers; i++) @@ -1223,7 +1224,7 @@ CHECK: ConsState *cstate = &squeue->sq_consumers[i]; LWLockAcquire(sqsync->sqs_consumer_sync[i].cs_lwlock, LW_EXCLUSIVE); /* is consumer working yet ? */ - if (cstate->cs_status == CONSUMER_ACTIVE) + if (cstate->cs_status == CONSUMER_ACTIVE && failed) cstate->cs_status = CONSUMER_ERROR; if (cstate->cs_status != CONSUMER_DONE) { @@ -1232,13 +1233,17 @@ CHECK: SetLatch(&sqsync->sqs_consumer_sync[i].cs_latch); /* producer will continue waiting */ ResetLatch(&sqsync->sqs_producer_latch); + + if (cstate->cs_pid == 0) + unbound_count++; } LWLockRelease(sqsync->sqs_consumer_sync[i].cs_lwlock); } if (c_count == 0) break; - elog(DEBUG1, "Wait while %d squeue readers finishing", c_count); + elog(DEBUG1, "Wait while %d squeue readers finish, %d squeue readers " + "not yet bound", c_count, unbound_count); /* wait for a notification */ wait_result = WaitLatch(&sqsync->sqs_producer_latch, WL_LATCH_SET | WL_POSTMASTER_DEATH | WL_TIMEOUT, @@ -1294,7 +1299,7 @@ CHECK: LWLockRelease(SQueuesLock); elog(DEBUG1, "Finalized squeue"); if (wait_result & WL_TIMEOUT) - elog(FATAL, "Timeout while waiting for Consumers finishing"); + elog(WARNING, "Timeout while waiting for Consumers finishing"); } diff --git a/src/backend/tcop/pquery.c b/src/backend/tcop/pquery.c index 454f427217..6b749612e2 100644 --- a/src/backend/tcop/pquery.c +++ b/src/backend/tcop/pquery.c @@ -714,7 +714,7 @@ PortalStart(Portal portal, ParamListInfo params, PG_CATCH(); { /* Ensure SharedQueue is released */ - SharedQueueUnBind(queryDesc->squeue); + SharedQueueUnBind(queryDesc->squeue, true); queryDesc->squeue = NULL; PG_RE_THROW(); } diff --git a/src/include/pgxc/squeue.h b/src/include/pgxc/squeue.h index ad1873e0a0..c0a9807e43 100644 --- a/src/include/pgxc/squeue.h +++ b/src/include/pgxc/squeue.h @@ -41,7 +41,7 @@ extern void SharedQueuesInit(void); extern void SharedQueueAcquire(const char *sqname, int ncons); extern SharedQueue SharedQueueBind(const char *sqname, List *consNodes, List *distNodes, int *myindex, int *consMap); -extern void SharedQueueUnBind(SharedQueue squeue); +extern void SharedQueueUnBind(SharedQueue squeue, bool failed); extern void SharedQueueRelease(const char *sqname); extern void SharedQueuesCleanup(int code, Datum arg); diff --git a/src/test/regress/expected/xl_join.out b/src/test/regress/expected/xl_join.out new file mode 100644 index 0000000000..7c705769b1 --- /dev/null +++ b/src/test/regress/expected/xl_join.out @@ -0,0 +1,40 @@ +CREATE TABLE xl_join_t1 (val1 int, val2 int); +CREATE TABLE xl_join_t2 (val1 int, val2 int); +CREATE TABLE xl_join_t3 (val1 int, val2 int); +INSERT INTO xl_join_t1 VALUES (1,10),(2,20); +INSERT INTO xl_join_t2 VALUES (3,30),(4,40); +INSERT INTO xl_join_t3 VALUES (5,50),(6,60); +EXPLAIN SELECT * FROM xl_join_t1 + INNER JOIN xl_join_t2 ON xl_join_t1.val1 = xl_join_t2.val2 + INNER JOIN xl_join_t3 ON xl_join_t1.val1 = xl_join_t3.val1; + QUERY PLAN +---------------------------------------------------------------------------------------------------------------------- + Remote Subquery Scan on all (datanode_1,datanode_2) (cost=475.52..5209.87 rows=288579 width=24) + -> Merge Join (cost=475.52..5209.87 rows=288579 width=24) + Merge Cond: (xl_join_t3.val1 = xl_join_t1.val1) + -> Sort (cost=158.51..164.16 rows=2260 width=8) + Sort Key: xl_join_t3.val1 + -> Seq Scan on xl_join_t3 (cost=0.00..32.60 rows=2260 width=8) + -> Materialize (cost=317.01..775.23 rows=25538 width=16) + -> Merge Join (cost=317.01..711.38 rows=25538 width=16) + Merge Cond: (xl_join_t2.val2 = xl_join_t1.val1) + -> Remote Subquery Scan on all (datanode_1,datanode_2) (cost=100.00..161.98 rows=2260 width=8) + Distribute results by H: val2 + -> Sort (cost=287.89..293.54 rows=2260 width=8) + Sort Key: xl_join_t2.val2 + -> Seq Scan on xl_join_t2 (cost=0.00..32.60 rows=2260 width=8) + -> Sort (cost=158.51..164.16 rows=2260 width=8) + Sort Key: xl_join_t1.val1 + -> Seq Scan on xl_join_t1 (cost=0.00..32.60 rows=2260 width=8) +(17 rows) + +SELECT * FROM xl_join_t1 + INNER JOIN xl_join_t2 ON xl_join_t1.val1 = xl_join_t2.val2 + INNER JOIN xl_join_t3 ON xl_join_t1.val1 = xl_join_t3.val1; + val1 | val2 | val1 | val2 | val1 | val2 +------+------+------+------+------+------ +(0 rows) + +DROP TABLE xl_join_t1; +DROP TABLE xl_join_t2; +DROP TABLE xl_join_t3; diff --git a/src/test/regress/parallel_schedule b/src/test/regress/parallel_schedule index 5169c3887d..2bad0ff1a5 100644 --- a/src/test/regress/parallel_schedule +++ b/src/test/regress/parallel_schedule @@ -137,7 +137,7 @@ test: xc_prepared_xacts test: xc_notrans_block # This runs XL specific tests -test: xl_primary_key xl_foreign_key xl_distribution_column_types xl_alter_table xl_distribution_column_types_modulo xl_plan_pushdown xl_functions xl_limitations xl_user_defined_functions +test: xl_primary_key xl_foreign_key xl_distribution_column_types xl_alter_table xl_distribution_column_types_modulo xl_plan_pushdown xl_functions xl_limitations xl_user_defined_functions xl_join #known bugs test: xl_known_bugs diff --git a/src/test/regress/serial_schedule b/src/test/regress/serial_schedule index 59f4012a29..f404c36b00 100644 --- a/src/test/regress/serial_schedule +++ b/src/test/regress/serial_schedule @@ -185,3 +185,4 @@ test: xl_plan_pushdown test: xl_functions test: xl_limitations test: xl_user_defined_functions +test: xl_join diff --git a/src/test/regress/sql/xl_join.sql b/src/test/regress/sql/xl_join.sql new file mode 100644 index 0000000000..ed2829daff --- /dev/null +++ b/src/test/regress/sql/xl_join.sql @@ -0,0 +1,20 @@ + +CREATE TABLE xl_join_t1 (val1 int, val2 int); +CREATE TABLE xl_join_t2 (val1 int, val2 int); +CREATE TABLE xl_join_t3 (val1 int, val2 int); + +INSERT INTO xl_join_t1 VALUES (1,10),(2,20); +INSERT INTO xl_join_t2 VALUES (3,30),(4,40); +INSERT INTO xl_join_t3 VALUES (5,50),(6,60); + +EXPLAIN SELECT * FROM xl_join_t1 + INNER JOIN xl_join_t2 ON xl_join_t1.val1 = xl_join_t2.val2 + INNER JOIN xl_join_t3 ON xl_join_t1.val1 = xl_join_t3.val1; + +SELECT * FROM xl_join_t1 + INNER JOIN xl_join_t2 ON xl_join_t1.val1 = xl_join_t2.val2 + INNER JOIN xl_join_t3 ON xl_join_t1.val1 = xl_join_t3.val1; + +DROP TABLE xl_join_t1; +DROP TABLE xl_join_t2; +DROP TABLE xl_join_t3; |
