summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/commands/portalcmds.c2
-rw-r--r--src/backend/executor/producerReceiver.c2
-rw-r--r--src/backend/pgxc/squeue/squeue.c13
-rw-r--r--src/backend/tcop/pquery.c2
-rw-r--r--src/include/pgxc/squeue.h2
-rw-r--r--src/test/regress/expected/xl_join.out40
-rw-r--r--src/test/regress/parallel_schedule2
-rw-r--r--src/test/regress/serial_schedule1
-rw-r--r--src/test/regress/sql/xl_join.sql20
9 files changed, 75 insertions, 9 deletions
diff --git a/src/backend/commands/portalcmds.c b/src/backend/commands/portalcmds.c
index 448e1ddce1..8c84c812ac 100644
--- a/src/backend/commands/portalcmds.c
+++ b/src/backend/commands/portalcmds.c
@@ -322,7 +322,7 @@ PortalCleanup(Portal portal)
* consumers.
*/
if (queryDesc->squeue)
- SharedQueueUnBind(queryDesc->squeue);
+ SharedQueueUnBind(queryDesc->squeue, true);
FreeQueryDesc(queryDesc);
}
}
diff --git a/src/backend/executor/producerReceiver.c b/src/backend/executor/producerReceiver.c
index b2d22e07d9..145788a11b 100644
--- a/src/backend/executor/producerReceiver.c
+++ b/src/backend/executor/producerReceiver.c
@@ -174,7 +174,7 @@ producerDestroyReceiver(DestReceiver *self)
/* wait while consumer are finishing and release shared resources */
if (myState->squeue)
- SharedQueueUnBind(myState->squeue);
+ SharedQueueUnBind(myState->squeue, false);
myState->squeue = NULL;
/* Release workspace if any */
diff --git a/src/backend/pgxc/squeue/squeue.c b/src/backend/pgxc/squeue/squeue.c
index 3d9b9100cf..56c91782e5 100644
--- a/src/backend/pgxc/squeue/squeue.c
+++ b/src/backend/pgxc/squeue/squeue.c
@@ -1200,7 +1200,7 @@ SharedQueueFinish(SharedQueue squeue, TupleDesc tupDesc,
* set the tuplestore parameter to NULL.
*/
void
-SharedQueueUnBind(SharedQueue squeue)
+SharedQueueUnBind(SharedQueue squeue, bool failed)
{
SQueueSync *sqsync = squeue->sq_sync;
int wait_result = 0;
@@ -1216,6 +1216,7 @@ CHECK:
{
int i;
int c_count = 0;
+ int unbound_count = 0;
/* check queue states */
for (i = 0; i < squeue->sq_nconsumers; i++)
@@ -1223,7 +1224,7 @@ CHECK:
ConsState *cstate = &squeue->sq_consumers[i];
LWLockAcquire(sqsync->sqs_consumer_sync[i].cs_lwlock, LW_EXCLUSIVE);
/* is consumer working yet ? */
- if (cstate->cs_status == CONSUMER_ACTIVE)
+ if (cstate->cs_status == CONSUMER_ACTIVE && failed)
cstate->cs_status = CONSUMER_ERROR;
if (cstate->cs_status != CONSUMER_DONE)
{
@@ -1232,13 +1233,17 @@ CHECK:
SetLatch(&sqsync->sqs_consumer_sync[i].cs_latch);
/* producer will continue waiting */
ResetLatch(&sqsync->sqs_producer_latch);
+
+ if (cstate->cs_pid == 0)
+ unbound_count++;
}
LWLockRelease(sqsync->sqs_consumer_sync[i].cs_lwlock);
}
if (c_count == 0)
break;
- elog(DEBUG1, "Wait while %d squeue readers finishing", c_count);
+ elog(DEBUG1, "Wait while %d squeue readers finish, %d squeue readers "
+ "not yet bound", c_count, unbound_count);
/* wait for a notification */
wait_result = WaitLatch(&sqsync->sqs_producer_latch,
WL_LATCH_SET | WL_POSTMASTER_DEATH | WL_TIMEOUT,
@@ -1294,7 +1299,7 @@ CHECK:
LWLockRelease(SQueuesLock);
elog(DEBUG1, "Finalized squeue");
if (wait_result & WL_TIMEOUT)
- elog(FATAL, "Timeout while waiting for Consumers finishing");
+ elog(WARNING, "Timeout while waiting for Consumers finishing");
}
diff --git a/src/backend/tcop/pquery.c b/src/backend/tcop/pquery.c
index 454f427217..6b749612e2 100644
--- a/src/backend/tcop/pquery.c
+++ b/src/backend/tcop/pquery.c
@@ -714,7 +714,7 @@ PortalStart(Portal portal, ParamListInfo params,
PG_CATCH();
{
/* Ensure SharedQueue is released */
- SharedQueueUnBind(queryDesc->squeue);
+ SharedQueueUnBind(queryDesc->squeue, true);
queryDesc->squeue = NULL;
PG_RE_THROW();
}
diff --git a/src/include/pgxc/squeue.h b/src/include/pgxc/squeue.h
index ad1873e0a0..c0a9807e43 100644
--- a/src/include/pgxc/squeue.h
+++ b/src/include/pgxc/squeue.h
@@ -41,7 +41,7 @@ extern void SharedQueuesInit(void);
extern void SharedQueueAcquire(const char *sqname, int ncons);
extern SharedQueue SharedQueueBind(const char *sqname, List *consNodes,
List *distNodes, int *myindex, int *consMap);
-extern void SharedQueueUnBind(SharedQueue squeue);
+extern void SharedQueueUnBind(SharedQueue squeue, bool failed);
extern void SharedQueueRelease(const char *sqname);
extern void SharedQueuesCleanup(int code, Datum arg);
diff --git a/src/test/regress/expected/xl_join.out b/src/test/regress/expected/xl_join.out
new file mode 100644
index 0000000000..7c705769b1
--- /dev/null
+++ b/src/test/regress/expected/xl_join.out
@@ -0,0 +1,40 @@
+CREATE TABLE xl_join_t1 (val1 int, val2 int);
+CREATE TABLE xl_join_t2 (val1 int, val2 int);
+CREATE TABLE xl_join_t3 (val1 int, val2 int);
+INSERT INTO xl_join_t1 VALUES (1,10),(2,20);
+INSERT INTO xl_join_t2 VALUES (3,30),(4,40);
+INSERT INTO xl_join_t3 VALUES (5,50),(6,60);
+EXPLAIN SELECT * FROM xl_join_t1
+ INNER JOIN xl_join_t2 ON xl_join_t1.val1 = xl_join_t2.val2
+ INNER JOIN xl_join_t3 ON xl_join_t1.val1 = xl_join_t3.val1;
+ QUERY PLAN
+----------------------------------------------------------------------------------------------------------------------
+ Remote Subquery Scan on all (datanode_1,datanode_2) (cost=475.52..5209.87 rows=288579 width=24)
+ -> Merge Join (cost=475.52..5209.87 rows=288579 width=24)
+ Merge Cond: (xl_join_t3.val1 = xl_join_t1.val1)
+ -> Sort (cost=158.51..164.16 rows=2260 width=8)
+ Sort Key: xl_join_t3.val1
+ -> Seq Scan on xl_join_t3 (cost=0.00..32.60 rows=2260 width=8)
+ -> Materialize (cost=317.01..775.23 rows=25538 width=16)
+ -> Merge Join (cost=317.01..711.38 rows=25538 width=16)
+ Merge Cond: (xl_join_t2.val2 = xl_join_t1.val1)
+ -> Remote Subquery Scan on all (datanode_1,datanode_2) (cost=100.00..161.98 rows=2260 width=8)
+ Distribute results by H: val2
+ -> Sort (cost=287.89..293.54 rows=2260 width=8)
+ Sort Key: xl_join_t2.val2
+ -> Seq Scan on xl_join_t2 (cost=0.00..32.60 rows=2260 width=8)
+ -> Sort (cost=158.51..164.16 rows=2260 width=8)
+ Sort Key: xl_join_t1.val1
+ -> Seq Scan on xl_join_t1 (cost=0.00..32.60 rows=2260 width=8)
+(17 rows)
+
+SELECT * FROM xl_join_t1
+ INNER JOIN xl_join_t2 ON xl_join_t1.val1 = xl_join_t2.val2
+ INNER JOIN xl_join_t3 ON xl_join_t1.val1 = xl_join_t3.val1;
+ val1 | val2 | val1 | val2 | val1 | val2
+------+------+------+------+------+------
+(0 rows)
+
+DROP TABLE xl_join_t1;
+DROP TABLE xl_join_t2;
+DROP TABLE xl_join_t3;
diff --git a/src/test/regress/parallel_schedule b/src/test/regress/parallel_schedule
index 5169c3887d..2bad0ff1a5 100644
--- a/src/test/regress/parallel_schedule
+++ b/src/test/regress/parallel_schedule
@@ -137,7 +137,7 @@ test: xc_prepared_xacts
test: xc_notrans_block
# This runs XL specific tests
-test: xl_primary_key xl_foreign_key xl_distribution_column_types xl_alter_table xl_distribution_column_types_modulo xl_plan_pushdown xl_functions xl_limitations xl_user_defined_functions
+test: xl_primary_key xl_foreign_key xl_distribution_column_types xl_alter_table xl_distribution_column_types_modulo xl_plan_pushdown xl_functions xl_limitations xl_user_defined_functions xl_join
#known bugs
test: xl_known_bugs
diff --git a/src/test/regress/serial_schedule b/src/test/regress/serial_schedule
index 59f4012a29..f404c36b00 100644
--- a/src/test/regress/serial_schedule
+++ b/src/test/regress/serial_schedule
@@ -185,3 +185,4 @@ test: xl_plan_pushdown
test: xl_functions
test: xl_limitations
test: xl_user_defined_functions
+test: xl_join
diff --git a/src/test/regress/sql/xl_join.sql b/src/test/regress/sql/xl_join.sql
new file mode 100644
index 0000000000..ed2829daff
--- /dev/null
+++ b/src/test/regress/sql/xl_join.sql
@@ -0,0 +1,20 @@
+
+CREATE TABLE xl_join_t1 (val1 int, val2 int);
+CREATE TABLE xl_join_t2 (val1 int, val2 int);
+CREATE TABLE xl_join_t3 (val1 int, val2 int);
+
+INSERT INTO xl_join_t1 VALUES (1,10),(2,20);
+INSERT INTO xl_join_t2 VALUES (3,30),(4,40);
+INSERT INTO xl_join_t3 VALUES (5,50),(6,60);
+
+EXPLAIN SELECT * FROM xl_join_t1
+ INNER JOIN xl_join_t2 ON xl_join_t1.val1 = xl_join_t2.val2
+ INNER JOIN xl_join_t3 ON xl_join_t1.val1 = xl_join_t3.val1;
+
+SELECT * FROM xl_join_t1
+ INNER JOIN xl_join_t2 ON xl_join_t1.val1 = xl_join_t2.val2
+ INNER JOIN xl_join_t3 ON xl_join_t1.val1 = xl_join_t3.val1;
+
+DROP TABLE xl_join_t1;
+DROP TABLE xl_join_t2;
+DROP TABLE xl_join_t3;