author     Pavan Deolasee  2016-02-10 04:48:38 +0000
committer  Pavan Deolasee  2016-10-18 09:57:45 +0000
commit     50ede31b3c6068c89274cc04d1c2984114d617d5 (patch)
tree       dcb1360c1e454013636c352575b050aff57f333c
parent     7ed1ba850b7e1d12521e3e8113ff659c5dc4937f (diff)
Sprinkle SQueue and portal management code with DEBUG messages
-rw-r--r--  src/backend/commands/portalcmds.c      3
-rw-r--r--  src/backend/pgxc/squeue/squeue.c     207
-rw-r--r--  src/backend/tcop/postgres.c            4
-rw-r--r--  src/backend/utils/mmgr/portalmem.c    20
4 files changed, 207 insertions, 27 deletions
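All of the new messages follow one pattern: plain elog() calls at DEBUG1-DEBUG3 that print the SQueue name together with the producer and per-consumer state. The sketch below illustrates that pattern, assuming only the SharedQueue/ConsState fields visible in this patch (sq_key, sq_nodeid, sq_pid, sq_nconsumers, cs_node, cs_pid, cs_ntuples, cs_status); the helper function itself is illustrative and not part of the change:

    /* Illustrative sketch only - mirrors the logging pattern added in squeue.c */
    static void
    debug_dump_squeue_state(SharedQueue squeue)
    {
        int i;

        /* Producer-side summary at DEBUG1, keyed by the queue name */
        elog(DEBUG1, "SQueue %s - producer node %d, pid %d, nconsumers %d",
             squeue->sq_key, squeue->sq_nodeid, squeue->sq_pid,
             squeue->sq_nconsumers);

        /* Per-consumer detail at DEBUG3 */
        for (i = 0; i < squeue->sq_nconsumers; i++)
        {
            ConsState *cstate = &squeue->sq_consumers[i];

            elog(DEBUG3, "SQueue %s, consumer at %d - node %d, pid %d, "
                 "ntuples %d, status %d",
                 squeue->sq_key, i, cstate->cs_node, cstate->cs_pid,
                 cstate->cs_ntuples, cstate->cs_status);
        }
    }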
diff --git a/src/backend/commands/portalcmds.c b/src/backend/commands/portalcmds.c
index 8c84c812ac..f0799419db 100644
--- a/src/backend/commands/portalcmds.c
+++ b/src/backend/commands/portalcmds.c
@@ -239,6 +239,9 @@ PerformPortalClose(const char *name)
return; /* keep compiler happy */
}
+#ifdef XCP
+ elog(DEBUG3, "PerformPortalClose for portal %s", name);
+#endif
/*
* Note: PortalCleanup is called as a side-effect, if not already done.
*/
diff --git a/src/backend/pgxc/squeue/squeue.c b/src/backend/pgxc/squeue/squeue.c
index 56c91782e5..8c97557294 100644
--- a/src/backend/pgxc/squeue/squeue.c
+++ b/src/backend/pgxc/squeue/squeue.c
@@ -284,7 +284,7 @@ tryagain:
int i;
char *heapPtr;
- elog(DEBUG1, "Format squeue %s for %d consumers", sqname, ncons);
+ elog(DEBUG1, "Create a new SQueue %s and format it for %d consumers", sqname, ncons);
/* Initialize the shared queue */
sq->sq_pid = 0;
@@ -337,6 +337,23 @@ tryagain:
}
else
{
+ int i;
+
+ elog(DEBUG1, "Found an existing SQueue %s - (sq_pid:%d, sq_nodeid:%d,"
+ " sq_nconsumers:%d",
+ sqname, sq->sq_pid, sq->sq_nodeid, sq->sq_nconsumers);
+
+ for (i = 0; i < sq->sq_nconsumers; i++)
+ {
+ elog(DEBUG1, "SQueue %s, consumer (%d) information (cs_pid:%d,"
+ " cs_node:%d, cs_ntuples:%d, cs_status: %d",
+ sqname, i,
+ sq->sq_consumers[i].cs_pid,
+ sq->sq_consumers[i].cs_node,
+ sq->sq_consumers[i].cs_ntuples,
+ sq->sq_consumers[i].cs_status);
+ }
+
/*
* A race condition is possible here. The previous operation might use
* the same Shared Queue name if that was different execution of the
@@ -378,6 +395,8 @@ tryagain:
{
LWLockRelease(SQueuesLock);
pg_usleep(1L);
+ elog(DEBUG1, "SQueue race condition, give the old producer to "
+ "finish the work and retry again");
goto tryagain;
}
@@ -447,6 +466,8 @@ SharedQueueBind(const char *sqname, List *consNodes,
{
/* Producer must be in the consNodes list */
Assert(list_member_int(consNodes, nodeid));
+ elog(DEBUG1, "SQueue %s consumer @%d is set to self",
+ sqname, i);
consMap[i++] = SQ_CONS_SELF;
}
/*
@@ -464,14 +485,18 @@ SharedQueueBind(const char *sqname, List *consNodes,
if (cstate->cs_node == nodeid)
{
/* The process already reported that queue won't read */
- elog(DEBUG1, "Node %d of step %s is released already",
- nodeid, sqname);
+ elog(DEBUG1, "Node %d of SQueue %s is released already "
+ "at consumer %d, cs_status %d",
+ nodeid, sqname, j, cstate->cs_status);
consMap[i++] = SQ_CONS_NONE;
break;
}
else if (cstate->cs_node == -1)
{
/* found unused slot, assign the consumer to it */
+ elog(DEBUG1, "Node %d of SQueue %s is bound at consumer "
+ "%d, cs_status %d",
+ nodeid, sqname, j, cstate->cs_status);
consMap[i++] = j;
cstate->cs_node = nodeid;
break;
@@ -485,6 +510,9 @@ SharedQueueBind(const char *sqname, List *consNodes,
*/
else
{
+ elog(DEBUG1, "Node %d of SQueue %s is not in the "
+ "redistribution list and hence would never connect",
+ nodeid, sqname);
consMap[i++] = SQ_CONS_NONE;
}
}
@@ -500,7 +528,9 @@ SharedQueueBind(const char *sqname, List *consNodes,
/* Producer should be different process */
Assert(sq->sq_pid != MyProcPid);
- elog(DEBUG1, "Bind node %s to squeue of step %s as a consumer of process %d", PGXC_PARENT_NODE, sqname, sq->sq_pid);
+ elog(DEBUG1, "SQueue %s has a bound producer from node %d, pid %d",
+ sqname, sq->sq_nodeid, sq->sq_pid);
+ elog(DEBUG1, "Bind node %s to SQueue %s as a consumer %d", PGXC_PARENT_NODE, sqname, sq->sq_pid);
/* Sanity checks */
Assert(myindex);
@@ -538,6 +568,8 @@ SharedQueueBind(const char *sqname, List *consNodes,
*/
SQueueSync *sqsync = sq->sq_sync;
+ elog(DEBUG1, "SQueue %s, consumer node %d is same as "
+ "the parent node", sqname, nodeid);
LWLockAcquire(sqsync->sqs_consumer_sync[i].cs_lwlock,
LW_EXCLUSIVE);
/* Make sure no consumer bound to the queue already */
@@ -568,13 +600,23 @@ SharedQueueBind(const char *sqname, List *consNodes,
* ACTIVE. If producer have had only few rows to emit
* and it is already done the status would be EOF.
*/
+
/* Set up the consumer */
cstate->cs_pid = MyProcPid;
+
+ elog(DEBUG1, "SQueue %s, consumer at %d, status %d - "
+ "setting up consumer node %d, pid %d",
+ sqname, i, cstate->cs_status, cstate->cs_node,
+ cstate->cs_pid);
/* return found index */
*myindex = i;
OwnLatch(&sqsync->sqs_consumer_sync[i].cs_latch);
LWLockRelease(sqsync->sqs_consumer_sync[i].cs_lwlock);
}
+ else
+ elog(DEBUG1, "SQueue %s, consumer node %d is not same as "
+ "the parent node %d", sqname, nodeid,
+ PGXC_PARENT_NODE_ID);
break;
}
}
@@ -601,9 +643,18 @@ SharedQueueDump(SharedQueue squeue, int consumerIdx,
{
ConsState *cstate = &(squeue->sq_consumers[consumerIdx]);
+ elog(DEBUG3, "Dumping SQueue %s data for consumer at %d, "
+ "producer - node %d, pid %d, "
+ "consumer - node %d, pid %d, status %d",
+ squeue->sq_key, consumerIdx,
+ squeue->sq_nodeid, squeue->sq_pid,
+ cstate->cs_node, cstate->cs_pid, cstate->cs_status);
+
/* discard stored data if consumer is not active */
if (cstate->cs_status != CONSUMER_ACTIVE)
{
+ elog(DEBUG3, "Discarding SQueue %s data for consumer at %d not active",
+ squeue->sq_key, consumerIdx);
tuplestore_clear(tuplestore);
return true;
}
@@ -631,6 +682,8 @@ SharedQueueDump(SharedQueue squeue, int consumerIdx,
if (!tuplestore_gettupleslot(tuplestore, true, false, tmpslot))
{
/* false means the tuplestore in EOF state */
+ elog(DEBUG3, "Tuplestore for SQueue %s returned EOF",
+ squeue->sq_key);
break;
}
#ifdef SQUEUE_STAT
@@ -820,6 +873,8 @@ SharedQueueWrite(SharedQueue squeue, int consumerIdx,
/* do not supply data to closed consumer */
if (cstate->cs_status == CONSUMER_ACTIVE)
{
+ elog(DEBUG3, "SQueue %s, consumer is active, writing data",
+ squeue->sq_key);
/* write out the data */
QUEUE_WRITE(cstate, sizeof(int), (char *) &datarow->msglen);
QUEUE_WRITE(cstate, datarow->msglen, datarow->msg);
@@ -828,6 +883,9 @@ SharedQueueWrite(SharedQueue squeue, int consumerIdx,
if ((cstate->cs_ntuples)++ == 0)
SetLatch(&sqsync->sqs_consumer_sync[consumerIdx].cs_latch);
}
+ else
+ elog(DEBUG2, "SQueue %s, consumer is not active, no need to supply data",
+ squeue->sq_key);
/* clean up */
if (free_datarow)
@@ -861,8 +919,17 @@ SharedQueueRead(SharedQueue squeue, int consumerIdx,
Assert(cstate->cs_status != CONSUMER_DONE);
while (cstate->cs_ntuples <= 0)
{
+ elog(DEBUG3, "SQueue %s, consumer node %d, pid %d, status %d - "
+ "no tuples in the queue", squeue->sq_key,
+ cstate->cs_node, cstate->cs_pid, cstate->cs_status);
+
if (cstate->cs_status == CONSUMER_EOF)
{
+ elog(DEBUG1, "SQueue %s, consumer node %d, pid %d, status %d - "
+ "EOF marked. Informing produer by setting CONSUMER_DONE",
+ squeue->sq_key,
+ cstate->cs_node, cstate->cs_pid, cstate->cs_status);
+
/* Inform producer the consumer have done the job */
cstate->cs_status = CONSUMER_DONE;
/* no need to receive notifications */
@@ -875,11 +942,14 @@ SharedQueueRead(SharedQueue squeue, int consumerIdx,
* are finishing
*/
SetLatch(&sqsync->sqs_producer_latch);
- elog(DEBUG1, "EOF reached while reading from squeue, exiting");
return true;
}
else if (cstate->cs_status == CONSUMER_ERROR)
{
+ elog(DEBUG1, "SQueue %s, consumer node %d, pid %d, status %d - "
+ "CONSUMER_ERROR set",
+ squeue->sq_key,
+ cstate->cs_node, cstate->cs_pid, cstate->cs_status);
/*
* There was a producer error while waiting.
* Release all the locks and report problem to the caller.
@@ -893,14 +963,24 @@ SharedQueueRead(SharedQueue squeue, int consumerIdx,
*/
ereport(ERROR,
(errcode(ERRCODE_PRODUCER_ERROR),
- errmsg("Failed to read from shared queue - producer failed and set status to %d",
- cstate->cs_status)));
+ errmsg("Failed to read from SQueue %s, "
+ "consumer (node %d, pid %d, status %d) - "
+ "CONSUMER_ERROR set",
+ squeue->sq_key,
+ cstate->cs_node, cstate->cs_pid, cstate->cs_status)));
}
if (canwait)
{
/* Prepare waiting on empty buffer */
ResetLatch(&sqsync->sqs_consumer_sync[consumerIdx].cs_latch);
LWLockRelease(sqsync->sqs_consumer_sync[consumerIdx].cs_lwlock);
+
+ elog(DEBUG3, "SQueue %s, consumer (node %d, pid %d, status %d) - "
+ "no queued tuples to read, waiting "
+ "for producer to produce more data",
+ squeue->sq_key,
+ cstate->cs_node, cstate->cs_pid, cstate->cs_status);
+
/* Wait for notification about available info */
WaitLatch(&sqsync->sqs_consumer_sync[consumerIdx].cs_latch, WL_LATCH_SET | WL_POSTMASTER_DEATH, -1);
/* got the notification, restore lock and try again */
@@ -909,10 +989,22 @@ SharedQueueRead(SharedQueue squeue, int consumerIdx,
else
{
LWLockRelease(sqsync->sqs_consumer_sync[consumerIdx].cs_lwlock);
+
+ elog(DEBUG3, "SQueue %s, consumer (node %d, pid %d, status %d) - "
+ "no queued tuples to read, caller can't wait ",
+ squeue->sq_key,
+ cstate->cs_node, cstate->cs_pid, cstate->cs_status);
ExecClearTuple(slot);
return false;
}
}
+
+ elog(DEBUG3, "SQueue %s, consumer (node %d, pid %d, status %d) - "
+ "%d queued tuples to read",
+ squeue->sq_key,
+ cstate->cs_node, cstate->cs_pid, cstate->cs_status,
+ cstate->cs_ntuples);
+
/* have at least one row, read it in and store to slot */
QUEUE_READ(cstate, sizeof(int), (char *) (&datalen));
datarow = (RemoteDataRow) palloc(sizeof(RemoteDataRowData) + datalen);
@@ -959,6 +1051,10 @@ SharedQueueReset(SharedQueue squeue, int consumerIdx)
{
int i;
+ elog(DEBUG1, "SQueue %s, requested to reset producer node %d, pid %d - "
+ "Now also resetting all consumers",
+ squeue->sq_key, squeue->sq_nodeid, squeue->sq_pid);
+
/* check queue states */
for (i = 0; i < squeue->sq_nconsumers; i++)
{
@@ -975,7 +1071,11 @@ SharedQueueReset(SharedQueue squeue, int consumerIdx)
if (cstate->cs_status != CONSUMER_EOF &&
cstate->cs_status != CONSUMER_DONE)
{
- elog(DEBUG1, "Consumer %d of producer %s is cancelled", i, squeue->sq_key);
+ elog(DEBUG1, "SQueue %s, reset consumer at %d, "
+ "consumer node %d, pid %d, status %d - marking CONSUMER_ERROR",
+ squeue->sq_key, i, cstate->cs_node, cstate->cs_pid,
+ cstate->cs_status);
+
cstate->cs_status = CONSUMER_ERROR;
/* discard tuples which may already be in the queue */
cstate->cs_ntuples = 0;
@@ -987,16 +1087,26 @@ SharedQueueReset(SharedQueue squeue, int consumerIdx)
}
LWLockRelease(sqsync->sqs_consumer_sync[i].cs_lwlock);
}
- elog(DEBUG1, "Reset producer %s", squeue->sq_key);
}
else
{
ConsState *cstate = &(squeue->sq_consumers[consumerIdx]);
+
+ elog(DEBUG1, "SQueue %s, requested to reset consumer at %d, "
+ "consumer node %d, pid %d, status %d",
+ squeue->sq_key, consumerIdx, cstate->cs_node, cstate->cs_pid,
+ cstate->cs_status);
+
LWLockAcquire(sqsync->sqs_consumer_sync[consumerIdx].cs_lwlock,
LW_EXCLUSIVE);
if (cstate->cs_status != CONSUMER_DONE)
{
+ elog(DEBUG1, "SQueue %s, consumer at %d, "
+ "consumer node %d, pid %d, status %d - marking CONSUMER_DONE",
+ squeue->sq_key, consumerIdx, cstate->cs_node, cstate->cs_pid,
+ cstate->cs_status);
+
/* Inform producer the consumer have done the job */
cstate->cs_status = CONSUMER_DONE;
/*
@@ -1010,7 +1120,6 @@ SharedQueueReset(SharedQueue squeue, int consumerIdx)
* are finishing
*/
SetLatch(&sqsync->sqs_producer_latch);
- elog(DEBUG1, "Reset consumer %d of %s", consumerIdx, squeue->sq_key);
}
LWLockRelease(sqsync->sqs_consumer_sync[consumerIdx].cs_lwlock);
@@ -1030,6 +1139,9 @@ SharedQueueResetNotConnected(SharedQueue squeue)
int result = 0;
int i;
+ elog(DEBUG1, "SQueue %s, resetting all unconnected consumers",
+ squeue->sq_key);
+
/* check queue states */
for (i = 0; i < squeue->sq_nconsumers; i++)
{
@@ -1041,7 +1153,9 @@ SharedQueueResetNotConnected(SharedQueue squeue)
cstate->cs_status != CONSUMER_DONE)
{
result++;
- elog(DEBUG1, "Consumer %d of producer %s is cancelled", i, squeue->sq_key);
+ elog(DEBUG1, "SQueue %s, consumer at %d, consumer node %d, pid %d, "
+ "status %d is cancelled - marking CONSUMER_ERROR", squeue->sq_key, i,
+ cstate->cs_node, cstate->cs_pid, cstate->cs_status);
cstate->cs_status = CONSUMER_ERROR;
/* discard tuples which may already be in the queue */
cstate->cs_ntuples = 0;
@@ -1053,7 +1167,6 @@ SharedQueueResetNotConnected(SharedQueue squeue)
}
LWLockRelease(sqsync->sqs_consumer_sync[i].cs_lwlock);
}
- elog(DEBUG1, "Reset producer %s", squeue->sq_key);
}
@@ -1121,6 +1234,10 @@ SharedQueueFinish(SharedQueue squeue, TupleDesc tupDesc,
int i;
int nstores = 0;
+ elog(DEBUG1, "SQueue %s, finishing the SQueue - producer node %d, "
+ "pid %d, nconsumers %d", squeue->sq_key, squeue->sq_nodeid,
+ squeue->sq_pid, squeue->sq_nconsumers);
+
for (i = 0; i < squeue->sq_nconsumers; i++)
{
ConsState *cstate = &squeue->sq_consumers[i];
@@ -1130,6 +1247,9 @@ SharedQueueFinish(SharedQueue squeue, TupleDesc tupDesc,
elog(DEBUG1, "Finishing %s node %d, %ld writes and %ld reads so far, %ld buffer writes, %ld buffer reads, %ld tuples returned to buffer",
squeue->sq_key, cstate->cs_node, cstate->stat_writes, cstate->stat_reads, cstate->stat_buff_writes, cstate->stat_buff_reads, cstate->stat_buff_returns);
#endif
+ elog(DEBUG1, "SQueue %s finishing, consumer at %d, consumer node %d, pid %d, "
+ "status %d", squeue->sq_key, i,
+ cstate->cs_node, cstate->cs_pid, cstate->cs_status);
/*
* if the tuplestore has data and consumer queue has space for some
* try to push rows to the queue. We do not want to do that often
@@ -1206,8 +1326,10 @@ SharedQueueUnBind(SharedQueue squeue, bool failed)
int wait_result = 0;
int i = 0;
int consumer_running = 0;
- char *pcursor = NULL;
+ elog(DEBUG1, "SQueue %s, unbinding the SQueue (failed: %c) - producer node %d, "
+ "pid %d, nconsumers %d", squeue->sq_key, failed ? 'T' : 'F',
+ squeue->sq_nodeid, squeue->sq_pid, squeue->sq_nconsumers);
CHECK:
@@ -1223,11 +1345,25 @@ CHECK:
{
ConsState *cstate = &squeue->sq_consumers[i];
LWLockAcquire(sqsync->sqs_consumer_sync[i].cs_lwlock, LW_EXCLUSIVE);
+
+ elog(DEBUG1, "SQueue %s unbinding, check consumer at %d, consumer node %d, pid %d, "
+ "status %d", squeue->sq_key, i,
+ cstate->cs_node, cstate->cs_pid, cstate->cs_status);
+
/* is consumer working yet ? */
if (cstate->cs_status == CONSUMER_ACTIVE && failed)
+ {
+ elog(DEBUG1, "SQueue %s, consumer status CONSUMER_ACTIVE, but "
+ "the operation has failed - marking CONSUMER_ERROR",
+ squeue->sq_key);
+
cstate->cs_status = CONSUMER_ERROR;
+ }
+
if (cstate->cs_status != CONSUMER_DONE)
{
+ elog(DEBUG1, "SQueue %s, consumer not yet done, wake it up and "
+ "wait for it to finish reading", squeue->sq_key);
c_count++;
/* Wake up consumer if it is sleeping */
SetLatch(&sqsync->sqs_consumer_sync[i].cs_latch);
@@ -1242,20 +1378,25 @@ CHECK:
}
if (c_count == 0)
break;
- elog(DEBUG1, "Wait while %d squeue readers finish, %d squeue readers "
- "not yet bound", c_count, unbound_count);
+ elog(DEBUG1, "SQueue %s, wait while %d consumers finish, %d consumers"
+ "not yet bound", squeue->sq_key, c_count, unbound_count);
/* wait for a notification */
wait_result = WaitLatch(&sqsync->sqs_producer_latch,
WL_LATCH_SET | WL_POSTMASTER_DEATH | WL_TIMEOUT,
10000L);
if (wait_result & WL_TIMEOUT)
+ {
+ elog(WARNING, "SQueue %s, timeout while waiting for Consumers "
+ "finishing", squeue->sq_key);
break;
+ }
/* got notification, continue loop */
}
#ifdef SQUEUE_STAT
elog(DEBUG1, "Producer %s is done, there were %ld pauses", squeue->sq_key, squeue->stat_paused);
#endif
- elog(DEBUG1, "Producer %s is done", squeue->sq_key);
+ elog(DEBUG1, "SQueue %s, producer node %d, pid %d - unbound successfully",
+ squeue->sq_key, squeue->sq_nodeid, squeue->sq_pid);
LWLockAcquire(SQueuesLock, LW_EXCLUSIVE);
@@ -1274,6 +1415,9 @@ CHECK:
/* found a consumer running */
if (CONSUMER_ACTIVE == cstate->cs_status && cstate->cs_pid != 0)
{
+ elog(DEBUG1, "SQueue %s, consumer node %d, pid %d, status %d, "
+ "started running after we finished unbind", squeue->sq_key,
+ cstate->cs_node, cstate->cs_pid, cstate->cs_status);
consumer_running++;
}
@@ -1282,7 +1426,8 @@ CHECK:
if (consumer_running)
{
- elog(DEBUG1, "Producer %s have %d consumers still running, recheck now", squeue->sq_key, consumer_running);
+ elog(DEBUG1, "SQueue %s have %d consumers started running after we "
+ "unbound, recheck now", squeue->sq_key, consumer_running);
LWLockRelease(SQueuesLock);
goto CHECK;
}
@@ -1297,9 +1442,6 @@ CHECK:
elog(PANIC, "Shared queue data corruption");
LWLockRelease(SQueuesLock);
- elog(DEBUG1, "Finalized squeue");
- if (wait_result & WL_TIMEOUT)
- elog(WARNING, "Timeout while waiting for Consumers finishing");
}
@@ -1315,20 +1457,20 @@ SharedQueueRelease(const char *sqname)
bool found;
volatile SharedQueue sq;
- elog(DEBUG1, "Shared Queue release: %s", sqname);
-
LWLockAcquire(SQueuesLock, LW_EXCLUSIVE);
sq = (SharedQueue) hash_search(SharedQueues, sqname, HASH_FIND, &found);
if (found)
{
volatile SQueueSync *sqsync = sq->sq_sync;
- int myid; /* Node Id of the parent data node */
int i;
char ntype = PGXC_NODE_DATANODE;
Assert(sqsync && sqsync->queue == sq);
+ elog(DEBUG1, "SQueue %s producer node %d, pid %d - requested to release",
+ sqname, sq->sq_nodeid, sq->sq_pid);
+
/*
* Case if the shared queue was never bound.
* Just remove it from the hash table.
@@ -1339,7 +1481,7 @@ SharedQueueRelease(const char *sqname)
sqsync->queue = NULL;
if (hash_search(SharedQueues, sqname, HASH_REMOVE, NULL) != sq)
elog(PANIC, "Shared queue data corruption");
- elog(DEBUG1, "Finalized squeue %s", sqname);
+ elog(DEBUG1, "SQueue %s, producer not bound - released SQueue", sqname);
LWLockRelease(SQueuesLock);
return;
}
@@ -1350,7 +1492,8 @@ SharedQueueRelease(const char *sqname)
*/
if (sq->sq_nodeid != PGXC_PARENT_NODE_ID)
{
- elog(DEBUG1, "Looking for consumer %d in %s", myid, sqname);
+ elog(DEBUG1, "SQueue %s, we are consumer from node %d", sqname,
+ PGXC_PARENT_NODE_ID);
/* find specified node in the consumer lists */
for (i = 0; i < sq->sq_nconsumers; i++)
{
@@ -1359,6 +1502,9 @@ SharedQueueRelease(const char *sqname)
{
LWLockAcquire(sqsync->sqs_consumer_sync[i].cs_lwlock,
LW_EXCLUSIVE);
+ elog(DEBUG1, "SQueue %s, consumer node %d, pid %d, "
+ "status %d", sq->sq_key, cstate->cs_node,
+ cstate->cs_pid, cstate->cs_status);
if (cstate->cs_status != CONSUMER_DONE)
{
/* Inform producer the consumer have done the job */
@@ -1374,7 +1520,10 @@ SharedQueueRelease(const char *sqname)
* consumers are finishing
*/
SetLatch(&sqsync->sqs_producer_latch);
- elog(DEBUG1, "Release consumer %d of %s", i, sqname);
+ elog(DEBUG1, "SQueue %s, release consumer at %d, node "
+ "%d, pid %d, status %d ", sqname, i,
+ cstate->cs_node, cstate->cs_pid,
+ cstate->cs_status);
}
LWLockRelease(sqsync->sqs_consumer_sync[i].cs_lwlock);
/* exit */
@@ -1382,6 +1531,9 @@ SharedQueueRelease(const char *sqname)
return;
}
}
+
+ elog(DEBUG1, "SQueue %s, consumer from node %d never bound",
+ sqname, PGXC_PARENT_NODE_ID);
/*
* The consumer was never bound. Find empty consumer slot and
* register node here to let producer know that the node will never
@@ -1397,7 +1549,8 @@ SharedQueueRelease(const char *sqname)
/* Inform producer the consumer have done the job */
cstate->cs_status = CONSUMER_DONE;
SetLatch(&sqsync->sqs_producer_latch);
- elog(DEBUG1, "Release not bound consumer %d of %s", i, sqname);
+ elog(DEBUG1, "SQueue %s, consumer at %d marking as "
+ "CONSUMER_DONE", sqname, i);
LWLockRelease(sqsync->sqs_consumer_sync[i].cs_lwlock);
}
}
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 29cc82d233..99f075c839 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -4775,6 +4775,10 @@ PostgresMain(int argc, char *argv[],
close_target = pq_getmsgstring(&input_message);
pq_getmsgend(&input_message);
+ elog(DEBUG3, "Received a 'C' (close) command for %s, type %c",
+ close_target[0] ? close_target : "unnamed_stmt",
+ close_type);
+
switch (close_type)
{
case 'S':
diff --git a/src/backend/utils/mmgr/portalmem.c b/src/backend/utils/mmgr/portalmem.c
index 2880b2c728..621cbcbc52 100644
--- a/src/backend/utils/mmgr/portalmem.c
+++ b/src/backend/utils/mmgr/portalmem.c
@@ -144,6 +144,10 @@ GetPortalByName(const char *name)
{
Portal portal;
+#ifdef XCP
+ elog(DEBUG3, "Looking up portal %s in the hash table", name);
+#endif
+
if (PointerIsValid(name))
PortalHashTableLookup(name, portal);
else
@@ -224,6 +228,11 @@ CreatePortal(const char *name, bool allowDup, bool dupSilent)
(errcode(ERRCODE_DUPLICATE_CURSOR),
errmsg("closing existing cursor \"%s\"",
name)));
+#ifdef XCP
+ elog(DEBUG3, "cursor \"%s\" already exists, closing existing cursor",
+ name);
+#endif
+
PortalDrop(portal, false);
}
@@ -256,6 +265,8 @@ CreatePortal(const char *name, bool allowDup, bool dupSilent)
PortalHashTableInsert(portal, name);
#ifdef PGXC
+ elog(DEBUG3, "Created portal %s and inserted an entry in the has table");
+
if (PGXCNodeIdentifier == 0)
{
char *node_name;
@@ -572,6 +583,11 @@ PortalDrop(Portal portal, bool isTopCommit)
*/
PortalHashTableDelete(portal);
+#ifdef XCP
+ elog(DEBUG3, "Dropped portal %s (prepared statement %s) and removed entry from the hash table",
+ portal->name, portal->prepStmtName ? portal->prepStmtName : "(null)");
+#endif
+
/* drop cached plan reference, if any */
PortalReleaseCachedPlan(portal);
@@ -676,6 +692,10 @@ PortalHashTableDeleteAll(void)
if (PortalHashTable == NULL)
return;
+#ifdef XCP
+ elog(DEBUG3, "Deleting all entries from the PortalHashTable");
+#endif
+
hash_seq_init(&status, PortalHashTable);
while ((hentry = hash_seq_search(&status)) != NULL)
{