summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/pgxc/squeue/squeue.c44
1 files changed, 39 insertions, 5 deletions
diff --git a/src/backend/pgxc/squeue/squeue.c b/src/backend/pgxc/squeue/squeue.c
index 09700c1af3..0d2844aa5c 100644
--- a/src/backend/pgxc/squeue/squeue.c
+++ b/src/backend/pgxc/squeue/squeue.c
@@ -104,6 +104,7 @@ typedef struct SQueueHeader
int sq_pid; /* Process id of the producer session */
int sq_nodeid; /* Node id of the producer parent */
SQueueSync *sq_sync; /* Associated sinchronization objects */
+ int sq_refcnt; /* Reference count to this entry */
#ifdef SQUEUE_STAT
bool stat_finish;
long stat_paused;
@@ -287,6 +288,7 @@ tryagain:
/* Initialize the shared queue */
sq->sq_pid = 0;
sq->sq_nodeid = -1;
+ sq->sq_refcnt = 1;
#ifdef SQUEUE_STAT
sq->stat_finish = false;
sq->stat_paused = 0;
@@ -401,8 +403,8 @@ tryagain:
" %d tries", trycount);
goto tryagain;
}
-
}
+ sq->sq_refcnt++;
}
LWLockRelease(SQueuesLock);
}
@@ -521,6 +523,18 @@ SharedQueueBind(const char *sqname, List *consNodes,
if (myindex)
*myindex = -1;
+
+ /*
+ * Increment the refcnt only when producer binds. This is a bit
+ * asymmetrical, but the way things are currently setup, a consumer
+ * though calls SharedQueueBind, never calls SharedQueueUnBind. The
+ * unbinding is done only by the producer after it waits for all
+ * consumers to finish.
+ *
+ * XXX This ought to be fixed someday to simplify things in Shared
+ * Queue handling
+ */
+ sq->sq_refcnt++;
}
else
{
@@ -1433,6 +1447,15 @@ CHECK:
LWLockRelease(SQueuesLock);
goto CHECK;
}
+
+ /*
+ * XXX Decrement the refcnt, but it doesn't really matter because we are
+ * unconditionally removing the SQueue anyways. SharedQueueRelease is
+ * prepared to work with already removed SQueue
+ *
+ * This ought to be fixed someday
+ */
+ squeue->sq_refcnt--;
/* All is done, clean up */
DisownLatch(&sqsync->sqs_producer_latch);
@@ -1481,8 +1504,7 @@ SharedQueueRelease(const char *sqname)
if (sq->sq_nodeid == -1)
{
elog(DEBUG1, "SQueue %s, producer not bound ", sqname);
- LWLockRelease(SQueuesLock);
- return;
+ goto done;
}
/*
@@ -1549,8 +1571,7 @@ SharedQueueRelease(const char *sqname)
}
LWLockRelease(sqsync->sqs_consumer_sync[i].cs_lwlock);
/* exit */
- LWLockRelease(SQueuesLock);
- return;
+ goto done;
}
}
@@ -1578,6 +1599,19 @@ SharedQueueRelease(const char *sqname)
}
}
}
+done:
+ /*
+ * If we are the last holder of the SQueue, remove it from the hash table
+ * to avoid any leak
+ */
+ if (sq && --sq->sq_refcnt == 0)
+ {
+ /* Now it is OK to remove hash table entry */
+ sq->sq_sync->queue = NULL;
+ sq->sq_sync = NULL;
+ if (hash_search(SharedQueues, sq->sq_key, HASH_REMOVE, NULL) != sq)
+ elog(PANIC, "Shared queue data corruption");
+ }
LWLockRelease(SQueuesLock);
}