diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/backend/pgxc/squeue/squeue.c | 44 |
1 files changed, 39 insertions, 5 deletions
diff --git a/src/backend/pgxc/squeue/squeue.c b/src/backend/pgxc/squeue/squeue.c index 09700c1af3..0d2844aa5c 100644 --- a/src/backend/pgxc/squeue/squeue.c +++ b/src/backend/pgxc/squeue/squeue.c @@ -104,6 +104,7 @@ typedef struct SQueueHeader int sq_pid; /* Process id of the producer session */ int sq_nodeid; /* Node id of the producer parent */ SQueueSync *sq_sync; /* Associated sinchronization objects */ + int sq_refcnt; /* Reference count to this entry */ #ifdef SQUEUE_STAT bool stat_finish; long stat_paused; @@ -287,6 +288,7 @@ tryagain: /* Initialize the shared queue */ sq->sq_pid = 0; sq->sq_nodeid = -1; + sq->sq_refcnt = 1; #ifdef SQUEUE_STAT sq->stat_finish = false; sq->stat_paused = 0; @@ -401,8 +403,8 @@ tryagain: " %d tries", trycount); goto tryagain; } - } + sq->sq_refcnt++; } LWLockRelease(SQueuesLock); } @@ -521,6 +523,18 @@ SharedQueueBind(const char *sqname, List *consNodes, if (myindex) *myindex = -1; + + /* + * Increment the refcnt only when producer binds. This is a bit + * asymmetrical, but the way things are currently setup, a consumer + * though calls SharedQueueBind, never calls SharedQueueUnBind. The + * unbinding is done only by the producer after it waits for all + * consumers to finish. + * + * XXX This ought to be fixed someday to simplify things in Shared + * Queue handling + */ + sq->sq_refcnt++; } else { @@ -1433,6 +1447,15 @@ CHECK: LWLockRelease(SQueuesLock); goto CHECK; } + + /* + * XXX Decrement the refcnt, but it doesn't really matter because we are + * unconditionally removing the SQueue anyways. SharedQueueRelease is + * prepared to work with already removed SQueue + * + * This ought to be fixed someday + */ + squeue->sq_refcnt--; /* All is done, clean up */ DisownLatch(&sqsync->sqs_producer_latch); @@ -1481,8 +1504,7 @@ SharedQueueRelease(const char *sqname) if (sq->sq_nodeid == -1) { elog(DEBUG1, "SQueue %s, producer not bound ", sqname); - LWLockRelease(SQueuesLock); - return; + goto done; } /* @@ -1549,8 +1571,7 @@ SharedQueueRelease(const char *sqname) } LWLockRelease(sqsync->sqs_consumer_sync[i].cs_lwlock); /* exit */ - LWLockRelease(SQueuesLock); - return; + goto done; } } @@ -1578,6 +1599,19 @@ SharedQueueRelease(const char *sqname) } } } +done: + /* + * If we are the last holder of the SQueue, remove it from the hash table + * to avoid any leak + */ + if (sq && --sq->sq_refcnt == 0) + { + /* Now it is OK to remove hash table entry */ + sq->sq_sync->queue = NULL; + sq->sq_sync = NULL; + if (hash_search(SharedQueues, sq->sq_key, HASH_REMOVE, NULL) != sq) + elog(PANIC, "Shared queue data corruption"); + } LWLockRelease(SQueuesLock); } |
