summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/access/heap/pruneheap.c3
-rw-r--r--src/backend/access/transam/gtm.c29
-rw-r--r--src/backend/bootstrap/bootstrap.c8
-rw-r--r--src/backend/postmaster/Makefile4
-rw-r--r--src/backend/postmaster/clustermon.c407
-rw-r--r--src/backend/postmaster/postmaster.c67
-rw-r--r--src/backend/storage/ipc/ipci.c2
-rw-r--r--src/backend/storage/ipc/procarray.c243
-rw-r--r--src/backend/utils/init/postinit.c2
-rw-r--r--src/gtm/client/fe-protocol.c37
-rw-r--r--src/gtm/client/gtm_client.c113
-rw-r--r--src/gtm/common/Makefile2
-rw-r--r--src/gtm/common/gtm_gxid.c67
-rw-r--r--src/gtm/common/gtm_serialize.c35
-rw-r--r--src/gtm/common/gtm_serialize_debug.c1
-rw-r--r--src/gtm/common/gtm_time.c96
-rw-r--r--src/gtm/common/gtm_utils.c2
-rw-r--r--src/gtm/main/Makefile2
-rw-r--r--src/gtm/main/gtm_snap.c10
-rw-r--r--src/gtm/main/gtm_standby.c9
-rw-r--r--src/gtm/main/gtm_time.c41
-rw-r--r--src/gtm/main/gtm_txn.c124
-rw-r--r--src/gtm/main/main.c14
-rw-r--r--src/gtm/proxy/Makefile2
-rw-r--r--src/gtm/proxy/proxy_main.c250
-rw-r--r--src/gtm/recovery/register_common.c391
-rw-r--r--src/gtm/recovery/register_gtm.c14
-rw-r--r--src/gtm/test/test_serialize.c11
-rw-r--r--src/include/access/gtm.h4
-rw-r--r--src/include/access/xact.h1
-rw-r--r--src/include/gtm/gtm_c.h6
-rw-r--r--src/include/gtm/gtm_client.h31
-rw-r--r--src/include/gtm/gtm_gxid.h47
-rw-r--r--src/include/gtm/gtm_msg.h3
-rw-r--r--src/include/gtm/gtm_time.h5
-rw-r--r--src/include/gtm/gtm_txn.h45
-rw-r--r--src/include/gtm/register.h14
-rw-r--r--src/include/miscadmin.h2
-rw-r--r--src/include/pgxc/pgxc.h2
-rw-r--r--src/include/postmaster/clustermon.h42
-rw-r--r--src/include/storage/procarray.h4
41 files changed, 1608 insertions, 584 deletions
diff --git a/src/backend/access/heap/pruneheap.c b/src/backend/access/heap/pruneheap.c
index 563e5c353c..0d605f5728 100644
--- a/src/backend/access/heap/pruneheap.c
+++ b/src/backend/access/heap/pruneheap.c
@@ -99,7 +99,8 @@ heap_page_prune_opt(Relation relation, Buffer buffer)
else
OldestXmin = RecentGlobalDataXmin;
- Assert(TransactionIdIsValid(OldestXmin));
+ if (!TransactionIdIsValid(OldestXmin))
+ return;
/*
* Let's see if we really need pruning.
diff --git a/src/backend/access/transam/gtm.c b/src/backend/access/transam/gtm.c
index 91d5fe7182..50b6646c72 100644
--- a/src/backend/access/transam/gtm.c
+++ b/src/backend/access/transam/gtm.c
@@ -21,6 +21,7 @@
#include "pgxc/pgxc.h"
#include "gtm/gtm_c.h"
#include "postmaster/autovacuum.h"
+#include "postmaster/clustermon.h"
#include "storage/backendid.h"
#include "tcop/tcopprot.h"
#include "utils/guc.h"
@@ -98,6 +99,8 @@ InitGTM(void)
elog(DEBUG1, "Autovacuum worker: connection established to GTM with string %s", conn_str);
else if (IsAutoVacuumLauncherProcess())
elog(DEBUG1, "Autovacuum launcher: connection established to GTM with string %s", conn_str);
+ else if (IsClusterMonitorProcess())
+ elog(DEBUG1, "Cluster monitor: connection established to GTM with string %s", conn_str);
else
elog(DEBUG1, "Postmaster child: connection established to GTM with string %s", conn_str);
}
@@ -133,6 +136,8 @@ CloseGTM(void)
elog(DEBUG1, "Autovacuum worker: connection to GTM closed");
else if (IsAutoVacuumLauncherProcess())
elog(DEBUG1, "Autovacuum launcher: connection to GTM closed");
+ else if (IsClusterMonitorProcess())
+ elog(DEBUG1, "Cluster monitor: connection to GTM closed");
else
elog(DEBUG1, "Postmaster child: connection to GTM closed");
}
@@ -607,7 +612,7 @@ RenameSequenceGTM(char *seqname, const char *newseqname)
* Connection for registering is just used once then closed
*/
int
-RegisterGTM(GTM_PGXCNodeType type, GTM_PGXCNodePort port, char *datafolder)
+RegisterGTM(GTM_PGXCNodeType type, GlobalTransactionId *xmin)
{
int ret;
@@ -616,7 +621,7 @@ RegisterGTM(GTM_PGXCNodeType type, GTM_PGXCNodePort port, char *datafolder)
if (!conn)
return EOF;
- ret = node_register(conn, type, port, PGXCNodeName, datafolder);
+ ret = node_register(conn, type, 0, PGXCNodeName, "", xmin);
/* If something went wrong, retry once */
if (ret < 0)
@@ -624,7 +629,8 @@ RegisterGTM(GTM_PGXCNodeType type, GTM_PGXCNodePort port, char *datafolder)
CloseGTM();
InitGTM();
if (conn)
- ret = node_register(conn, type, port, PGXCNodeName, datafolder);
+ ret = node_register(conn, type, 0, PGXCNodeName, "",
+ xmin);
}
return ret;
@@ -681,3 +687,20 @@ ReportBarrierGTM(char *barrier_id)
return(report_barrier(conn, barrier_id));
}
+int
+ReportGlobalXmin(GlobalTransactionId *gxid, GlobalTransactionId *global_xmin,
+ bool isIdle)
+{
+ int errcode = GTM_ERRCODE_UNKNOWN;
+
+ CheckConnection();
+ if (!conn)
+ return EOF;
+
+ if (report_global_xmin(conn, PGXCNodeName,
+ IS_PGXC_COORDINATOR ? GTM_NODE_COORDINATOR : GTM_NODE_DATANODE,
+ gxid, global_xmin, isIdle, &errcode))
+ return errcode;
+ else
+ return 0;
+}
diff --git a/src/backend/bootstrap/bootstrap.c b/src/backend/bootstrap/bootstrap.c
index e0253a6c00..d2248df540 100644
--- a/src/backend/bootstrap/bootstrap.c
+++ b/src/backend/bootstrap/bootstrap.c
@@ -318,6 +318,9 @@ AuxiliaryProcessMain(int argc, char *argv[])
case PoolerProcess:
statmsg = "pooler process";
break;
+ case ClusterMonitorProcess:
+ statmsg = "cluster monitor process";
+ break;
#endif
case StartupProcess:
statmsg = "startup process";
@@ -422,6 +425,11 @@ AuxiliaryProcessMain(int argc, char *argv[])
/* don't set signals, pool manager has its own agenda */
PoolManagerInit();
proc_exit(1); /* should never return */
+
+ case ClusterMonitorProcess:
+ /* don't set signals, cluster monitor has its own agenda */
+ ClusterMonitorInit();
+ proc_exit(1); /* should never return */
#endif
case CheckerProcess:
diff --git a/src/backend/postmaster/Makefile b/src/backend/postmaster/Makefile
index 71c23211b2..80f205c22c 100644
--- a/src/backend/postmaster/Makefile
+++ b/src/backend/postmaster/Makefile
@@ -12,7 +12,7 @@ subdir = src/backend/postmaster
top_builddir = ../../..
include $(top_builddir)/src/Makefile.global
-OBJS = autovacuum.o bgworker.o bgwriter.o checkpointer.o fork_process.o \
- pgarch.o pgstat.o postmaster.o startup.o syslogger.o walwriter.o
+OBJS = autovacuum.o bgworker.o bgwriter.o checkpointer.o clustermon.o \
+ fork_process.o pgarch.o pgstat.o postmaster.o startup.o syslogger.o walwriter.o
include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/postmaster/clustermon.c b/src/backend/postmaster/clustermon.c
new file mode 100644
index 0000000000..75a68bd73c
--- /dev/null
+++ b/src/backend/postmaster/clustermon.c
@@ -0,0 +1,407 @@
+/*-------------------------------------------------------------------------
+ *
+ * clustermon.c
+ *
+ * Postgres-XL Cluster Monitor
+ *
+ * This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/.
+ *
+ * Portions Copyright (c) 2015, 2ndQuadrant Ltd
+ * Portions Copyright (c) 2012-2014, TransLattice, Inc.
+ * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ * src/backend/postmaster/clustermon.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include <signal.h>
+#include <sys/types.h>
+#include <sys/time.h>
+#include <unistd.h>
+
+#include "access/transam.h"
+#include "access/xact.h"
+#include "gtm/gtm_c.h"
+#include "gtm/gtm_gxid.h"
+#include "libpq/pqsignal.h"
+#include "miscadmin.h"
+#include "pgxc/pgxc.h"
+#include "postmaster/clustermon.h"
+#include "postmaster/fork_process.h"
+#include "postmaster/postmaster.h"
+#include "storage/proc.h"
+#include "storage/spin.h"
+#include "tcop/tcopprot.h"
+#include "utils/memutils.h"
+#include "utils/ps_status.h"
+#include "utils/timeout.h"
+#include "utils/timestamp.h"
+
+/* Flags to tell if we are in a clustermon process */
+static bool am_clustermon = false;
+
+/* Flags set by signal handlers */
+static volatile sig_atomic_t got_SIGHUP = false;
+static volatile sig_atomic_t got_SIGUSR2 = false;
+static volatile sig_atomic_t got_SIGTERM = false;
+
+/* Memory context for long-lived data */
+static MemoryContext ClusterMonitorMemCxt;
+static ClusterMonitorCtlData *ClusterMonitorCtl = NULL;
+
+static void cm_sighup_handler(SIGNAL_ARGS);
+static void cm_sigterm_handler(SIGNAL_ARGS);
+static void ClusterMonitorSetReportedGlobalXmin(GlobalTransactionId xmin);
+static GlobalTransactionId ClusterMonitorGetReportedGlobalXmin(void);
+
+/* PID of clustser monitoring process */
+int ClusterMonitorPid = 0;
+
+#define CLUSTER_MONITOR_NAPTIME 5
+
+/*
+ * Main loop for the cluster monitor process.
+ */
+int
+ClusterMonitorInit(void)
+{
+ sigjmp_buf local_sigjmp_buf;
+ GTM_PGXCNodeType nodetype = IS_PGXC_DATANODE ?
+ GTM_NODE_DATANODE :
+ GTM_NODE_COORDINATOR;
+ GlobalTransactionId oldestXmin;
+ GlobalTransactionId newOldestXmin;
+ GlobalTransactionId reportedXmin;
+ GlobalTransactionId lastGlobalXmin;
+ int status;
+
+ am_clustermon = true;
+
+ /* Identify myself via ps */
+ init_ps_display("cluster monitor process", "", "", "");
+
+ ereport(LOG,
+ (errmsg("cluster monitor started")));
+
+ if (PostAuthDelay)
+ pg_usleep(PostAuthDelay * 1000000L);
+
+ /*
+ * Set up signal handlers. We operate on databases much like a regular
+ * backend, so we use the same signal handling. See equivalent code in
+ * tcop/postgres.c.
+ */
+ pqsignal(SIGHUP, cm_sighup_handler);
+ pqsignal(SIGINT, StatementCancelHandler);
+ pqsignal(SIGTERM, cm_sigterm_handler);
+
+ pqsignal(SIGQUIT, quickdie);
+ InitializeTimeouts(); /* establishes SIGALRM handler */
+
+ pqsignal(SIGPIPE, SIG_IGN);
+ pqsignal(SIGUSR1, procsignal_sigusr1_handler);
+ pqsignal(SIGFPE, FloatExceptionHandler);
+ pqsignal(SIGCHLD, SIG_DFL);
+
+ /*
+ * Create a memory context that we will do all our work in. We do this so
+ * that we can reset the context during error recovery and thereby avoid
+ * possible memory leaks.
+ */
+ ClusterMonitorMemCxt = AllocSetContextCreate(TopMemoryContext,
+ "Cluster Monitor",
+ ALLOCSET_DEFAULT_MINSIZE,
+ ALLOCSET_DEFAULT_INITSIZE,
+ ALLOCSET_DEFAULT_MAXSIZE);
+ MemoryContextSwitchTo(ClusterMonitorMemCxt);
+
+ InitPostgres(NULL, InvalidOid, NULL, InvalidOid, NULL);
+
+ SetProcessingMode(NormalProcessing);
+
+ /*
+ * Start a dummy transaction so that we start computing OldestXmin
+ * with the current latestCompletedXid, especially when this server is just
+ * started and joining the cluster
+ */
+ StartTransactionCommand();
+ (void) GetTopTransactionId();
+ CommitTransactionCommand();
+
+ /*
+ * Register this node with the GTM
+ */
+ oldestXmin = InvalidGlobalTransactionId;
+ if (RegisterGTM(nodetype, &oldestXmin) < 0)
+ {
+ UnregisterGTM(nodetype);
+ oldestXmin = InvalidGlobalTransactionId;
+ if (RegisterGTM(nodetype, &oldestXmin) < 0)
+ {
+ ereport(LOG,
+ (errcode(ERRCODE_IO_ERROR),
+ errmsg("Can not register node on GTM")));
+ }
+ }
+
+ /*
+ * If the registration is successful, GTM would send us back current
+ * GlobalXmin. Initialise our local state to the same value
+ */
+ ClusterMonitorSetReportedGlobalXmin(oldestXmin);
+
+ /*
+ * If an exception is encountered, processing resumes here.
+ *
+ * This code is a stripped down version of PostgresMain error recovery.
+ */
+ if (sigsetjmp(local_sigjmp_buf, 1) != 0)
+ {
+ /* since not using PG_TRY, must reset error stack by hand */
+ error_context_stack = NULL;
+
+ /* Prevents interrupts while cleaning up */
+ HOLD_INTERRUPTS();
+
+ /* Forget any pending QueryCancel or timeout request */
+ disable_all_timeouts(false);
+ QueryCancelPending = false; /* second to avoid race condition */
+
+ /* Report the error to the server log */
+ EmitErrorReport();
+
+ /*
+ * Now return to normal top-level context and clear ErrorContext for
+ * next time.
+ */
+ MemoryContextSwitchTo(ClusterMonitorMemCxt);
+ FlushErrorState();
+
+ /* Flush any leaked data in the top-level context */
+ MemoryContextResetAndDeleteChildren(ClusterMonitorMemCxt);
+
+ /* Now we can allow interrupts again */
+ RESUME_INTERRUPTS();
+
+ /* if in shutdown mode, no need for anything further; just go away */
+ if (got_SIGTERM)
+ goto shutdown;
+
+ /*
+ * Sleep at least 1 second after any error. We don't want to be
+ * filling the error logs as fast as we can.
+ */
+ pg_usleep(1000000L);
+ }
+
+ /* We can now handle ereport(ERROR) */
+ PG_exception_stack = &local_sigjmp_buf;
+
+ /* must unblock signals before calling rebuild_database_list */
+ PG_SETMASK(&UnBlockSig);
+
+ /*
+ * Force statement_timeout and lock_timeout to zero to avoid letting these
+ * settings prevent regular maintenance from being executed.
+ */
+ SetConfigOption("statement_timeout", "0", PGC_SUSET, PGC_S_OVERRIDE);
+ SetConfigOption("lock_timeout", "0", PGC_SUSET, PGC_S_OVERRIDE);
+
+ /* loop until shutdown request */
+ while (!got_SIGTERM)
+ {
+ struct timeval nap;
+ int rc;
+ bool isIdle;
+
+ /*
+ * Compute RecentGlobalXmin, report it to the GTM and sleep for the set
+ * interval. Keep doing this forever
+ */
+ isIdle = false;
+ reportedXmin = ClusterMonitorGetReportedGlobalXmin();
+ lastGlobalXmin = ClusterMonitorGetGlobalXmin();
+ oldestXmin = GetOldestXminInternal(NULL, false, true, &isIdle,
+ lastGlobalXmin, reportedXmin);
+
+ if (GlobalTransactionIdPrecedes(oldestXmin, reportedXmin))
+ oldestXmin = reportedXmin;
+
+ if (GlobalTransactionIdPrecedes(oldestXmin, lastGlobalXmin))
+ oldestXmin = lastGlobalXmin;
+
+ if ((status = ReportGlobalXmin(&oldestXmin, &newOldestXmin, isIdle)))
+ {
+ elog(DEBUG2, "Failed to report RecentGlobalXmin to GTM - %d:%d",
+ status, newOldestXmin);
+ if (status == GTM_ERRCODE_TOO_OLD_XMIN ||
+ status == GTM_ERRCODE_NODE_EXCLUDED)
+ elog(PANIC, "Global xmin computation mismatch");
+ }
+ else
+ {
+ ClusterMonitorSetReportedGlobalXmin(oldestXmin);
+ elog(DEBUG2, "Updating global_xmin to %d", newOldestXmin);
+ if (GlobalTransactionIdIsValid(newOldestXmin))
+ ClusterMonitorSetGlobalXmin(newOldestXmin);
+ }
+
+ /*
+ * Repeat at every 30 seconds
+ */
+ nap.tv_sec = CLUSTER_MONITOR_NAPTIME;
+ nap.tv_usec = 0;
+
+ /*
+ * Wait until naptime expires or we get some type of signal (all the
+ * signal handlers will wake us by calling SetLatch).
+ */
+ rc = WaitLatch(MyLatch,
+ WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
+ (nap.tv_sec * 1000L) + (nap.tv_usec / 1000L));
+
+ ResetLatch(MyLatch);
+
+ /* Process sinval catchup interrupts that happened while sleeping */
+ ProcessCatchupInterrupt();
+
+ /*
+ * Emergency bailout if postmaster has died. This is to avoid the
+ * necessity for manual cleanup of all postmaster children.
+ */
+ if (rc & WL_POSTMASTER_DEATH)
+ proc_exit(1);
+
+ /* the normal shutdown case */
+ if (got_SIGTERM)
+ break;
+
+ if (got_SIGHUP)
+ {
+ got_SIGHUP = false;
+ ProcessConfigFile(PGC_SIGHUP);
+ }
+ }
+
+ /* Normal exit from the cluster monitor is here */
+shutdown:
+ UnregisterGTM(nodetype);
+ ereport(LOG,
+ (errmsg("cluster monitor shutting down")));
+
+ proc_exit(0); /* done */
+}
+
+/* SIGHUP: set flag to re-read config file at next convenient time */
+static void
+cm_sighup_handler(SIGNAL_ARGS)
+{
+ int save_errno = errno;
+
+ got_SIGHUP = true;
+ SetLatch(MyLatch);
+
+ errno = save_errno;
+}
+
+/* SIGTERM: time to die */
+static void
+cm_sigterm_handler(SIGNAL_ARGS)
+{
+ int save_errno = errno;
+
+ got_SIGTERM = true;
+ SetLatch(MyLatch);
+
+ errno = save_errno;
+}
+
+
+/*
+ * IsClusterMonitor functions
+ * Return whether this is either a cluster monitor process or a worker
+ * process.
+ */
+bool
+IsClusterMonitorProcess(void)
+{
+ return am_clustermon;
+}
+
+/* Report shared-memory space needed by ClusterMonitor */
+Size
+ClusterMonitorShmemSize(void)
+{
+ return sizeof (ClusterMonitorCtlData);
+}
+
+void
+ClusterMonitorShmemInit(void)
+{
+ bool found;
+
+ ClusterMonitorCtl = (ClusterMonitorCtlData *)
+ ShmemInitStruct("Cluster Monitor Ctl", ClusterMonitorShmemSize(), &found);
+
+ if (!found)
+ {
+ /* First time through, so initialize */
+ MemSet(ClusterMonitorCtl, 0, ClusterMonitorShmemSize());
+ SpinLockInit(&ClusterMonitorCtl->mutex);
+ }
+}
+
+GlobalTransactionId
+ClusterMonitorGetGlobalXmin(void)
+{
+ GlobalTransactionId xmin;
+
+ SpinLockAcquire(&ClusterMonitorCtl->mutex);
+ xmin = ClusterMonitorCtl->gtm_recent_global_xmin;
+ SpinLockRelease(&ClusterMonitorCtl->mutex);
+
+ return xmin;
+}
+
+void
+ClusterMonitorSetGlobalXmin(GlobalTransactionId xmin)
+{
+ LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+ ProcArrayCheckXminConsistency(xmin);
+
+ SpinLockAcquire(&ClusterMonitorCtl->mutex);
+ ClusterMonitorCtl->gtm_recent_global_xmin = xmin;
+ SpinLockRelease(&ClusterMonitorCtl->mutex);
+
+ LWLockRelease(ProcArrayLock);
+}
+
+static void
+ClusterMonitorSetReportedGlobalXmin(GlobalTransactionId xmin)
+{
+ elog(DEBUG2, "ClusterMonitorSetReportedGlobalXmin - old %d, new %d",
+ ClusterMonitorCtl->reported_recent_global_xmin,
+ xmin);
+ SpinLockAcquire(&ClusterMonitorCtl->mutex);
+ ClusterMonitorCtl->reported_recent_global_xmin = xmin;
+ SpinLockRelease(&ClusterMonitorCtl->mutex);
+}
+
+static GlobalTransactionId
+ClusterMonitorGetReportedGlobalXmin(void)
+{
+ GlobalTransactionId reported_xmin;
+
+ SpinLockAcquire(&ClusterMonitorCtl->mutex);
+ reported_xmin = ClusterMonitorCtl->reported_recent_global_xmin;
+ SpinLockRelease(&ClusterMonitorCtl->mutex);
+
+ return reported_xmin;
+}
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index 58d9c9dfdc..86ebaac5f0 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -261,6 +261,9 @@ static pid_t StartupPID = 0,
#ifdef PGXC /* PGXC_COORD */
PgPoolerPID = 0,
#endif /* PGXC_COORD */
+#ifdef XCP
+ ClusterMonPID = 0,
+#endif
BgWriterPID = 0,
CheckpointerPID = 0,
WalWriterPID = 0,
@@ -581,6 +584,7 @@ Datum xc_lockForBackupKey1;
Datum xc_lockForBackupKey2;
#define StartPoolManager() StartChildProcess(PoolerProcess)
+#define StartClusterMonitor() StartChildProcess(ClusterMonitorProcess)
#endif
#define StartupDataBase() StartChildProcess(StartupProcess)
@@ -1359,6 +1363,7 @@ PostmasterMain(int argc, char *argv[])
MemoryContextSwitchTo(oldcontext);
#endif /* PGXC */
+
/* Some workers may be scheduled to start now */
maybe_start_bgworker();
@@ -1781,6 +1786,12 @@ ServerLoop(void)
PgPoolerPID = StartPoolManager();
#endif /* PGXC */
+#ifdef XCP
+ /* If we have lost the cluster monitor, try to start a new one */
+ if (ClusterMonPID == 0 && pmState == PM_RUN)
+ ClusterMonPID = StartClusterMonitor();
+#endif
+
/* If we have lost the archiver, try to start a new one. */
if (PgArchPID == 0 && PgArchStartupAllowed())
PgArchPID = pgarch_start();
@@ -2482,6 +2493,10 @@ SIGHUP_handler(SIGNAL_ARGS)
if (PgPoolerPID != 0)
signal_child(PgPoolerPID, SIGHUP);
#endif /* PGXC */
+#ifdef XCP
+ if (ClusterMonPID != 0)
+ signal_child(ClusterMonPID, SIGHUP);
+#endif
if (BgWriterPID != 0)
signal_child(BgWriterPID, SIGHUP);
if (CheckpointerPID != 0)
@@ -2571,7 +2586,8 @@ pmdie(SIGNAL_ARGS)
/* and the pool manager too */
if (PgPoolerPID != 0)
signal_child(PgPoolerPID, SIGTERM);
-
+ if (ClusterMonPID != 0)
+ signal_child(ClusterMonPID, SIGTERM);
#endif
/*
@@ -2619,6 +2635,9 @@ pmdie(SIGNAL_ARGS)
/* and the pool manager too */
if (PgPoolerPID != 0)
signal_child(PgPoolerPID, SIGTERM);
+ /* and the cluster monitor too */
+ if (ClusterMonPID != 0)
+ signal_child(ClusterMonPID, SIGTERM);
#endif /* XCP */
SignalUnconnectedWorkers(SIGTERM);
if (pmState == PM_RECOVERY)
@@ -2649,6 +2668,10 @@ pmdie(SIGNAL_ARGS)
/* and the walwriter too */
if (WalWriterPID != 0)
signal_child(WalWriterPID, SIGTERM);
+#ifdef XCP
+ if (ClusterMonPID != 0)
+ signal_child(ClusterMonPID, SIGTERM);
+#endif
pmState = PM_WAIT_BACKENDS;
}
@@ -2807,6 +2830,11 @@ reaper(SIGNAL_ARGS)
PgPoolerPID = StartPoolManager();
#endif /* PGXC */
+#ifdef XCP
+ if (ClusterMonPID == 0)
+ ClusterMonPID = StartClusterMonitor();
+#endif
+
/* workers may be scheduled to start now */
maybe_start_bgworker();
@@ -2991,6 +3019,16 @@ reaper(SIGNAL_ARGS)
}
#endif
+#ifdef XCP
+ if (pid == ClusterMonPID)
+ {
+ ClusterMonPID = 0;
+ if (!EXIT_STATUS_0(exitstatus))
+ HandleChildCrash(pid, exitstatus,
+ _("cluster monitor process"));
+ continue;
+ }
+#endif
/* Was it one of our background workers? */
if (CleanupBackgroundWorker(pid, exitstatus))
{
@@ -3426,6 +3464,19 @@ HandleChildCrash(int pid, int exitstatus, const char *procname)
}
#endif /* PGXC */
+#ifdef XCP
+ if (pid == ClusterMonPID)
+ ClusterMonPID = 0;
+ else if (ClusterMonPID != 0 && !FatalError)
+ {
+ ereport(DEBUG2,
+ (errmsg_internal("sending %s to process %d",
+ (SendStop ? "SIGSTOP" : "SIGQUIT"),
+ (int) ClusterMonPID)));
+ signal_child(ClusterMonPID, (SendStop ? SIGSTOP : SIGQUIT));
+ }
+#endif
+
/*
* Force a power-cycle of the pgarch process too. (This isn't absolutely
* necessary, but it seems like a good idea for robustness, and it
@@ -3611,6 +3662,9 @@ PostmasterStateMachine(void)
#ifdef PGXC
PgPoolerPID == 0 &&
#endif
+#ifdef XCP
+ ClusterMonPID == 0 &&
+#endif
WalReceiverPID == 0 &&
BgWriterPID == 0 &&
(CheckpointerPID == 0 ||
@@ -3711,6 +3765,9 @@ PostmasterStateMachine(void)
#ifdef PGXC
Assert(PgPoolerPID == 0);
#endif
+#ifdef XCP
+ Assert(ClusterMonPID == 0);
+#endif
Assert(StartupPID == 0);
Assert(WalReceiverPID == 0);
Assert(BgWriterPID == 0);
@@ -3923,6 +3980,10 @@ TerminateChildren(int signal)
if (PgPoolerPID != 0)
signal_child(PgPoolerPID, SIGQUIT);
#endif
+#ifdef XCP
+ if (ClusterMonPID != 0)
+ signal_child(ClusterMonPID, signal);
+#endif
if (BgWriterPID != 0)
signal_child(BgWriterPID, signal);
if (CheckpointerPID != 0)
@@ -5334,6 +5395,10 @@ StartChildProcess(AuxProcType type)
ereport(LOG,
(errmsg("could not fork pool manager process: %m")));
break;
+ case ClusterMonitorProcess:
+ ereport(LOG,
+ (errmsg("could not fork cluster monitor process: %m")));
+ break;
#endif
case StartupProcess:
ereport(LOG,
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index db5f4f605c..3895031901 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -153,6 +153,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
size = add_size(size, SharedQueueShmemSize());
if (IS_PGXC_COORDINATOR)
size = add_size(size, ClusterLockShmemSize());
+ size = add_size(size, ClusterMonitorShmemSize());
#endif
size = add_size(size, BTreeShmemSize());
size = add_size(size, SyncScanShmemSize());
@@ -274,6 +275,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
SharedQueuesInit();
if (IS_PGXC_COORDINATOR)
ClusterLockShmemInit();
+ ClusterMonitorShmemInit();
#endif
/*
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index e26ff6c102..48840c24bb 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -1220,10 +1220,39 @@ TransactionIdIsActive(TransactionId xid)
TransactionId
GetOldestXmin(Relation rel, bool ignoreVacuum)
{
+ return GetOldestXminInternal(rel, ignoreVacuum, false, NULL,
+ InvalidTransactionId, InvalidTransactionId);
+}
+
+/*
+ * This implements most of the logic that GetOldestXmin needs. In XL, we don't
+ * actually compute OldestXmin unless specifically told to do by computeLocal
+ * argument set to true which GetOldestXmin never done. So we just return the
+ * value from the shared memory. The OldestXmin itself is always computed by
+ * the Cluster Monitor process by sending local state information to the GTM,
+ * which then aggregates information from all the nodes and gives out final
+ * OldestXmin or GlobalXmin which is consistent across the entire cluster.
+ *
+ * In addition, Cluster Monitor also passes the last reported xmin (or the one
+ * sent back by GTM in case we were idle) and the last received GlobalXmin. We
+ * must ensure that we don't see an XID or xmin which is beyond these horizons.
+ * Otherwise it signals problems with the GlobalXmin calculation. This can
+ * happen because of network disconnects or extreme load on the machine
+ * (unlikely). In any case, we must restart ourselves to avoid any data
+ * consistency problem. A more careful approach could involve killing only
+ * those backends which are running with old xid or xmin. We can consider
+ * implementing it that way in future
+ */
+TransactionId
+GetOldestXminInternal(Relation rel, bool ignoreVacuum, bool computeLocal,
+ bool *isIdle, TransactionId lastGlobalXmin,
+ TransactionId lastReportedXmin)
+{
ProcArrayStruct *arrayP = procArray;
TransactionId result;
int index;
bool allDbs;
+ TransactionId xmin;
volatile TransactionId replication_slot_xmin = InvalidTransactionId;
volatile TransactionId replication_slot_catalog_xmin = InvalidTransactionId;
@@ -1235,9 +1264,14 @@ GetOldestXmin(Relation rel, bool ignoreVacuum)
*/
allDbs = rel == NULL || rel->rd_rel->relisshared;
-#ifdef PGXC
- if (TransactionIdIsValid(RecentGlobalXmin))
- return RecentGlobalXmin;
+#ifdef XCP
+ if (!computeLocal)
+ {
+ xmin = (TransactionId) ClusterMonitorGetGlobalXmin();
+ if (!TransactionIdIsValid(xmin))
+ xmin = FirstNormalTransactionId;
+ return xmin;
+ }
#endif
/* Cannot look for individual databases during recovery */
@@ -1252,8 +1286,18 @@ GetOldestXmin(Relation rel, bool ignoreVacuum)
* additions.
*/
result = ShmemVariableCache->latestCompletedXid;
+#ifdef XCP
+ if (!TransactionIdIsValid(result))
+ result = FirstNormalTransactionId;
+ else
+ TransactionIdAdvance(result);
+
+ if (isIdle)
+ *isIdle = true;
+#else
Assert(TransactionIdIsNormal(result));
TransactionIdAdvance(result);
+#endif
for (index = 0; index < arrayP->numProcs; index++)
{
@@ -1277,6 +1321,9 @@ GetOldestXmin(Relation rel, bool ignoreVacuum)
{
/* Fetch xid just once - see GetNewTransactionId */
TransactionId xid = pgxact->xid;
+#ifdef XCP
+ TransactionId xmin;
+#endif
/* First consider the transaction's own Xid, if any */
if (TransactionIdIsNormal(xid) &&
@@ -1290,10 +1337,49 @@ GetOldestXmin(Relation rel, bool ignoreVacuum)
* have an Xmin but not (yet) an Xid; conversely, if it has an
* Xid, that could determine some not-yet-set Xmin.
*/
+#ifdef XCP
+ xmin = pgxact->xmin; /* Fetch just once */
+ if (TransactionIdIsNormal(xmin) &&
+ TransactionIdPrecedes(xmin, result))
+ result = xmin;
+
+ /*
+ * If we found a normal xid or a transaction running with xmin set,
+ * we are not idle
+ */
+ if (isIdle &&
+ (TransactionIdIsNormal(xmin) || TransactionIdIsNormal(xid)))
+ *isIdle = false;
+
+ /*
+ * If we see an xid or an xmin which precedes either the last
+ * reported xmin or the GlobalXmin calculated by the
+ * Cluster Monitor process then it signals bad things and we must
+ * abort and restart the database server
+ */
+ if (TransactionIdIsValid(lastReportedXmin))
+ {
+ if ((TransactionIdIsValid(xmin) && TransactionIdPrecedes(xmin, lastReportedXmin)) ||
+ (TransactionIdIsValid(xid) && TransactionIdPrecedes(xid,
+ lastReportedXmin)))
+ elog(PANIC, "Found xid (%d) or xmin (%d) precedes last "
+ "reported xmin (%d)", xid, xmin, lastReportedXmin);
+ }
+
+ if (TransactionIdIsValid(lastGlobalXmin))
+ {
+ if ((TransactionIdIsValid(xmin) && TransactionIdPrecedes(xmin, lastGlobalXmin)) ||
+ (TransactionIdIsValid(xid) && TransactionIdPrecedes(xid,
+ lastGlobalXmin)))
+ elog(PANIC, "Found xid (%d) or xmin (%d) precedes "
+ "global xmin (%d)", xid, xmin, lastGlobalXmin);
+ }
+#else
xid = pgxact->xmin; /* Fetch just once */
if (TransactionIdIsNormal(xid) &&
TransactionIdPrecedes(xid, result))
result = xid;
+#endif
}
}
@@ -1433,6 +1519,9 @@ GetSnapshotData(Snapshot snapshot)
bool suboverflowed = false;
volatile TransactionId replication_slot_xmin = InvalidTransactionId;
volatile TransactionId replication_slot_catalog_xmin = InvalidTransactionId;
+#ifdef XCP
+ TransactionId clustermon_xmin;
+#endif
#ifdef PGXC /* PGXC_DATANODE */
/*
@@ -1659,6 +1748,12 @@ GetSnapshotData(Snapshot snapshot)
if (TransactionIdPrecedes(xmin, globalxmin))
globalxmin = xmin;
+#ifdef XCP
+ clustermon_xmin = ClusterMonitorGetGlobalXmin();
+ if (TransactionIdPrecedes(clustermon_xmin, globalxmin))
+ globalxmin = clustermon_xmin;
+#endif
+
/* Update global variables too */
RecentGlobalXmin = globalxmin - vacuum_defer_cleanup_age;
if (!TransactionIdIsNormal(RecentGlobalXmin))
@@ -3129,10 +3224,7 @@ GetSnapshotDataFromGTM(Snapshot snapshot)
* establishment or auto-analyze scans. Nevertheless, we MUST fix this
* before going to production release
*/
- if (IS_PGXC_LOCAL_COORDINATOR)
- gtm_snapshot = GetSnapshotGTM(GetCurrentTransactionId(), canbe_grouped);
- else
- gtm_snapshot = GetSnapshotGTM(GetCurrentTransactionIdIfAny(), canbe_grouped);
+ gtm_snapshot = GetSnapshotGTM(GetCurrentTransactionIdIfAny(), canbe_grouped);
if (!gtm_snapshot)
@@ -3141,7 +3233,13 @@ GetSnapshotDataFromGTM(Snapshot snapshot)
errmsg("GTM error, could not obtain snapshot. Current XID = %d, Autovac = %d", GetCurrentTransactionId(), IsAutoVacuumWorkerProcess())));
else
{
- RecentGlobalXmin = gtm_snapshot->sn_recent_global_xmin;
+ /*
+ * Set RecentGlobalXmin by copying from the shared memory state
+ * maintained by the Clutser Monitor
+ */
+ RecentGlobalXmin = ClusterMonitorGetGlobalXmin();
+ if (!TransactionIdIsValid(RecentGlobalXmin))
+ RecentGlobalXmin = FirstNormalTransactionId;
/*
* XXX Is it ok to set RecentGlobalDataXmin same as RecentGlobalXmin ?
*/
@@ -3161,6 +3259,7 @@ GetSnapshotFromGlobalSnapshot(Snapshot snapshot)
{
int index;
ProcArrayStruct *arrayP = procArray;
+ TransactionId global_xmin;
snapshot->xmin = globalSnapshot.gxmin;
snapshot->xmax = globalSnapshot.gxmax;
@@ -3208,15 +3307,6 @@ GetSnapshotFromGlobalSnapshot(Snapshot snapshot)
snapshot->max_xcnt = globalSnapshot.gxcnt;
}
- memcpy(snapshot->xip, globalSnapshot.gxip,
- globalSnapshot.gxcnt * sizeof(TransactionId));
- snapshot->curcid = GetCurrentCommandId(false);
-
- if (!TransactionIdIsValid(MyPgXact->xmin))
- MyPgXact->xmin = TransactionXmin = globalSnapshot.gxmin;
-
- RecentXmin = globalSnapshot.gxmin;
-
/* PGXCTODO - set this until we handle subtransactions. */
snapshot->subxcnt = 0;
@@ -3235,64 +3325,37 @@ GetSnapshotFromGlobalSnapshot(Snapshot snapshot)
LWLockAcquire(ProcArrayLock, LW_SHARED);
/*
- * Spin over analyzeProcArray and add these local analyze XIDs to the
- * local snapshot.
+ * Once we have a SHARED lock on the ProcArrayLock, fetch the
+ * GlobalXmin and ensure that the snapshot we are dealing with isn't
+ * too old. Since such a snapshot may need to see rows that have
+ * already been removed by the server
+ *
+ * These scenarios are not very likely to happen because the
+ * ClusterMonitor will ensure that GlobalXmins are reported to GTM in
+ * time and the GlobalXmin on the GTM can't advance past the reported
+ * xmins. But in some cases where a node fails to report its GlobalXmin
+ * and gets excluded from the list of nodes on GTM, the GlobalXmin will
+ * be advanced. Usually such node will shoot itself in the head
+ * and rejoin the cluster, but if at all it sends a snapshot to us, we
+ * should protect ourselves from using it
*/
- for (index = 0; index < arrayP->numProcs; index++)
- {
- int pgprocno = arrayP->pgprocnos[index];
- volatile PGPROC *proc = &allProcs[pgprocno];
- volatile PGXACT *pgxact = &allPgXact[pgprocno];
+ global_xmin = ClusterMonitorGetGlobalXmin();
+ if (!TransactionIdIsValid(global_xmin))
+ global_xmin = FirstNormalTransactionId;
- /* Do that only for an autovacuum process */
- if (pgxact->vacuumFlags & PROC_IS_AUTOVACUUM)
- {
- TransactionId xid;
-
- /* Update globalxmin to be the smallest valid xmin */
- xid = pgxact->xmin; /* fetch just once */
+ if (TransactionIdPrecedes(globalSnapshot.gxmin, global_xmin))
+ elog(ERROR, "Snapshot too old - RecentGlobalXmin has already "
+ "advanced past the snapshot xmin");
- if (TransactionIdIsNormal(xid) &&
- TransactionIdPrecedes(xid, RecentGlobalXmin))
- RecentGlobalXmin = xid;
+ memcpy(snapshot->xip, globalSnapshot.gxip,
+ globalSnapshot.gxcnt * sizeof(TransactionId));
+ snapshot->curcid = GetCurrentCommandId(false);
- /* Fetch xid just once - see GetNewTransactionId */
- xid = pgxact->xid;
+ if (!TransactionIdIsValid(MyPgXact->xmin))
+ MyPgXact->xmin = TransactionXmin = globalSnapshot.gxmin;
- /*
- * If the transaction has been assigned an xid < xmax we add it to the
- * snapshot, and update xmin if necessary. There's no need to store
- * XIDs >= xmax, since we'll treat them as running anyway. We don't
- * bother to examine their subxids either.
- *
- * We don't include our own XID (if any) in the snapshot, but we must
- * include it into xmin.
- */
- if (TransactionIdIsNormal(xid))
- {
- if (TransactionIdFollowsOrEquals(xid, snapshot->xmax))
- continue;
- if (proc != MyProc)
- {
- if (snapshot->xcnt >= snapshot->max_xcnt)
- {
- snapshot->max_xcnt += arrayP->numProcs;
-
- snapshot->xip = (TransactionId *)
- realloc(snapshot->xip, snapshot->max_xcnt * sizeof(TransactionId));
- if (snapshot->xip == NULL)
- ereport(ERROR,
- (errcode(ERRCODE_OUT_OF_MEMORY),
- errmsg("out of memory")));
- }
- snapshot->xip[snapshot->xcnt++] = xid;
- elog(DEBUG1, "Adding Analyze for xid %d to snapshot", pgxact->xid);
- }
- if (TransactionIdPrecedes(xid, snapshot->xmin))
- snapshot->xmin = xid;
- }
- }
- }
+ RecentXmin = globalSnapshot.gxmin;
+ RecentGlobalXmin = global_xmin;
/*
* XXX Is it ok to set RecentGlobalDataXmin same as RecentGlobalXmin ?
@@ -4283,3 +4346,43 @@ KnownAssignedXidsReset(void)
LWLockRelease(ProcArrayLock);
}
+
+/*
+ * Do a consistency check on the running processes. Cluster Monitor uses this
+ * API to check if some transaction has started with an xid or xmin lower than
+ * the GlobalXmin reported by the GTM. This can only happen under extreme
+ * conditions and we must take necessary steps to safe-guard against such
+ * anomalies.
+ */
+void
+ProcArrayCheckXminConsistency(TransactionId global_xmin)
+{
+ ProcArrayStruct *arrayP = procArray;
+ int index;
+
+ for (index = 0; index < arrayP->numProcs; index++)
+ {
+ int pgprocno = arrayP->pgprocnos[index];
+ volatile PGPROC *proc = &allProcs[pgprocno];
+ volatile PGXACT *pgxact = &allPgXact[pgprocno];
+ TransactionId xid;
+
+ xid = pgxact->xid; /* fetch just once */
+ if (!TransactionIdIsNormal(xid))
+ continue;
+
+ if (!TransactionIdFollowsOrEquals(xid, global_xmin))
+ elog(PANIC, "xmin consistency check failed - found %d xid in "
+ "PGPROC %d ahead of GlobalXmin %d", xid, pgprocno,
+ global_xmin);
+
+ xid = pgxact->xmin; /* fetch just once */
+ if (!TransactionIdIsNormal(xid))
+ continue;
+
+ if (!TransactionIdFollowsOrEquals(xid, global_xmin))
+ elog(PANIC, "xmin consistency check failed - found %d xmin in "
+ "PGPROC %d ahead of GlobalXmin %d", xid, pgprocno,
+ global_xmin);
+ }
+}
diff --git a/src/backend/utils/init/postinit.c b/src/backend/utils/init/postinit.c
index 6fc55751b5..e33bc65a89 100644
--- a/src/backend/utils/init/postinit.c
+++ b/src/backend/utils/init/postinit.c
@@ -657,7 +657,7 @@ InitPostgres(const char *in_dbname, Oid dboid, const char *username,
before_shmem_exit(ShutdownPostgres, 0);
/* The autovacuum launcher is done here */
- if (IsAutoVacuumLauncherProcess())
+ if (IsAutoVacuumLauncherProcess() || IsClusterMonitorProcess())
return;
/*
diff --git a/src/gtm/client/fe-protocol.c b/src/gtm/client/fe-protocol.c
index 64a88cb540..bc0a72ea93 100644
--- a/src/gtm/client/fe-protocol.c
+++ b/src/gtm/client/fe-protocol.c
@@ -512,14 +512,6 @@ gtmpqParseSuccess(GTM_Conn *conn, GTM_Result *result)
break;
}
- if (gtmpqGetnchar((char *)&result->gr_snapshot.sn_recent_global_xmin,
- sizeof (GlobalTransactionId), conn))
- {
- result->gr_status = GTM_RESULT_ERROR;
- break;
- }
-
-
if (gtmpqGetInt((int *)&result->gr_snapshot.sn_xcnt,
sizeof (int32), conn))
{
@@ -739,6 +731,13 @@ gtmpqParseSuccess(GTM_Conn *conn, GTM_Result *result)
break;
}
result->gr_resdata.grd_node.node_name[result->gr_resdata.grd_node.len] = '\0';
+
+ if (result->gr_type == NODE_UNREGISTER_RESULT)
+ break;
+
+ if (gtmpqGetInt((char *)&result->gr_resdata.grd_node.xmin, sizeof
+ (GlobalTransactionId), conn))
+ result->gr_status = GTM_RESULT_ERROR;
break;
case NODE_LIST_RESULT:
@@ -789,6 +788,28 @@ gtmpqParseSuccess(GTM_Conn *conn, GTM_Result *result)
}
case BARRIER_RESULT:
break;
+
+ case REPORT_XMIN_RESULT:
+ if (gtmpqGetnchar((char *)&result->gr_resdata.grd_report_xmin.reported_xmin,
+ sizeof (GlobalTransactionId), conn))
+ {
+ result->gr_status = GTM_RESULT_ERROR;
+ break;
+ }
+ if (gtmpqGetnchar((char *)&result->gr_resdata.grd_report_xmin.global_xmin,
+ sizeof (GlobalTransactionId), conn))
+ {
+ result->gr_status = GTM_RESULT_ERROR;
+ break;
+ }
+ if (gtmpqGetnchar((char *)&result->gr_resdata.grd_report_xmin.errcode,
+ sizeof (int), conn))
+ {
+ result->gr_status = GTM_RESULT_ERROR;
+ break;
+ }
+ break;
+
default:
printfGTMPQExpBuffer(&conn->errorMessage,
"unexpected result type from server; result typr was \"%d\"\n",
diff --git a/src/gtm/client/gtm_client.c b/src/gtm/client/gtm_client.c
index 5058e73b0e..1f1f6f50fd 100644
--- a/src/gtm/client/gtm_client.c
+++ b/src/gtm/client/gtm_client.c
@@ -28,7 +28,6 @@
#endif
#include <time.h>
-
#include "gtm/gtm_c.h"
#include "gtm/gtm_ip.h"
@@ -40,6 +39,7 @@
#include "gtm/gtm_serialize.h"
#include "gtm/register.h"
#include "gtm/assert.h"
+#include "pgxc/pgxc.h"
extern bool Backup_synchronously;
@@ -77,7 +77,9 @@ static int alter_sequence_internal(GTM_Conn *conn, GTM_SequenceKey key, GTM_Sequ
GTM_Sequence minval, GTM_Sequence maxval,
GTM_Sequence startval, GTM_Sequence lastval, bool cycle, bool is_restart, bool is_backup);
static int node_register_worker(GTM_Conn *conn, GTM_PGXCNodeType type, const char *host, GTM_PGXCNodePort port,
- char *node_name, char *datafolder, GTM_PGXCNodeStatus status, bool is_backup);
+ char *node_name, char *datafolder,
+ GTM_PGXCNodeStatus status, bool is_backup,
+ GlobalTransactionId *xmin);
static int node_unregister_worker(GTM_Conn *conn, GTM_PGXCNodeType type, const char * node_name, bool is_backup);
static int report_barrier_internal(GTM_Conn *conn, char *barrier_id, bool is_backup);
/*
@@ -1654,7 +1656,8 @@ int node_register(GTM_Conn *conn,
GTM_PGXCNodeType type,
GTM_PGXCNodePort port,
char *node_name,
- char *datafolder)
+ char *datafolder,
+ GlobalTransactionId *xmin)
{
char host[1024];
int rc;
@@ -1665,7 +1668,8 @@ int node_register(GTM_Conn *conn,
return -1;
}
- return node_register_worker(conn, type, host, port, node_name, datafolder, NODE_CONNECTED, false);
+ return node_register_worker(conn, type, host, port, node_name, datafolder,
+ NODE_CONNECTED, false, xmin);
}
int node_register_internal(GTM_Conn *conn,
@@ -1674,27 +1678,11 @@ int node_register_internal(GTM_Conn *conn,
GTM_PGXCNodePort port,
char *node_name,
char *datafolder,
- GTM_PGXCNodeStatus status)
-{
- return node_register_worker(conn, type, host, port, node_name, datafolder, status, false);
-}
-
-int bkup_node_register(GTM_Conn *conn,
- GTM_PGXCNodeType type,
- GTM_PGXCNodePort port,
- char *node_name,
- char *datafolder)
+ GTM_PGXCNodeStatus status,
+ GlobalTransactionId *xmin)
{
- char host[1024];
- int rc;
-
- node_get_local_addr(conn, host, sizeof(host), &rc);
- if (rc != 0)
- {
- return -1;
- }
-
- return node_register_worker(conn, type, host, port, node_name, datafolder, NODE_CONNECTED, true);
+ return node_register_worker(conn, type, host, port, node_name, datafolder,
+ status, false, xmin);
}
int bkup_node_register_internal(GTM_Conn *conn,
@@ -1703,9 +1691,11 @@ int bkup_node_register_internal(GTM_Conn *conn,
GTM_PGXCNodePort port,
char *node_name,
char *datafolder,
- GTM_PGXCNodeStatus status)
+ GTM_PGXCNodeStatus status,
+ GlobalTransactionId xmin)
{
- return node_register_worker(conn, type, host, port, node_name, datafolder, status, true);
+ return node_register_worker(conn, type, host, port, node_name, datafolder,
+ status, true, &xmin);
}
static int node_register_worker(GTM_Conn *conn,
@@ -1715,7 +1705,8 @@ static int node_register_worker(GTM_Conn *conn,
char *node_name,
char *datafolder,
GTM_PGXCNodeStatus status,
- bool is_backup)
+ bool is_backup,
+ GlobalTransactionId *xmin)
{
GTM_Result *res = NULL;
time_t finish_time;
@@ -1754,7 +1745,9 @@ static int node_register_worker(GTM_Conn *conn,
/* Data Folder (var-len) */
gtmpqPutnchar(datafolder, strlen(datafolder), conn) ||
/* Node Status */
- gtmpqPutInt(status, sizeof(GTM_PGXCNodeStatus), conn))
+ gtmpqPutInt(status, sizeof(GTM_PGXCNodeStatus), conn) ||
+ /* Recent Xmin */
+ gtmpqPutnchar((char *)xmin, sizeof (GlobalTransactionId), conn))
{
goto send_failed;
}
@@ -1790,6 +1783,8 @@ static int node_register_worker(GTM_Conn *conn,
{
Assert(res->gr_resdata.grd_node.type == type);
Assert((strcmp(res->gr_resdata.grd_node.node_name,node_name) == 0));
+ if (xmin)
+ *xmin = res->gr_resdata.grd_node.xmin;
}
return res->gr_status;
@@ -2217,7 +2212,6 @@ snapshot_get_multi(GTM_Conn *conn, int txn_count, GlobalTransactionId *gxid,
memcpy(status_out, &res->gr_resdata.grd_txn_rc_multi.status, sizeof(int) * (*txn_count_out));
memcpy(xmin_out, &res->gr_snapshot.sn_xmin, sizeof(GlobalTransactionId));
memcpy(xmax_out, &res->gr_snapshot.sn_xmax, sizeof(GlobalTransactionId));
- memcpy(recent_global_xmin_out, &res->gr_snapshot.sn_recent_global_xmin, sizeof(GlobalTransactionId));
memcpy(xcnt_out, &res->gr_snapshot.sn_xcnt, sizeof(int32));
}
@@ -2430,3 +2424,64 @@ send_failed:
conn->result->gr_status = GTM_RESULT_COMM_ERROR;
return -1;
}
+
+int
+report_global_xmin(GTM_Conn *conn, const char *node_name,
+ GTM_PGXCNodeType type, GlobalTransactionId *gxid,
+ GlobalTransactionId *global_xmin,
+ bool isIdle, int *errcode)
+{
+ GTM_Result *res = NULL;
+ time_t finish_time;
+
+ if (gtmpqPutMsgStart('C', true, conn) ||
+ gtmpqPutInt(MSG_REPORT_XMIN, sizeof (GTM_MessageType), conn) ||
+ gtmpqPutnchar((char *)gxid, sizeof(GlobalTransactionId), conn) ||
+ gtmpqPutc(isIdle, conn) ||
+ gtmpqPutInt(type, sizeof (GTM_PGXCNodeType), conn) ||
+ gtmpqPutInt(strlen(node_name), sizeof (GTM_StrLen), conn) ||
+ gtmpqPutnchar(node_name, strlen(node_name), conn))
+ {
+ goto send_failed;
+ }
+
+ /* Finish the message. */
+ if (gtmpqPutMsgEnd(conn))
+ {
+ goto send_failed;
+ }
+
+ /* Flush to ensure backend gets it. */
+ if (gtmpqFlush(conn))
+ {
+ goto send_failed;
+ }
+
+ finish_time = time(NULL) + CLIENT_GTM_TIMEOUT;
+ if (gtmpqWaitTimed(true, false, conn, finish_time) ||
+ gtmpqReadData(conn) < 0)
+ {
+ goto receive_failed;
+ }
+
+ if ((res = GTMPQgetResult(conn)) == NULL)
+ {
+ goto receive_failed;
+ }
+
+ if (res->gr_status == GTM_RESULT_OK)
+ {
+ *gxid = res->gr_resdata.grd_report_xmin.reported_xmin;
+ *global_xmin = res->gr_resdata.grd_report_xmin.global_xmin;
+ *errcode = res->gr_resdata.grd_report_xmin.errcode;
+ }
+ return res->gr_status;
+
+receive_failed:
+send_failed:
+ conn->result = makeEmptyResultIfIsNull(conn->result);
+ conn->result->gr_status = GTM_RESULT_COMM_ERROR;
+ return -1;
+
+}
+
diff --git a/src/gtm/common/Makefile b/src/gtm/common/Makefile
index 31e0c25ff9..cb58dc998e 100644
--- a/src/gtm/common/Makefile
+++ b/src/gtm/common/Makefile
@@ -23,7 +23,7 @@ LDFLAGS=-L$(top_builddir)/common -L$(top_builddir)/libpq
LIBS=-lpthread
OBJS = gtm_opt_handler.o aset.o mcxt.o gtm_utils.o elog.o assert.o stringinfo.o gtm_lock.o \
- gtm_list.o gtm_serialize.o gtm_serialize_debug.o
+ gtm_list.o gtm_serialize.o gtm_serialize_debug.o gtm_time.o gtm_gxid.o
all:all-lib
diff --git a/src/gtm/common/gtm_gxid.c b/src/gtm/common/gtm_gxid.c
new file mode 100644
index 0000000000..6debb3e259
--- /dev/null
+++ b/src/gtm/common/gtm_gxid.c
@@ -0,0 +1,67 @@
+#include "gtm/gtm_c.h"
+#include "gtm/gtm_gxid.h"
+
+/*
+ * GlobalTransactionIdPrecedes --- is id1 logically < id2?
+ */
+bool
+GlobalTransactionIdPrecedes(GlobalTransactionId id1, GlobalTransactionId id2)
+{
+ /*
+ * If either ID is a permanent XID then we can just do unsigned
+ * comparison. If both are normal, do a modulo-2^31 comparison.
+ */
+ int32 diff;
+
+ if (!GlobalTransactionIdIsNormal(id1) || !GlobalTransactionIdIsNormal(id2))
+ return (id1 < id2);
+
+ diff = (int32) (id1 - id2);
+ return (diff < 0);
+}
+
+/*
+ * GlobalTransactionIdPrecedesOrEquals --- is id1 logically <= id2?
+ */
+bool
+GlobalTransactionIdPrecedesOrEquals(GlobalTransactionId id1, GlobalTransactionId id2)
+{
+ int32 diff;
+
+ if (!GlobalTransactionIdIsNormal(id1) || !GlobalTransactionIdIsNormal(id2))
+ return (id1 <= id2);
+
+ diff = (int32) (id1 - id2);
+ return (diff <= 0);
+}
+
+/*
+ * GlobalTransactionIdFollows --- is id1 logically > id2?
+ */
+bool
+GlobalTransactionIdFollows(GlobalTransactionId id1, GlobalTransactionId id2)
+{
+ int32 diff;
+
+ if (!GlobalTransactionIdIsNormal(id1) || !GlobalTransactionIdIsNormal(id2))
+ return (id1 > id2);
+
+ diff = (int32) (id1 - id2);
+ return (diff > 0);
+}
+
+/*
+ * GlobalTransactionIdFollowsOrEquals --- is id1 logically >= id2?
+ */
+bool
+GlobalTransactionIdFollowsOrEquals(GlobalTransactionId id1, GlobalTransactionId id2)
+{
+ int32 diff;
+
+ if (!GlobalTransactionIdIsNormal(id1) || !GlobalTransactionIdIsNormal(id2))
+ return (id1 >= id2);
+
+ diff = (int32) (id1 - id2);
+ return (diff >= 0);
+}
+
diff --git a/src/gtm/common/gtm_serialize.c b/src/gtm/common/gtm_serialize.c
index caa1c1c0f2..cabdcb5941 100644
--- a/src/gtm/common/gtm_serialize.c
+++ b/src/gtm/common/gtm_serialize.c
@@ -86,10 +86,6 @@ gtm_serialize_snapshotdata(GTM_SnapshotData *data, char *buf, size_t buflen)
memcpy(buf + len, &(data->sn_xmax), sizeof(GlobalTransactionId));
len += sizeof(GlobalTransactionId);
- /* GTM_SnapshotData.sn_recent_global_xmin */
- memcpy(buf + len, &(data->sn_recent_global_xmin), sizeof(GlobalTransactionId));
- len += sizeof(GlobalTransactionId);
-
/* GTM_SnapshotData.sn_xcnt */
memcpy(buf + len, &(data->sn_xcnt), sizeof(uint32));
len += sizeof(uint32);
@@ -130,10 +126,6 @@ gtm_deserialize_snapshotdata(GTM_SnapshotData *data, const char *buf, size_t buf
memcpy(&(data->sn_xmax), buf + len, sizeof(GlobalTransactionId));
len += sizeof(GlobalTransactionId);
- /* GTM_SnapshotData.sn_recent_global_xmin */
- memcpy(&(data->sn_recent_global_xmin), buf + len, sizeof(GlobalTransactionId));
- len += sizeof(GlobalTransactionId);
-
/* GTM_SnapshotData.sn_xcnt */
memcpy(&(data->sn_xcnt), buf + len, sizeof(uint32));
len += sizeof(uint32);
@@ -209,7 +201,6 @@ gtm_serialize_transactioninfo(GTM_TransactionInfo *data, char *buf, size_t bufle
int len = 0;
char *buf2;
int i;
- int namelen;
/* size check */
if (gtm_get_transactioninfo_size(data) > buflen)
@@ -671,7 +662,10 @@ gtm_get_pgxcnodeinfo_size(GTM_PGXCNodeInfo *data)
len += sizeof(GTM_PGXCNodeStatus); /* status */
- len += sizeof(uint32); /* max_sessions */
+ len += sizeof(bool); /* excluded ?*/
+ len += sizeof(GlobalTransactionId); /* xmin */
+ len += sizeof(GTM_Timestamp); /* reported timestamp */
+
len += sizeof(uint32); /* num_sessions */
if (data->num_sessions > 0) /* sessions */
len += (data->num_sessions * sizeof(GTM_PGXCSession));
@@ -762,6 +756,15 @@ gtm_serialize_pgxcnodeinfo(GTM_PGXCNodeInfo *data, char *buf, size_t buflen)
memcpy(buf + len, &(data->status), sizeof(GTM_PGXCNodeStatus));
len += sizeof(GTM_PGXCNodeStatus);
+ memcpy(buf + len, &(data->excluded), sizeof(bool));
+ len += sizeof(bool);
+
+ memcpy(buf + len, &(data->reported_xmin), sizeof(GlobalTransactionId));
+ len += sizeof(GlobalTransactionId);
+
+ memcpy(buf + len, &(data->reported_xmin_time), sizeof(GTM_Timestamp));
+ len += sizeof(GTM_Timestamp);
+
/* GTM_PGXCNodeInfo.sessions */
len_wk = data->max_sessions;
memcpy(buf + len, &len_wk, sizeof(uint32));
@@ -901,6 +904,18 @@ gtm_deserialize_pgxcnodeinfo(GTM_PGXCNodeInfo *data, const char *buf, size_t buf
memcpy(&(data->status), buf + len, sizeof(GTM_PGXCNodeStatus));
len += sizeof(GTM_PGXCNodeStatus);
+ /* GTM_PGXCNodeInfo.excluded */
+ memcpy(&(data->excluded), buf + len, sizeof (bool));
+ len += sizeof (bool);
+
+ /* GTM_PGXCNodeInfo.reported_xmin */
+ memcpy(&(data->reported_xmin), buf + len, sizeof (GlobalTransactionId));
+ len += sizeof (GlobalTransactionId);
+
+ /* GTM_PGXCNodeInfo.reported_xmin_time */
+ memcpy(&(data->reported_xmin_time), buf + len, sizeof (GTM_Timestamp));
+ len += sizeof (GTM_Timestamp);
+
/* GTM_PGXCNodeInfo.sessions */
memcpy(&len_wk, buf + len, sizeof(uint32));
len += sizeof(uint32);
diff --git a/src/gtm/common/gtm_serialize_debug.c b/src/gtm/common/gtm_serialize_debug.c
index a5d6e47425..d688211ff4 100644
--- a/src/gtm/common/gtm_serialize_debug.c
+++ b/src/gtm/common/gtm_serialize_debug.c
@@ -49,7 +49,6 @@ dump_transactioninfo_elog(GTM_TransactionInfo *txn)
elog(LOG, " sn_xmin: %d", txn->gti_current_snapshot.sn_xmin);
elog(LOG, " sn_xmax: %d", txn->gti_current_snapshot.sn_xmax);
- elog(LOG, " sn_recent_global_xmin: %d", txn->gti_current_snapshot.sn_recent_global_xmin);
elog(LOG, " sn_xcnt: %d", txn->gti_current_snapshot.sn_xcnt);
/* Print all the GXIDs in snapshot */
diff --git a/src/gtm/common/gtm_time.c b/src/gtm/common/gtm_time.c
new file mode 100644
index 0000000000..d121822743
--- /dev/null
+++ b/src/gtm/common/gtm_time.c
@@ -0,0 +1,96 @@
+/*-------------------------------------------------------------------------
+ *
+ * gtm_time.c
+ * Timestamp handling on GTM
+ *
+ * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ * Portions Copyright (c) 2010-2012 Postgres-XC Development Group
+ *
+ *
+ * IDENTIFICATION
+ * $PostgreSQL$
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "gtm/gtm.h"
+#include "gtm/gtm_c.h"
+#include "gtm/gtm_time.h"
+#include <time.h>
+#include <sys/time.h>
+
+GTM_Timestamp
+GTM_TimestampGetCurrent(void)
+{
+ struct timeval tp;
+ GTM_Timestamp result;
+
+ gettimeofday(&tp, NULL);
+
+ result = (GTM_Timestamp) tp.tv_sec -
+ ((GTM_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY);
+
+#ifdef HAVE_INT64_TIMESTAMP
+ result = (result * USECS_PER_SEC) + tp.tv_usec;
+#else
+ result = result + (tp.tv_usec / 1000000.0);
+#endif
+
+ return result;
+}
+
+/*
+ * TimestampDifference -- convert the difference between two timestamps
+ * into integer seconds and microseconds
+ *
+ * Both inputs must be ordinary finite timestamps (in current usage,
+ * they'll be results from GTM_TimestampGetCurrent()).
+ *
+ * We expect start_time <= stop_time. If not, we return zeroes; for current
+ * callers there is no need to be tense about which way division rounds on
+ * negative inputs.
+ */
+void
+GTM_TimestampDifference(GTM_Timestamp start_time, GTM_Timestamp stop_time,
+ long *secs, int *microsecs)
+{
+ GTM_Timestamp diff = stop_time - start_time;
+
+ if (diff <= 0)
+ {
+ *secs = 0;
+ *microsecs = 0;
+ }
+ else
+ {
+#ifdef HAVE_INT64_TIMESTAMP
+ *secs = (long) (diff / USECS_PER_SEC);
+ *microsecs = (int) (diff % USECS_PER_SEC);
+#else
+ *secs = (long) diff;
+ *microsecs = (int) ((diff - *secs) * 1000000.0);
+#endif
+ }
+}
+
+/*
+ * GTM_TimestampDifferenceExceeds -- report whether the difference between two
+ * timestamps is >= a threshold (expressed in milliseconds)
+ *
+ * Both inputs must be ordinary finite timestamps (in current usage,
+ * they'll be results from GTM_TimestampDifferenceExceeds()).
+ */
+bool
+GTM_TimestampDifferenceExceeds(GTM_Timestamp start_time,
+ GTM_Timestamp stop_time,
+ int msec)
+{
+ GTM_Timestamp diff = stop_time - start_time;
+
+#ifdef HAVE_INT64_TIMESTAMP
+ return (diff >= msec * INT64CONST(1000));
+#else
+ return (diff * 1000.0 >= msec);
+#endif
+}
diff --git a/src/gtm/common/gtm_utils.c b/src/gtm/common/gtm_utils.c
index 96c6e95b20..56d09f01ba 100644
--- a/src/gtm/common/gtm_utils.c
+++ b/src/gtm/common/gtm_utils.c
@@ -37,6 +37,7 @@ static struct enum_name message_name_tab[] =
{MSG_NODE_UNREGISTER, "MSG_NODE_UNREGISTER"},
{MSG_BKUP_NODE_UNREGISTER, "MSG_BKUP_NODE_UNREGISTER"},
{MSG_REGISTER_SESSION, "MSG_REGISTER_SESSION"},
+ {MSG_REPORT_XMIN, "MSG_REPORT_XMIN"},
{MSG_NODE_LIST, "MSG_NODE_LIST"},
{MSG_NODE_BEGIN_REPLICATION_INIT, "MSG_NODE_BEGIN_REPLICATION_INIT"},
{MSG_NODE_END_REPLICATION_INIT, "MSG_NODE_END_REPLICATION_INIT"},
@@ -138,6 +139,7 @@ static struct enum_name result_name_tab[] =
{TXN_GET_STATUS_RESULT, "TXN_GET_STATUS_RESULT"},
{TXN_GET_ALL_PREPARED_RESULT, "TXN_GET_ALL_PREPARED_RESULT"},
{TXN_BEGIN_GETGXID_AUTOVACUUM_RESULT, "TXN_BEGIN_GETGXID_AUTOVACUUM_RESULT"},
+ {REPORT_XMIN_RESULT, "REPORT_XMIN_RESULT"},
{RESULT_TYPE_COUNT, "RESULT_TYPE_COUNT"},
{-1, NULL}
};
diff --git a/src/gtm/main/Makefile b/src/gtm/main/Makefile
index 3e76afa50f..208b2297c7 100644
--- a/src/gtm/main/Makefile
+++ b/src/gtm/main/Makefile
@@ -15,7 +15,7 @@ ifneq ($(PORTNAME), win32)
override CFLAGS += $(PTHREAD_CFLAGS)
endif
-OBJS=main.o gtm_thread.o gtm_txn.o gtm_seq.o gtm_snap.o gtm_time.o gtm_standby.o gtm_opt.o gtm_backup.o
+OBJS=main.o gtm_thread.o gtm_txn.o gtm_seq.o gtm_snap.o gtm_standby.o gtm_opt.o gtm_backup.o
OTHERS= ../libpq/libpqcomm.a ../path/libgtmpath.a ../recovery/libgtmrecovery.a ../client/libgtmclient.a ../common/libgtm.a ../../port/libpgport.a
diff --git a/src/gtm/main/gtm_snap.c b/src/gtm/main/gtm_snap.c
index 9a9a78bfd0..f4ab3a127b 100644
--- a/src/gtm/main/gtm_snap.c
+++ b/src/gtm/main/gtm_snap.c
@@ -184,12 +184,10 @@ GTM_GetTransactionSnapshot(GTM_TransactionHandle handle[], int txn_count, int *s
if (GlobalTransactionIdPrecedes(xmin, globalxmin))
globalxmin = xmin;
- GTMTransactions.gt_recent_global_xmin = globalxmin;
snapshot->sn_xmin = xmin;
snapshot->sn_xmax = xmax;
snapshot->sn_xcnt = count;
- snapshot->sn_recent_global_xmin = globalxmin;
/*
* Now, before the proc array lock is released, set the xmin in the txninfo
@@ -235,7 +233,6 @@ GTM_GetTransactionSnapshot(GTM_TransactionHandle handle[], int txn_count, int *s
mysnap->sn_xmin = snapshot->sn_xmin;
mysnap->sn_xmax = snapshot->sn_xmax;
mysnap->sn_xcnt = snapshot->sn_xcnt;
- mysnap->sn_recent_global_xmin = snapshot->sn_recent_global_xmin;
memcpy(mysnap->sn_xip, snapshot->sn_xip,
sizeof (GlobalTransactionId) * snapshot->sn_xcnt);
}
@@ -257,7 +254,6 @@ GTM_GetTransactionSnapshot(GTM_TransactionHandle handle[], int txn_count, int *s
mysnap->sn_xmin = snapshot->sn_xmin;
mysnap->sn_xmax = snapshot->sn_xmax;
mysnap->sn_xcnt = snapshot->sn_xcnt;
- mysnap->sn_recent_global_xmin = snapshot->sn_recent_global_xmin;
memcpy(mysnap->sn_xip, snapshot->sn_xip,
sizeof (GlobalTransactionId) * snapshot->sn_xcnt);
}
@@ -269,9 +265,9 @@ GTM_GetTransactionSnapshot(GTM_TransactionHandle handle[], int txn_count, int *s
GTM_RWLockRelease(&GTMTransactions.gt_TransArrayLock);
- elog(DEBUG1, "GTM_GetTransactionSnapshot: (%u:%u:%u:%u)",
+ elog(DEBUG1, "GTM_GetTransactionSnapshot: (%u:%u:%u)",
snapshot->sn_xmin, snapshot->sn_xmax,
- snapshot->sn_xcnt, snapshot->sn_recent_global_xmin);
+ snapshot->sn_xcnt);
return snapshot;
}
@@ -338,7 +334,6 @@ ProcessGetSnapshotCommand(Port *myport, StringInfo message, bool get_gxid)
pq_sendbytes(&buf, (char *)&status, sizeof(int) * txn_count);
pq_sendbytes(&buf, (char *)&snapshot->sn_xmin, sizeof (GlobalTransactionId));
pq_sendbytes(&buf, (char *)&snapshot->sn_xmax, sizeof (GlobalTransactionId));
- pq_sendbytes(&buf, (char *)&snapshot->sn_recent_global_xmin, sizeof (GlobalTransactionId));
pq_sendint(&buf, snapshot->sn_xcnt, sizeof (int));
pq_sendbytes(&buf, (char *)snapshot->sn_xip,
sizeof(GlobalTransactionId) * snapshot->sn_xcnt);
@@ -404,7 +399,6 @@ ProcessGetSnapshotCommandMulti(Port *myport, StringInfo message)
pq_sendbytes(&buf, (char *)status, sizeof(int) * txn_count);
pq_sendbytes(&buf, (char *)&snapshot->sn_xmin, sizeof (GlobalTransactionId));
pq_sendbytes(&buf, (char *)&snapshot->sn_xmax, sizeof (GlobalTransactionId));
- pq_sendbytes(&buf, (char *)&snapshot->sn_recent_global_xmin, sizeof (GlobalTransactionId));
pq_sendint(&buf, snapshot->sn_xcnt, sizeof (int));
pq_sendbytes(&buf, (char *)snapshot->sn_xip,
sizeof(GlobalTransactionId) * snapshot->sn_xcnt);
diff --git a/src/gtm/main/gtm_standby.c b/src/gtm/main/gtm_standby.c
index 6b30fd77bd..0455455323 100644
--- a/src/gtm/main/gtm_standby.c
+++ b/src/gtm/main/gtm_standby.c
@@ -164,8 +164,6 @@ gtm_standby_restore_gxid(void)
txn.gt_transactions_array[i].gti_current_snapshot.sn_xmin;
GTMTransactions.gt_transactions_array[handle].gti_current_snapshot.sn_xmax =
txn.gt_transactions_array[i].gti_current_snapshot.sn_xmax;
- GTMTransactions.gt_transactions_array[handle].gti_current_snapshot.sn_recent_global_xmin =
- txn.gt_transactions_array[i].gti_current_snapshot.sn_recent_global_xmin;
GTMTransactions.gt_transactions_array[handle].gti_current_snapshot.sn_xcnt =
txn.gt_transactions_array[i].gti_current_snapshot.sn_xcnt;
GTMTransactions.gt_transactions_array[handle].gti_current_snapshot.sn_xip =
@@ -225,6 +223,7 @@ gtm_standby_restore_node(void)
data[i].type, data[i].nodename, data[i].datafolder);
if (Recovery_PGXCNodeRegister(data[i].type, data[i].nodename, data[i].port,
data[i].proxyname, data[i].status,
+ &data[i].reported_xmin,
data[i].ipaddress, data[i].datafolder, true,
-1 /* dummy socket */) != 0)
{
@@ -265,7 +264,8 @@ gtm_standby_register_self(const char *node_name, int port, const char *datadir)
standbyDataDir= (char *)datadir;
rc = node_register_internal(GTM_ActiveConn, GTM_NODE_GTM, standbyHostName, standbyPortNumber,
- standbyNodeName, standbyDataDir, NODE_DISCONNECTED);
+ standbyNodeName, standbyDataDir,
+ NODE_DISCONNECTED, InvalidGlobalTransactionId);
if (rc < 0)
{
elog(DEBUG1, "Failed to register a standby-GTM status.");
@@ -297,7 +297,8 @@ gtm_standby_activate_self(void)
}
rc = node_register_internal(GTM_ActiveConn, GTM_NODE_GTM, standbyHostName, standbyPortNumber,
- standbyNodeName, standbyDataDir, NODE_CONNECTED);
+ standbyNodeName, standbyDataDir,
+ NODE_CONNECTED, InvalidGlobalTransactionId);
if (rc < 0)
{
diff --git a/src/gtm/main/gtm_time.c b/src/gtm/main/gtm_time.c
deleted file mode 100644
index b2c252446c..0000000000
--- a/src/gtm/main/gtm_time.c
+++ /dev/null
@@ -1,41 +0,0 @@
-/*-------------------------------------------------------------------------
- *
- * gtm_time.c
- * Timestamp handling on GTM
- *
- * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
- * Portions Copyright (c) 1994, Regents of the University of California
- * Portions Copyright (c) 2010-2012 Postgres-XC Development Group
- *
- *
- * IDENTIFICATION
- * $PostgreSQL$
- *
- *-------------------------------------------------------------------------
- */
-
-#include "gtm/gtm.h"
-#include "gtm/gtm_c.h"
-#include "gtm/gtm_time.h"
-#include <time.h>
-#include <sys/time.h>
-
-GTM_Timestamp
-GTM_TimestampGetCurrent(void)
-{
- struct timeval tp;
- GTM_Timestamp result;
-
- gettimeofday(&tp, NULL);
-
- result = (GTM_Timestamp) tp.tv_sec -
- ((GTM_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY);
-
-#ifdef HAVE_INT64_TIMESTAMP
- result = (result * USECS_PER_SEC) + tp.tv_usec;
-#else
- result = result + (tp.tv_usec / 1000000.0);
-#endif
-
- return result;
-}
diff --git a/src/gtm/main/gtm_txn.c b/src/gtm/main/gtm_txn.c
index 249c74d189..7514f95359 100644
--- a/src/gtm/main/gtm_txn.c
+++ b/src/gtm/main/gtm_txn.c
@@ -477,70 +477,6 @@ GlobalTransactionIdDidAbort(GlobalTransactionId transactionId)
return false;
}
-/*
- * GlobalTransactionIdPrecedes --- is id1 logically < id2?
- */
-bool
-GlobalTransactionIdPrecedes(GlobalTransactionId id1, GlobalTransactionId id2)
-{
- /*
- * If either ID is a permanent XID then we can just do unsigned
- * comparison. If both are normal, do a modulo-2^31 comparison.
- */
- int32 diff;
-
- if (!GlobalTransactionIdIsNormal(id1) || !GlobalTransactionIdIsNormal(id2))
- return (id1 < id2);
-
- diff = (int32) (id1 - id2);
- return (diff < 0);
-}
-
-/*
- * GlobalTransactionIdPrecedesOrEquals --- is id1 logically <= id2?
- */
-bool
-GlobalTransactionIdPrecedesOrEquals(GlobalTransactionId id1, GlobalTransactionId id2)
-{
- int32 diff;
-
- if (!GlobalTransactionIdIsNormal(id1) || !GlobalTransactionIdIsNormal(id2))
- return (id1 <= id2);
-
- diff = (int32) (id1 - id2);
- return (diff <= 0);
-}
-
-/*
- * GlobalTransactionIdFollows --- is id1 logically > id2?
- */
-bool
-GlobalTransactionIdFollows(GlobalTransactionId id1, GlobalTransactionId id2)
-{
- int32 diff;
-
- if (!GlobalTransactionIdIsNormal(id1) || !GlobalTransactionIdIsNormal(id2))
- return (id1 > id2);
-
- diff = (int32) (id1 - id2);
- return (diff > 0);
-}
-
-/*
- * GlobalTransactionIdFollowsOrEquals --- is id1 logically >= id2?
- */
-bool
-GlobalTransactionIdFollowsOrEquals(GlobalTransactionId id1, GlobalTransactionId id2)
-{
- int32 diff;
-
- if (!GlobalTransactionIdIsNormal(id1) || !GlobalTransactionIdIsNormal(id2))
- return (id1 >= id2);
-
- diff = (int32) (id1 - id2);
- return (diff >= 0);
-}
-
/*
* Set that the transaction is doing vacuum
@@ -2684,6 +2620,60 @@ ProcessGetNextGXIDTransactionCommand(Port *myport, StringInfo message)
return;
}
+void
+ProcessReportXminCommand(Port *myport, StringInfo message, bool is_backup)
+{
+ StringInfoData buf;
+ GlobalTransactionId gxid;
+ GTM_StrLen nodelen;
+ char node_name[NI_MAXHOST];
+ GTM_PGXCNodeType type;
+ GlobalTransactionId global_xmin;
+ int errcode;
+ bool remoteIdle;
+
+ const char *data = pq_getmsgbytes(message, sizeof (gxid));
+
+ if (data == NULL)
+ ereport(ERROR,
+ (EPROTO,
+ errmsg("Message does not contain valid GXID")));
+ memcpy(&gxid, data, sizeof (gxid));
+
+ /* Read number of running transactions */
+ remoteIdle = pq_getmsgbyte(message);
+
+ /* Read Node Type */
+ type = pq_getmsgint(message, sizeof (GTM_PGXCNodeType));
+
+ /* get node name */
+ nodelen = pq_getmsgint(message, sizeof (GTM_StrLen));
+ memcpy(node_name, (char *)pq_getmsgbytes(message, nodelen), nodelen);
+ node_name[nodelen] = '\0';
+ pq_getmsgend(message);
+
+ global_xmin = GTM_HandleGlobalXmin(type, node_name, &gxid, remoteIdle,
+ &errcode);
+
+ {
+ /*
+ * Send a SUCCESS message back to the client
+ */
+ pq_beginmessage(&buf, 'S');
+ pq_sendint(&buf, REPORT_XMIN_RESULT, 4);
+ if (myport->remote_type == GTM_NODE_GTM_PROXY)
+ {
+ GTM_ProxyMsgHeader proxyhdr;
+ proxyhdr.ph_conid = myport->conn_id;
+ pq_sendbytes(&buf, (char *)&proxyhdr, sizeof (GTM_ProxyMsgHeader));
+ }
+ pq_sendbytes(&buf, (char *)&gxid, sizeof (GlobalTransactionId));
+ pq_sendbytes(&buf, (char *)&global_xmin, sizeof (GlobalTransactionId));
+ pq_sendbytes(&buf, (char *)&errcode, sizeof (errcode));
+ pq_endmessage(myport, &buf);
+ pq_flush(myport);
+ }
+}
/*
* Mark GTM as shutting down. This point onwards no new GXID are issued to
@@ -2754,6 +2744,12 @@ void GTM_WriteRestorePointXid(FILE *f)
fprintf(f, "%u\n", GTMTransactions.gt_backedUpXid);
}
+GlobalTransactionId
+GTM_GetLatestCompletedXID(void)
+{
+ return GTMTransactions.gt_latestCompletedXid;
+}
+
/*
* TODO
*/
diff --git a/src/gtm/main/main.c b/src/gtm/main/main.c
index a995fe9beb..0f3d89f08c 100644
--- a/src/gtm/main/main.c
+++ b/src/gtm/main/main.c
@@ -233,7 +233,7 @@ BaseInit()
GTM_InitTxnManager();
GTM_InitSeqManager();
-
+ GTM_InitNodeManager();
}
static void
@@ -1337,6 +1337,10 @@ ProcessCommand(Port *myport, StringInfo input_message)
case MSG_TXN_GET_GID_DATA:
case MSG_TXN_GET_NEXT_GXID:
case MSG_TXN_GXID_LIST:
+#ifdef XCP
+ case MSG_REPORT_XMIN:
+ case MSG_BKUP_REPORT_XMIN:
+#endif
ProcessTransactionCommand(myport, mtype, input_message);
break;
@@ -1667,6 +1671,14 @@ ProcessTransactionCommand(Port *myport, GTM_MessageType mtype, StringInfo messag
ProcessGXIDListCommand(myport, message);
break;
+ case MSG_REPORT_XMIN:
+ ProcessReportXminCommand(myport, message, false);
+ break;
+
+ case MSG_BKUP_REPORT_XMIN:
+ ProcessReportXminCommand(myport, message, true);
+ break;
+
default:
Assert(0); /* Shouldn't come here.. keep compiler quite */
}
diff --git a/src/gtm/proxy/Makefile b/src/gtm/proxy/Makefile
index c1ab2018d8..fe350113a9 100644
--- a/src/gtm/proxy/Makefile
+++ b/src/gtm/proxy/Makefile
@@ -17,7 +17,7 @@ endif
OBJS=proxy_main.o proxy_thread.o proxy_utils.o gtm_proxy_opt.o
-OTHERS= ../libpq/libpqcomm.a ../path/libgtmpath.a ../recovery/libgtmrecovery.a ../client/libgtmclient.a ../common/libgtm.a
+OTHERS= ../libpq/libpqcomm.a ../path/libgtmpath.a ../client/libgtmclient.a ../common/libgtm.a
LDFLAGS=-L$(top_builddir)/common -L$(top_builddir)/libpq
diff --git a/src/gtm/proxy/proxy_main.c b/src/gtm/proxy/proxy_main.c
index 041c5be392..9765a29e3b 100644
--- a/src/gtm/proxy/proxy_main.c
+++ b/src/gtm/proxy/proxy_main.c
@@ -155,8 +155,6 @@ static void ProcessCommand(GTMProxy_ConnectionInfo *conninfo,
GTM_Conn *gtm_conn, StringInfo input_message);
static GTM_Conn *HandleGTMError(GTM_Conn *gtm_conn);
static GTM_Conn *HandlePostCommand(GTMProxy_ConnectionInfo *conninfo, GTM_Conn *gtm_conn);
-static void ProcessPGXCNodeCommand(GTMProxy_ConnectionInfo *conninfo,
- GTM_Conn *gtm_conn, GTM_MessageType mtype, StringInfo message);
static void ProcessTransactionCommand(GTMProxy_ConnectionInfo *conninfo,
GTM_Conn *gtm_conn, GTM_MessageType mtype, StringInfo message);
static void ProcessSnapshotCommand(GTMProxy_ConnectionInfo *conninfo,
@@ -274,9 +272,6 @@ BaseInit()
GTM_RWLockInit(&ReconnectControlLock);
- /* Save Node Register File in register.c */
- Recovery_SaveRegisterFileName(GTMProxyDataDir);
-
/* Register Proxy on GTM */
RegisterProxy(false);
@@ -524,9 +519,6 @@ GTMProxy_SigleHandler(int signal)
/* Unregister Proxy on GTM */
UnregisterProxy();
- /* Rewrite Register Information (clean up unregister records) */
- Recovery_SaveRegisterInfo();
-
/*
* XXX We should do a clean shutdown here.
*/
@@ -1595,13 +1587,10 @@ ProcessCommand(GTMProxy_ConnectionInfo *conninfo, GTM_Conn *gtm_conn,
case MSG_BARRIER:
case MSG_TXN_COMMIT:
case MSG_REGISTER_SESSION:
- GTMProxy_ProxyCommand(conninfo, gtm_conn, mtype, input_message);
- break;
-
-
+ case MSG_REPORT_XMIN:
case MSG_NODE_REGISTER:
case MSG_NODE_UNREGISTER:
- ProcessPGXCNodeCommand(conninfo, gtm_conn, mtype, input_message);
+ GTMProxy_ProxyCommand(conninfo, gtm_conn, mtype, input_message);
break;
case MSG_TXN_BEGIN:
@@ -1740,6 +1729,7 @@ IsProxiedMessage(GTM_MessageType mtype)
case MSG_NODE_REGISTER:
case MSG_NODE_UNREGISTER:
case MSG_REGISTER_SESSION:
+ case MSG_REPORT_XMIN:
case MSG_SNAPSHOT_GXID_GET:
case MSG_SEQUENCE_INIT:
case MSG_SEQUENCE_GET_CURRENT:
@@ -1916,7 +1906,6 @@ ProcessResponse(GTMProxy_ThreadInfo *thrinfo, GTMProxy_CommandInfo *cmdinfo,
pq_sendbytes(&buf, (char *)&status, sizeof (status));
pq_sendbytes(&buf, (char *)&res->gr_snapshot.sn_xmin, sizeof (GlobalTransactionId));
pq_sendbytes(&buf, (char *)&res->gr_snapshot.sn_xmax, sizeof (GlobalTransactionId));
- pq_sendbytes(&buf, (char *)&res->gr_snapshot.sn_recent_global_xmin, sizeof (GlobalTransactionId));
pq_sendint(&buf, res->gr_snapshot.sn_xcnt, sizeof (int));
pq_sendbytes(&buf, (char *)res->gr_snapshot.sn_xip,
sizeof(GlobalTransactionId) * res->gr_snapshot.sn_xcnt);
@@ -1943,6 +1932,7 @@ ProcessResponse(GTMProxy_ThreadInfo *thrinfo, GTMProxy_CommandInfo *cmdinfo,
case MSG_NODE_REGISTER:
case MSG_NODE_UNREGISTER:
case MSG_REGISTER_SESSION:
+ case MSG_REPORT_XMIN:
case MSG_SNAPSHOT_GXID_GET:
case MSG_SEQUENCE_INIT:
case MSG_SEQUENCE_GET_CURRENT:
@@ -2122,151 +2112,6 @@ ReadCommand(GTMProxy_ConnectionInfo *conninfo, StringInfo inBuf)
}
static void
-ProcessPGXCNodeCommand(GTMProxy_ConnectionInfo *conninfo, GTM_Conn *gtm_conn,
- GTM_MessageType mtype, StringInfo message)
-{
- GTMProxy_CommandData cmd_data;
-
- /*
- * For Node registering, proxy number is also saved and registered on GTM with node.
- * So get and modify the register message in consequence.
- */
- switch(mtype)
- {
- case MSG_NODE_REGISTER:
- {
- int len;
- MemoryContext oldContext;
- char remote_host[NI_MAXHOST];
- char remote_port[NI_MAXSERV];
-
- /* Get Remote IP and port from Conn structure to register */
- remote_host[0] = '\0';
- remote_port[0] = '\0';
-
- if (gtm_getnameinfo_all(&conninfo->con_port->raddr.addr,
- conninfo->con_port->raddr.salen,
- remote_host, sizeof(remote_host),
- remote_port, sizeof(remote_port),
- NI_NUMERICSERV))
- {
- int ret = gtm_getnameinfo_all(&conninfo->con_port->raddr.addr,
- conninfo->con_port->raddr.salen,
- remote_host, sizeof(remote_host),
- remote_port, sizeof(remote_port),
- NI_NUMERICHOST | NI_NUMERICSERV);
-
- if (ret)
- ereport(WARNING,
- (errmsg_internal("gtm_getnameinfo_all() failed")));
- }
-
- /* Get the node type */
- memcpy(&cmd_data.cd_reg.type, pq_getmsgbytes(message, sizeof (GTM_PGXCNodeType)), sizeof (GTM_PGXCNodeType));
-
- /* Then obtain the node name */
- len = pq_getmsgint(message, sizeof(GTM_StrLen));
- cmd_data.cd_reg.nodename = palloc(len + 1);
- memcpy(cmd_data.cd_reg.nodename, (char *)pq_getmsgbytes(message, len), len);
- cmd_data.cd_reg.nodename[len] = '\0';
-
- /*
- * Now we have to waste the following host information. It is taken from
- * the address field in the conn.
- */
- len = pq_getmsgint(message, sizeof(GTM_StrLen));
- cmd_data.cd_reg.ipaddress = palloc(len + 1);
- memcpy(cmd_data.cd_reg.ipaddress, (char *)pq_getmsgbytes(message, len), len);
- cmd_data.cd_reg.ipaddress[len] = '\0';
-
- /* Then the next is the port number */
- memcpy(&cmd_data.cd_reg.port,
- pq_getmsgbytes(message,
- sizeof (GTM_PGXCNodePort)),
- sizeof (GTM_PGXCNodePort));
-
- /* Proxy name */
- len = pq_getmsgint(message, sizeof(GTM_StrLen));
- cmd_data.cd_reg.gtm_proxy_nodename = palloc(len + 1);
- memcpy(cmd_data.cd_reg.gtm_proxy_nodename, (char *)pq_getmsgbytes(message, len), len);
- cmd_data.cd_reg.gtm_proxy_nodename[len] = '\0';
-
- /* get data folder data */
- len = pq_getmsgint(message, sizeof (int));
- cmd_data.cd_reg.datafolder = palloc(len + 1);
- memcpy(cmd_data.cd_reg.datafolder, (char *)pq_getmsgbytes(message, len), len);
- cmd_data.cd_reg.datafolder[len] = '\0';
-
- /* Now we have one more data to waste, "status" */
- cmd_data.cd_reg.status = pq_getmsgint(message, sizeof(GTM_PGXCNodeStatus));
- pq_getmsgend(message);
-
- /* Copy also remote host address in data to be proxied */
- cmd_data.cd_reg.ipaddress = (char *) palloc(strlen(remote_host) + 1);
- memcpy(cmd_data.cd_reg.ipaddress, remote_host, strlen(remote_host));
- cmd_data.cd_reg.ipaddress[strlen(remote_host)] = '\0';
-
- /* Registering has to be saved where it can be seen by all the threads */
- oldContext = MemoryContextSwitchTo(TopMostMemoryContext);
-
- /* Register Node also on Proxy */
- if (Recovery_PGXCNodeRegister(cmd_data.cd_reg.type,
- cmd_data.cd_reg.nodename,
- cmd_data.cd_reg.port,
- GTMProxyNodeName,
- NODE_CONNECTED,
- remote_host,
- cmd_data.cd_reg.datafolder,
- false,
- conninfo->con_port->sock))
- {
- ereport(ERROR,
- (EINVAL,
- errmsg("Failed to Register node")));
- }
-
- MemoryContextSwitchTo(oldContext);
-
- GTMProxy_ProxyPGXCNodeCommand(conninfo, gtm_conn, mtype, cmd_data);
- break;
- }
- case MSG_NODE_UNREGISTER:
- {
- int len;
- MemoryContext oldContext;
- char *nodename;
-
- memcpy(&cmd_data.cd_reg.type, pq_getmsgbytes(message, sizeof (GTM_PGXCNodeType)), sizeof (GTM_PGXCNodeType));
- len = pq_getmsgint(message, sizeof(GTM_StrLen));
- nodename = palloc(len + 1);
- memcpy(nodename, pq_getmsgbytes(message, len), len);
- nodename[len] = '\0'; /* Need null-terminate */
- cmd_data.cd_reg.nodename = nodename;
- pq_getmsgend(message);
-
- /* Unregistering has to be saved in a place where it can be seen by all the threads */
- oldContext = MemoryContextSwitchTo(TopMostMemoryContext);
-
- /*
- * Unregister node. Ignore any error here, otherwise we enter
- * endless loop trying to execute command again and again
- */
- Recovery_PGXCNodeUnregister(cmd_data.cd_reg.type,
- cmd_data.cd_reg.nodename,
- false,
- conninfo->con_port->sock);
- MemoryContextSwitchTo(oldContext);
-
- GTMProxy_ProxyPGXCNodeCommand(conninfo, gtm_conn, mtype, cmd_data);
- break;
- }
- default:
- Assert(0); /* Shouldn't come here.. Keep compiler quiet */
- }
- return;
-}
-
-static void
ProcessTransactionCommand(GTMProxy_ConnectionInfo *conninfo, GTM_Conn *gtm_conn,
GTM_MessageType mtype, StringInfo message)
{
@@ -2394,86 +2239,6 @@ GTMProxy_ProxyCommand(GTMProxy_ConnectionInfo *conninfo, GTM_Conn *gtm_conn,
/*
- * Proxy the incoming message to the GTM server after adding our own identifier
- * to it. Add also in the registration message the GTM Proxy number and rebuilt message.
- */
-static void GTMProxy_ProxyPGXCNodeCommand(GTMProxy_ConnectionInfo *conninfo,GTM_Conn *gtm_conn, GTM_MessageType mtype, GTMProxy_CommandData cmd_data)
-{
- GTMProxy_CommandInfo *cmdinfo;
- GTMProxy_ThreadInfo *thrinfo = GetMyThreadInfo;
- GTM_ProxyMsgHeader proxyhdr;
-
- proxyhdr.ph_conid = conninfo->con_id;
-
- switch(mtype)
- {
- case MSG_NODE_REGISTER:
- /* Rebuild the message */
- if (gtmpqPutMsgStart('C', true, gtm_conn) ||
- /* GTM Proxy Header */
- gtmpqPutnchar((char *)&proxyhdr, sizeof (GTM_ProxyMsgHeader), gtm_conn) ||
- /* Message Type */
- gtmpqPutInt(MSG_NODE_REGISTER, sizeof (GTM_MessageType), gtm_conn) ||
- /* Node Type to Register */
- gtmpqPutnchar((char *)&cmd_data.cd_reg.type, sizeof(GTM_PGXCNodeType), gtm_conn) ||
- /* Node Name (length) */
- gtmpqPutInt(strlen(cmd_data.cd_reg.nodename), sizeof (GTM_StrLen), gtm_conn) ||
- /* Node Name (var-len) */
- gtmpqPutnchar(cmd_data.cd_reg.nodename, strlen(cmd_data.cd_reg.nodename), gtm_conn) ||
- /* Host Name (length) */
- gtmpqPutInt(strlen(cmd_data.cd_reg.ipaddress), sizeof (GTM_StrLen), gtm_conn) ||
- /* Host Name (var-len) */
- gtmpqPutnchar(cmd_data.cd_reg.ipaddress, strlen(cmd_data.cd_reg.ipaddress), gtm_conn) ||
- /* Port Number */
- gtmpqPutnchar((char *)&cmd_data.cd_reg.port, sizeof(GTM_PGXCNodePort), gtm_conn) ||
- /* Proxy Name (empty string if connected to GTM directly) */
- gtmpqPutInt(strlen(cmd_data.cd_reg.gtm_proxy_nodename), 4, gtm_conn) ||
- /* Proxy Name name (var-len) */
- gtmpqPutnchar(cmd_data.cd_reg.gtm_proxy_nodename, strlen(cmd_data.cd_reg.gtm_proxy_nodename), gtm_conn) ||
- /* Data Folder length */
- gtmpqPutInt(strlen(cmd_data.cd_reg.datafolder), 4, gtm_conn) ||
- /* Data folder name (var-len) */
- gtmpqPutnchar(cmd_data.cd_reg.datafolder, strlen(cmd_data.cd_reg.datafolder), gtm_conn) ||
- /* Node Status */
- gtmpqPutInt(cmd_data.cd_reg.status, sizeof(GTM_PGXCNodeStatus), gtm_conn))
-
- elog(ERROR, "Error proxing data");
- break;
-
- case MSG_NODE_UNREGISTER:
- if (gtmpqPutMsgStart('C', true, gtm_conn) ||
- gtmpqPutnchar((char *)&proxyhdr, sizeof (GTM_ProxyMsgHeader), gtm_conn) ||
- gtmpqPutInt(MSG_NODE_UNREGISTER, sizeof (GTM_MessageType), gtm_conn) ||
- gtmpqPutnchar((char *)&cmd_data.cd_reg.type, sizeof(GTM_PGXCNodeType), gtm_conn) ||
- /* Node Name (length) */
- gtmpqPutInt(strlen(cmd_data.cd_reg.nodename), sizeof (GTM_StrLen), gtm_conn) ||
- /* Node Name (var-len) */
- gtmpqPutnchar(cmd_data.cd_reg.nodename, strlen(cmd_data.cd_reg.nodename), gtm_conn))
- elog(ERROR, "Error proxing data");
- break;
-
- default:
- Assert(0); /* Shouldn't come here.. Keep compiler quiet */
- }
-
- /*
- * Add the message to the pending command list
- */
- cmdinfo = palloc0(sizeof (GTMProxy_CommandInfo));
- cmdinfo->ci_mtype = mtype;
- cmdinfo->ci_conn = conninfo;
- cmdinfo->ci_res_index = 0;
- thrinfo->thr_processed_commands = gtm_lappend(thrinfo->thr_processed_commands, cmdinfo);
-
- /* Finish the message. */
- if (gtmpqPutMsgEnd(gtm_conn))
- elog(ERROR, "Error finishing the message");
-
- return;
-}
-
-
-/*
* Record the incoming message as per its type. After all messages of this type
* are collected, they will be sent in a single message to the GTM server.
*/
@@ -2576,9 +2341,6 @@ GTMProxy_HandleDisconnect(GTMProxy_ConnectionInfo *conninfo, GTM_Conn *gtm_conn)
GTM_ProxyMsgHeader proxyhdr;
int namelen;
- /* Mark node as disconnected if it is a postmaster backend */
- Recovery_PGXCNodeDisconnect(conninfo->con_port);
-
proxyhdr.ph_conid = conninfo->con_id;
/* Start the message. */
if (gtmpqPutMsgStart('C', true, gtm_conn) ||
@@ -3200,6 +2962,7 @@ RegisterProxy(bool is_reconnect)
char proxyname[] = "";
time_t finish_time;
MemoryContext old_mcxt = NULL;
+ GlobalTransactionId xmin = InvalidGlobalTransactionId;
if (is_reconnect)
{
@@ -3240,7 +3003,8 @@ RegisterProxy(bool is_reconnect)
gtmpqPutnchar(proxyname, (int)strlen(proxyname), master_conn) ||
gtmpqPutInt((int)strlen(GTMProxyDataDir), 4, master_conn) ||
gtmpqPutnchar(GTMProxyDataDir, strlen(GTMProxyDataDir), master_conn)||
- gtmpqPutInt(NODE_CONNECTED, sizeof(GTM_PGXCNodeStatus), master_conn))
+ gtmpqPutInt(NODE_CONNECTED, sizeof(GTM_PGXCNodeStatus), master_conn) ||
+ gtmpqPutnchar((char *)&xmin, sizeof (GlobalTransactionId), master_conn))
goto failed;
/* Finish the message. */
diff --git a/src/gtm/recovery/register_common.c b/src/gtm/recovery/register_common.c
index 50c0ee3538..1cbdc2e104 100644
--- a/src/gtm/recovery/register_common.c
+++ b/src/gtm/recovery/register_common.c
@@ -29,6 +29,8 @@
#include "gtm/gtm_client.h"
#include "gtm/gtm_serialize.h"
#include "gtm/gtm_standby.h"
+#include "gtm/gtm_time.h"
+#include "gtm/gtm_txn.h"
#include "gtm/libpq.h"
#include "gtm/libpq-int.h"
#include "gtm/pqformat.h"
@@ -45,7 +47,6 @@
typedef struct GTM_NodeInfoHashBucket
{
gtm_List *nhb_list;
- GTM_RWLock nhb_lock;
} GTM_PGXCNodeInfoHashBucket;
static char GTMPGXCNodeFile[GTM_NODE_FILE_MAX_PATH];
@@ -53,11 +54,16 @@ static char GTMPGXCNodeFile[GTM_NODE_FILE_MAX_PATH];
/* Lock access of record file when necessary */
static GTM_RWLock RegisterFileLock;
+/* Lock to control registration/unregistration of nodes */
+static GTM_RWLock PGXCNodesLock;
+
static int NodeRegisterMagic = 0xeaeaeaea;
static int NodeUnregisterMagic = 0xebebebeb;
static int NodeEndMagic = 0xefefefef;
static GTM_PGXCNodeInfoHashBucket GTM_PGXCNodes[NODE_HASH_TABLE_SIZE];
+static GlobalTransactionId GTM_GlobalXmin = FirstNormalGlobalTransactionId;
+static GTM_Timestamp GTM_GlobalXminComputedTime;
static GTM_PGXCNodeInfo *pgxcnode_find_info(GTM_PGXCNodeType type, char *node_name);
static uint32 pgxcnode_gethash(char *nodename);
@@ -76,12 +82,11 @@ pgxcnode_get_all(GTM_PGXCNodeInfo **data, size_t maxlen)
int node = 0;
int i;
+ GTM_RWLockAcquire(&PGXCNodesLock, GTM_LOCKMODE_READ);
+
for (i = 0; i < NODE_HASH_TABLE_SIZE; i++)
{
bucket = &GTM_PGXCNodes[i];
-
- GTM_RWLockAcquire(&bucket->nhb_lock, GTM_LOCKMODE_READ);
-
gtm_foreach(elem, bucket->nhb_list)
{
GTM_PGXCNodeInfo *curr_nodeinfo = NULL;
@@ -96,9 +101,8 @@ pgxcnode_get_all(GTM_PGXCNodeInfo **data, size_t maxlen)
if (node == maxlen)
break;
}
-
- GTM_RWLockRelease(&bucket->nhb_lock);
}
+ GTM_RWLockRelease(&PGXCNodesLock);
return node;
}
@@ -111,12 +115,10 @@ pgxcnode_find_by_type(GTM_PGXCNodeType type, GTM_PGXCNodeInfo **data, size_t max
int node = 0;
int i;
+ GTM_RWLockAcquire(&PGXCNodesLock, GTM_LOCKMODE_READ);
for (i = 0; i < NODE_HASH_TABLE_SIZE; i++)
{
bucket = &GTM_PGXCNodes[i];
-
- GTM_RWLockAcquire(&bucket->nhb_lock, GTM_LOCKMODE_READ);
-
gtm_foreach(elem, bucket->nhb_list)
{
GTM_PGXCNodeInfo *cur = NULL;
@@ -133,9 +135,8 @@ pgxcnode_find_by_type(GTM_PGXCNodeType type, GTM_PGXCNodeInfo **data, size_t max
if (node == maxlen)
break;
}
-
- GTM_RWLockRelease(&bucket->nhb_lock);
}
+ GTM_RWLockRelease(&PGXCNodesLock);
return node;
}
@@ -153,8 +154,7 @@ pgxcnode_find_info(GTM_PGXCNodeType type, char *node_name)
bucket = &GTM_PGXCNodes[hash];
- GTM_RWLockAcquire(&bucket->nhb_lock, GTM_LOCKMODE_READ);
-
+ GTM_RWLockAcquire(&PGXCNodesLock, GTM_LOCKMODE_READ);
gtm_foreach(elem, bucket->nhb_list)
{
curr_nodeinfo = (GTM_PGXCNodeInfo *) gtm_lfirst(elem);
@@ -164,7 +164,7 @@ pgxcnode_find_info(GTM_PGXCNodeType type, char *node_name)
curr_nodeinfo = NULL;
}
- GTM_RWLockRelease(&bucket->nhb_lock);
+ GTM_RWLockRelease(&PGXCNodesLock);
return curr_nodeinfo;
}
@@ -213,13 +213,13 @@ pgxcnode_remove_info(GTM_PGXCNodeInfo *nodeinfo)
bucket = &GTM_PGXCNodes[hash];
- GTM_RWLockAcquire(&bucket->nhb_lock, GTM_LOCKMODE_WRITE);
+ GTM_RWLockAcquire(&PGXCNodesLock, GTM_LOCKMODE_WRITE);
GTM_RWLockAcquire(&nodeinfo->node_lock, GTM_LOCKMODE_WRITE);
bucket->nhb_list = gtm_list_delete(bucket->nhb_list, nodeinfo);
GTM_RWLockRelease(&nodeinfo->node_lock);
- GTM_RWLockRelease(&bucket->nhb_lock);
+ GTM_RWLockRelease(&PGXCNodesLock);
return 0;
}
@@ -236,7 +236,12 @@ pgxcnode_add_info(GTM_PGXCNodeInfo *nodeinfo)
bucket = &GTM_PGXCNodes[hash];
- GTM_RWLockAcquire(&bucket->nhb_lock, GTM_LOCKMODE_WRITE);
+ GTM_RWLockAcquire(&PGXCNodesLock, GTM_LOCKMODE_WRITE);
+
+ elog(LOG, "Nodeinfo->reported_xmin - %d:%d", nodeinfo->reported_xmin,
+ GTM_GlobalXmin);
+ if (!GlobalTransactionIdIsValid(nodeinfo->reported_xmin))
+ nodeinfo->reported_xmin = GTM_GlobalXmin;
gtm_foreach(elem, bucket->nhb_list)
{
@@ -249,7 +254,7 @@ pgxcnode_add_info(GTM_PGXCNodeInfo *nodeinfo)
{
if (curr_nodeinfo->status == NODE_CONNECTED)
{
- GTM_RWLockRelease(&bucket->nhb_lock);
+ GTM_RWLockRelease(&PGXCNodesLock);
ereport(LOG,
(EEXIST,
errmsg("Node with the given ID number already exists - %s %d:%d",
@@ -296,7 +301,7 @@ pgxcnode_add_info(GTM_PGXCNodeInfo *nodeinfo)
/* Set socket number with the new one */
curr_nodeinfo->socket = nodeinfo->socket;
- GTM_RWLockRelease(&bucket->nhb_lock);
+ GTM_RWLockRelease(&PGXCNodesLock);
return 0;
}
}
@@ -306,7 +311,7 @@ pgxcnode_add_info(GTM_PGXCNodeInfo *nodeinfo)
* Safe to add the structure to the list
*/
bucket->nhb_list = gtm_lappend(bucket->nhb_list, nodeinfo);
- GTM_RWLockRelease(&bucket->nhb_lock);
+ GTM_RWLockRelease(&PGXCNodesLock);
return 0;
}
@@ -380,6 +385,7 @@ Recovery_PGXCNodeRegister(GTM_PGXCNodeType type,
GTM_PGXCNodePort port,
char *proxyname,
GTM_PGXCNodeStatus status,
+ GlobalTransactionId *xmin,
char *ipaddress,
char *datafolder,
bool in_recovery,
@@ -408,6 +414,8 @@ Recovery_PGXCNodeRegister(GTM_PGXCNodeType type,
nodeinfo->ipaddress = pgxcnode_copy_char(ipaddress);
nodeinfo->status = status;
nodeinfo->socket = socket;
+ nodeinfo->reported_xmin = *xmin;
+ nodeinfo->reported_xmin_time = GTM_TimestampGetCurrent();
elog(DEBUG1, "Recovery_PGXCNodeRegister Request info: type=%d, nodename=%s, port=%d," \
"datafolder=%s, ipaddress=%s, status=%d",
@@ -427,6 +435,9 @@ Recovery_PGXCNodeRegister(GTM_PGXCNodeType type,
if (!in_recovery && errcode == 0)
Recovery_RecordRegisterInfo(nodeinfo, true);
+ if (xmin)
+ *xmin = nodeinfo->reported_xmin;
+
return errcode;
}
@@ -460,12 +471,11 @@ Recovery_SaveRegisterInfo(void)
return;
}
+ GTM_RWLockAcquire(&PGXCNodesLock, GTM_LOCKMODE_READ);
for (hash = 0; hash < NODE_HASH_TABLE_SIZE; hash++)
{
bucket = &GTM_PGXCNodes[hash];
- GTM_RWLockAcquire(&bucket->nhb_lock, GTM_LOCKMODE_READ);
-
/* Write one by one information about registered nodes */
gtm_foreach(elem, bucket->nhb_list)
{
@@ -536,9 +546,8 @@ Recovery_SaveRegisterInfo(void)
GTM_RWLockRelease(&nodeinfo->node_lock);
}
-
- GTM_RWLockRelease(&bucket->nhb_lock);
}
+ GTM_RWLockRelease(&PGXCNodesLock);
close(ctlfd);
@@ -745,11 +754,20 @@ Recovery_PGXCNodeRegisterCoordProcess(char *coord_node, int coord_procid,
*/
nodeinfo = pgxcnode_find_info(GTM_NODE_COORDINATOR, coord_node);
- if (nodeinfo == NULL)
+ while (nodeinfo == NULL)
{
- if (Recovery_PGXCNodeRegister(GTM_NODE_COORDINATOR, coord_node, 0, NULL,
- NODE_CONNECTED, NULL, NULL, false, 0))
- return 0;
+ GlobalTransactionId xmin = InvalidGlobalTransactionId;
+ int errcode = Recovery_PGXCNodeRegister(GTM_NODE_COORDINATOR, coord_node, 0, NULL,
+ NODE_CONNECTED,
+ &xmin,
+ NULL, NULL, false, 0);
+
+ /*
+ * If another thread registers before we get a chance, just look for
+ * the nodeinfo again
+ */
+ if (errcode != 0 && errcode != EEXIST)
+ return 0;
nodeinfo = pgxcnode_find_info(GTM_NODE_COORDINATOR, coord_node);
}
@@ -887,3 +905,320 @@ retry:
elog(DEBUG1, "MSG_BACKEND_DISCONNECT rc=%d done.", _rc);
}
}
+
+void
+GTM_InitNodeManager(void)
+{
+ int ii;
+
+ for (ii = 0; ii < NODE_HASH_TABLE_SIZE; ii++)
+ {
+ GTM_PGXCNodes[ii].nhb_list = gtm_NIL;
+ }
+
+ GTM_RWLockInit(&RegisterFileLock);
+ GTM_RWLockInit(&PGXCNodesLock);
+}
+
+/*
+ * Set to 120 seconds, but should be a few multiple for cluster monitor naptime
+ */
+#define GTM_REPORT_XMIN_DELAY_THRESHOLD (120 * 1000)
+
+GlobalTransactionId
+GTM_HandleGlobalXmin(GTM_PGXCNodeType type, char *node_name,
+ GlobalTransactionId *reported_xmin, bool remoteIdle, int *errcode)
+{
+ GTM_PGXCNodeInfo *all_nodes[MAX_NODES];
+ int num_nodes;
+ GTM_Timestamp current_time;
+ GTM_PGXCNodeInfo *mynodeinfo;
+ int ii;
+ GlobalTransactionId global_xmin;
+ GlobalTransactionId non_idle_global_xmin;
+ GlobalTransactionId idle_global_xmin;
+ bool excludeSelf = false;
+
+ *errcode = 0;
+
+ elog(LOG, "node_name: %s, remoteIdle: %d, reported_xmin: %d, global_xmin: %d",
+ node_name, remoteIdle, *reported_xmin,
+ GTM_GlobalXmin);
+
+ /*
+ * Hold the PGXCNodesLock in READ mode until we are done with the
+ * GlobalXmin calculation. We don't want any new node to join the cluster,
+ * but its OK for other nodes to report and do these computation in
+ * parallel. If the other guy beats us and advances the GlobalXmin beyond
+ * what we compute, we accept that calculation
+ */
+ GTM_RWLockAcquire(&PGXCNodesLock, GTM_LOCKMODE_READ);
+
+ mynodeinfo = pgxcnode_find_info(type, node_name);
+ if (mynodeinfo == NULL)
+ {
+ *errcode = GTM_ERRCODE_NODE_NOT_REGISTERED;
+ GTM_RWLockRelease(&PGXCNodesLock);
+ elog(LOG, "GTM_ERRCODE_NODE_NOT_REGISTERED - node_name %s", node_name);
+ return InvalidGlobalTransactionId;
+ }
+
+ GTM_RWLockAcquire(&mynodeinfo->node_lock, GTM_LOCKMODE_WRITE);
+
+ /*
+ * If we were excluded from the GlobalXmin calculation because we failed to
+ * report our status for GTM_REPORT_XMIN_DELAY_THRESHOLD seconds, we can
+ * only join the cluster back iff the GlobalXmin hasn't advanced beyond
+ * what we'd last reported. Otherwise its possible that some nodes are way
+ * ahead of us. So we must give up and restart all over again (this is done
+ * via PANIC in Cluster Monitor process on the remote side
+ *
+ * The GTM_REPORT_XMIN_DELAY_THRESHOLD is of many order higher than the
+ * naptime used by Cluster Monitor. So unless there was a network outage or
+ * the remote node got serious busy such that the Cluster Monitor did not
+ * get opportunity to report xmin in a timely fashion, we shouldn't get
+ * into this situation often.
+ *
+ * The exception to this rule is that if the remote node is idle, then we
+ * actually ignore the xmin reported by it and instead calculate a new xmin
+ * for it and send it back in respone. The remote node will still done
+ * final sanity check and either accept that xmin or kill itself via PANIC
+ * mechanism.
+ */
+ if ((mynodeinfo->excluded) &&
+ GlobalTransactionIdPrecedes(mynodeinfo->reported_xmin,
+ GTM_GlobalXmin) && !remoteIdle)
+ {
+ *errcode = GTM_ERRCODE_NODE_EXCLUDED;
+ GTM_RWLockRelease(&mynodeinfo->node_lock);
+ GTM_RWLockRelease(&PGXCNodesLock);
+ elog(LOG, "GTM_ERRCODE_NODE_EXCLUDED - node_name %s", node_name);
+ return InvalidGlobalTransactionId;
+ }
+
+ /*
+ * The remote node must not report a xmin which precedes the xmin it had
+ * reported in the past. If it ever happens, send an error back and let the
+ * remote node restart itself
+ */
+ if (!remoteIdle && GlobalTransactionIdPrecedes(*reported_xmin, mynodeinfo->reported_xmin))
+ {
+ *errcode = GTM_ERRCODE_TOO_OLD_XMIN;
+ GTM_RWLockRelease(&mynodeinfo->node_lock);
+ GTM_RWLockRelease(&PGXCNodesLock);
+ elog(LOG, "GTM_ERRCODE_TOO_OLD_XMIN - node_name %s", node_name);
+ return InvalidGlobalTransactionId;
+ }
+
+ elog(DEBUG1, "node_name: %s, remoteIdle: %d, reported_xmin: %d, nodeinfo->reported_xmin: %d",
+ mynodeinfo->nodename, remoteIdle, *reported_xmin,
+ mynodeinfo->reported_xmin);
+
+ /*
+ * If the remote node is idle, there is a danger that it may keep reporting
+ * a very old xmin (usually capped by latestCompletedXid). To handle such
+ * cases, which can be quite common in a large cluster, we check if the
+ * remote node has reported idle status and the reported xmin is same as
+ * what it reported in the last cycle and mark such node as "idle".
+ * Xmin reported by such a node is ignored and we compute xmin for it
+ * locally, here on the GTM.
+ *
+ * There are two strategies we follow:
+ *
+ * 1. We compute the lower bound of xmins reported by all non-idle remote
+ * nodes and assign that to this guy. This assumes that there is zero
+ * chance that a currently active (non-idle) node will send something to
+ * this guy which is older than the xmin computed
+ *
+ * 2. If all nodes are reporting their status as idle, we compute the lower
+ * bound of xmins reported by all idle nodes. This guarantees that the
+ * GlobalXmin can advance to a reasonable point even when all nodes have
+ * turned idle.
+ *
+ * In any case, the remote node will do its own sanity check before
+ * accepting the xmin computed by us and bail out if it doesn't agree with
+ * that.
+ */
+ if (remoteIdle &&
+ GlobalTransactionIdEquals(mynodeinfo->reported_xmin,
+ *reported_xmin))
+ mynodeinfo->idle = true;
+ else
+ {
+ mynodeinfo->idle = false;
+ mynodeinfo->reported_xmin = *reported_xmin;
+ }
+ mynodeinfo->excluded = false;
+ mynodeinfo->reported_xmin_time = current_time = GTM_TimestampGetCurrent();
+
+ GTM_RWLockRelease(&mynodeinfo->node_lock);
+
+ /* Compute both, idle as well as non-idle xmin */
+ non_idle_global_xmin = InvalidGlobalTransactionId;
+ idle_global_xmin = InvalidGlobalTransactionId;
+
+ num_nodes = pgxcnode_get_all(all_nodes, MAX_NODES);
+
+ elog(DEBUG1, "num_nodes - %d", num_nodes);
+
+ for (ii = 0; ii < num_nodes; ii++)
+ {
+ GlobalTransactionId xid;
+ GTM_PGXCNodeInfo *nodeinfo = all_nodes[ii];
+
+ elog(DEBUG1, "nodeinfo %p, type: %d, exclude %c, idle %c, xmin %d, time %lld",
+ nodeinfo, nodeinfo->type, nodeinfo->excluded ? 'T' : 'F',
+ nodeinfo->idle ? 'T' : 'F',
+ nodeinfo->reported_xmin, nodeinfo->reported_xmin_time);
+
+ if (nodeinfo->excluded)
+ continue;
+
+ if (nodeinfo->type != GTM_NODE_COORDINATOR && nodeinfo->type !=
+ GTM_NODE_DATANODE)
+ continue;
+
+ if (GTM_TimestampDifferenceExceeds(nodeinfo->reported_xmin_time,
+ current_time, GTM_REPORT_XMIN_DELAY_THRESHOLD))
+ {
+ elog(LOG, "Timediff exceeds threshold - %ld:%ld - excluding the "
+ "node from GlobalXmin calculation",
+ nodeinfo->reported_xmin_time, current_time);
+
+ GTM_RWLockAcquire(&nodeinfo->node_lock, GTM_LOCKMODE_WRITE);
+ if (GTM_TimestampDifferenceExceeds(nodeinfo->reported_xmin_time,
+ current_time, GTM_REPORT_XMIN_DELAY_THRESHOLD))
+ {
+ nodeinfo->excluded = true;
+ GTM_RWLockRelease(&nodeinfo->node_lock);
+ continue;
+ }
+ GTM_RWLockRelease(&nodeinfo->node_lock);
+ }
+
+ /*
+ * If the remote node is idle, don't include its reported xmin in the
+ * calculation which could be quite stale
+ */
+ if (mynodeinfo->idle && (nodeinfo == mynodeinfo))
+ continue;
+
+ /*
+ * Now grab the lock on the nodeinfo so that no further changes are
+ * possible to its state.
+ */
+ GTM_RWLockAcquire(&nodeinfo->node_lock, GTM_LOCKMODE_READ);
+
+ /*
+ * Just check again if the excluded state hasn't changed. Shouldn't
+ * happen too often anyways
+ */
+ if (nodeinfo->excluded)
+ {
+ GTM_RWLockRelease(&nodeinfo->node_lock);
+ continue;
+ }
+
+ xid = nodeinfo->reported_xmin;
+ if (!nodeinfo->idle)
+ {
+ if (!GlobalTransactionIdIsValid(non_idle_global_xmin))
+ non_idle_global_xmin = xid;
+ else if (GlobalTransactionIdPrecedes(xid, non_idle_global_xmin))
+ non_idle_global_xmin = xid;
+ }
+ else
+ {
+ if (!GlobalTransactionIdIsValid(idle_global_xmin))
+ idle_global_xmin = xid;
+ else if (GlobalTransactionIdPrecedes(xid, idle_global_xmin))
+ idle_global_xmin = xid;
+ }
+ GTM_RWLockRelease(&nodeinfo->node_lock);
+ }
+
+ /*
+ * If the remote node is idle, a new xmin might have been computed for it
+ * by us. We first try for non_idle_global_xmin, but if all nodes are idle,
+ * we use the idle_global_xmin
+ */
+ if (mynodeinfo && mynodeinfo->idle)
+ {
+ GTM_RWLockAcquire(&mynodeinfo->node_lock, GTM_LOCKMODE_WRITE);
+ if (GlobalTransactionIdIsValid(non_idle_global_xmin))
+ {
+ if (GlobalTransactionIdFollows(non_idle_global_xmin,
+ mynodeinfo->reported_xmin))
+ *reported_xmin = mynodeinfo->reported_xmin = non_idle_global_xmin;
+ }
+ else if (GlobalTransactionIdIsValid(idle_global_xmin))
+ {
+ if (GlobalTransactionIdFollows(non_idle_global_xmin,
+ mynodeinfo->reported_xmin))
+ *reported_xmin = mynodeinfo->reported_xmin = idle_global_xmin;
+ }
+ mynodeinfo->reported_xmin_time = current_time;
+ GTM_RWLockRelease(&mynodeinfo->node_lock);
+ }
+
+ /*
+ * Now all nodes that must be excluded from GlobalXmin computation have
+ * been marked correctly and xmin computed and set for an idle remote node,
+ * if so. Lets compute the GlobalXmin
+ */
+
+ /*
+ * GlobalXmin is capped by the latestCompletedXid. Since any future
+ * additions/changes can't cross this horizon, it seems appropriate to use
+ * this as upper bound for GlobalXmin computation
+ */
+ global_xmin = GTMTransactions.gt_latestCompletedXid;
+ if (!GlobalTransactionIdIsValid(global_xmin))
+ global_xmin = FirstNormalGlobalTransactionId;
+ else
+ GlobalTransactionIdAdvance(global_xmin);
+
+ for (ii = 0; ii < num_nodes; ii++)
+ {
+ GlobalTransactionId xid;
+ GTM_PGXCNodeInfo *nodeinfo = all_nodes[ii];
+
+ if (nodeinfo->excluded)
+ continue;
+
+ if (nodeinfo->type != GTM_NODE_COORDINATOR && nodeinfo->type !=
+ GTM_NODE_DATANODE)
+ continue;
+
+ /* Fetch once */
+ xid = nodeinfo->reported_xmin;
+ if (!GlobalTransactionIdIsValid(global_xmin))
+ global_xmin = xid;
+ else if (GlobalTransactionIdPrecedes(xid, global_xmin))
+ global_xmin = xid;
+ }
+ GTM_RWLockRelease(&PGXCNodesLock);
+
+ /*
+ * Now update the GTM_GlobalXmin and also record the time when its updated
+ * but iff someone else has not beaten us in the calculation already, which
+ * is possible because we did the calculation holding only a READ lock on
+ * PGXCNodesLock
+ */
+ if (GlobalTransactionIdIsValid(global_xmin))
+ {
+ GTM_RWLockAcquire(&PGXCNodesLock, GTM_LOCKMODE_WRITE);
+ if (GlobalTransactionIdPrecedes(GTM_GlobalXmin, global_xmin))
+ {
+ GTM_GlobalXmin = global_xmin;
+ GTM_GlobalXminComputedTime = current_time;
+ }
+ else
+ global_xmin = GTM_GlobalXmin;
+ GTM_RWLockRelease(&PGXCNodesLock);
+ }
+
+
+ elog(DEBUG1, "GTM_HandleGlobalXmin - %d", global_xmin);
+ return global_xmin;
+}
diff --git a/src/gtm/recovery/register_gtm.c b/src/gtm/recovery/register_gtm.c
index 371b4c97f8..780dcd3b47 100644
--- a/src/gtm/recovery/register_gtm.c
+++ b/src/gtm/recovery/register_gtm.c
@@ -63,6 +63,7 @@ ProcessPGXCNodeRegister(Port *myport, StringInfo message, bool is_backup)
int len;
StringInfoData buf;
GTM_PGXCNodeStatus status;
+ GlobalTransactionId xmin;
/* Read Node Type */
memcpy(&type, pq_getmsgbytes(message, sizeof (GTM_PGXCNodeType)),
@@ -107,10 +108,13 @@ ProcessPGXCNodeRegister(Port *myport, StringInfo message, bool is_backup)
status = pq_getmsgint(message, sizeof (GTM_PGXCNodeStatus));
+ memcpy(&xmin, pq_getmsgbytes(message, sizeof (GlobalTransactionId)),
+ sizeof (GlobalTransactionId));
+
elog(DEBUG1,
"ProcessPGXCNodeRegister: ipaddress = \"%s\", node name = \"%s\", proxy name = \"%s\", "
- "datafolder \"%s\", status = %d",
- ipaddress, node_name, proxyname, datafolder, status);
+ "datafolder \"%s\", status = %d, xmin = %d",
+ ipaddress, node_name, proxyname, datafolder, status, xmin);
if ((type!=GTM_NODE_GTM_PROXY) &&
(type!=GTM_NODE_GTM_PROXY_POSTMASTER) &&
@@ -162,7 +166,7 @@ ProcessPGXCNodeRegister(Port *myport, StringInfo message, bool is_backup)
}
if (Recovery_PGXCNodeRegister(type, node_name, port,
- proxyname, status,
+ proxyname, status, &xmin,
ipaddress, datafolder, false, myport->sock))
{
ereport(ERROR,
@@ -203,7 +207,8 @@ ProcessPGXCNodeRegister(Port *myport, StringInfo message, bool is_backup)
port,
node_name,
datafolder,
- status);
+ status,
+ xmin);
elog(DEBUG1, "node_register_internal() returns rc %d.", _rc);
@@ -235,6 +240,7 @@ ProcessPGXCNodeRegister(Port *myport, StringInfo message, bool is_backup)
pq_sendint(&buf, strlen(node_name), 4);
/* Node name (var-len) */
pq_sendbytes(&buf, node_name, strlen(node_name));
+ pq_sendint(&buf, xmin, sizeof (GlobalTransactionId));
pq_endmessage(myport, &buf);
if (myport->remote_type != GTM_NODE_GTM_PROXY)
diff --git a/src/gtm/test/test_serialize.c b/src/gtm/test/test_serialize.c
index 8d0bde372e..c1049b8862 100644
--- a/src/gtm/test/test_serialize.c
+++ b/src/gtm/test/test_serialize.c
@@ -42,12 +42,11 @@ test_snapshotdata_1(void)
data->sn_xmin = 128;
data->sn_xmax = 256;
- data->sn_recent_global_xmin = 512;
data->sn_xcnt = 1024;
data->sn_xip = 2048;
- printf("sn_xmin=%d, sn_xmax=%d, sn_recent_global_xmin=%d, sn_xcnt=%d, sn_xip=%d\n",
- data->sn_xmin, data->sn_xmax, data->sn_recent_global_xmin, data->sn_xcnt, data->sn_xip);
+ printf("sn_xmin=%d, sn_xmax=%d, sn_xcnt=%d, sn_xip=%d\n",
+ data->sn_xmin, data->sn_xmax, data->sn_xcnt, data->sn_xip);
/* serialize */
buflen = sizeof(GTM_SnapshotData);
@@ -61,8 +60,8 @@ test_snapshotdata_1(void)
/* deserialize */
data2 = (GTM_SnapshotData *)malloc(sizeof(GTM_SnapshotData));
gtm_deserialize_snapshotdata(data2, buf, buflen);
- printf("sn_xmin=%d, sn_xmax=%d, sn_recent_global_xmin=%d, sn_xcnt=%d, sn_xip=%d\n",
- data2->sn_xmin, data2->sn_xmax, data2->sn_recent_global_xmin, data2->sn_xcnt, data2->sn_xip);
+ printf("sn_xmin=%d, sn_xmax=%d, sn_xcnt=%d, sn_xip=%d\n",
+ data2->sn_xmin, data2->sn_xmax, data2->sn_xcnt, data2->sn_xip);
TEARDOWN();
@@ -88,7 +87,6 @@ build_dummy_gtm_transactioninfo()
data->gti_current_snapshot.sn_xmin = 128;
data->gti_current_snapshot.sn_xmax = 256;
- data->gti_current_snapshot.sn_recent_global_xmin = 512;
data->gti_current_snapshot.sn_xcnt = 1024;
data->gti_current_snapshot.sn_xip = 2048;
@@ -149,7 +147,6 @@ test_transactioninfo_1(void)
_ASSERT(data2->gti_coordcount==5);
_ASSERT(data2->gti_current_snapshot.sn_xmin==128);
_ASSERT(data2->gti_current_snapshot.sn_xmax==256);
- _ASSERT(data2->gti_current_snapshot.sn_recent_global_xmin==512);
_ASSERT(data2->gti_current_snapshot.sn_xcnt==1024);
_ASSERT(data2->gti_current_snapshot.sn_xip==2048);
diff --git a/src/include/access/gtm.h b/src/include/access/gtm.h
index 548441ce65..27383d8cac 100644
--- a/src/include/access/gtm.h
+++ b/src/include/access/gtm.h
@@ -44,7 +44,7 @@ extern int CommitPreparedTranGTM(GlobalTransactionId gxid,
extern GTM_Snapshot GetSnapshotGTM(GlobalTransactionId gxid, bool canbe_grouped);
/* Node registration APIs with GTM */
-extern int RegisterGTM(GTM_PGXCNodeType type, GTM_PGXCNodePort port, char *datafolder);
+extern int RegisterGTM(GTM_PGXCNodeType type, GlobalTransactionId *xmin);
extern int UnregisterGTM(GTM_PGXCNodeType type);
/* Sequence interface APIs with GTM */
@@ -62,4 +62,6 @@ extern int DropSequenceGTM(char *name, GTM_SequenceKeyType type);
extern int RenameSequenceGTM(char *seqname, const char *newseqname);
/* Barrier */
extern int ReportBarrierGTM(char *barrier_id);
+extern int ReportGlobalXmin(GlobalTransactionId *gxid,
+ GlobalTransactionId *global_xmin, bool isIdle);
#endif /* ACCESS_GTM_H */
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index caa9d37137..f64c2110bc 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -405,6 +405,7 @@ extern bool IsSendCommandId(void);
extern void SetSendCommandId(bool status);
extern bool IsPGXCNodeXactReadOnly(void);
extern bool IsPGXCNodeXactDatanodeDirect(void);
+extern void TransactionRecordXidWait(TransactionId xid);
#endif
extern int xactGetCommittedChildren(TransactionId **ptr);
diff --git a/src/include/gtm/gtm_c.h b/src/include/gtm/gtm_c.h
index ee19587ac4..2efe7e20c9 100644
--- a/src/include/gtm/gtm_c.h
+++ b/src/include/gtm/gtm_c.h
@@ -110,7 +110,6 @@ typedef struct GTM_SnapshotData
{
GlobalTransactionId sn_xmin;
GlobalTransactionId sn_xmax;
- GlobalTransactionId sn_recent_global_xmin;
uint32 sn_xcnt;
GlobalTransactionId *sn_xip;
} GTM_SnapshotData;
@@ -151,4 +150,9 @@ typedef enum GTM_PortLastCall
#define _(x) gettext(x)
+#define GTM_ERRCODE_TOO_OLD_XMIN 1
+#define GTM_ERRCODE_NODE_NOT_REGISTERED 2
+#define GTM_ERRCODE_NODE_EXCLUDED 3
+#define GTM_ERRCODE_UNKNOWN 4
+
#endif /* GTM_C_H */
diff --git a/src/include/gtm/gtm_client.h b/src/include/gtm/gtm_client.h
index b21f253143..d21ba44297 100644
--- a/src/include/gtm/gtm_client.h
+++ b/src/include/gtm/gtm_client.h
@@ -112,6 +112,7 @@ typedef union GTM_ResultData
GTM_PGXCNodeType type; /* NODE_REGISTER */
size_t len;
char *node_name; /* NODE_UNREGISTER */
+ GlobalTransactionId xmin;
} grd_node;
struct
@@ -120,6 +121,13 @@ typedef union GTM_ResultData
GTM_PGXCNodeInfo *nodeinfo[MAX_NODES];
} grd_node_list;
+ struct
+ {
+ GlobalTransactionId reported_xmin;
+ GlobalTransactionId global_xmin;
+ int errcode;
+ } grd_report_xmin; /* REPORT_XMIN */
+
/*
* TODO
* TXN_GET_STATUS
@@ -252,18 +260,17 @@ int node_register(GTM_Conn *conn,
GTM_PGXCNodeType type,
GTM_PGXCNodePort port,
char *node_name,
- char *datafolder);
-int bkup_node_register(GTM_Conn *conn,
- GTM_PGXCNodeType type,
- GTM_PGXCNodePort port,
- char *node_name,
- char *datafolder);
-int node_register(GTM_Conn *conn, GTM_PGXCNodeType type, GTM_PGXCNodePort port, char *node_name, char *datafolder);
+ char *datafolder,
+ GlobalTransactionId *xmin);
+int node_register(GTM_Conn *conn, GTM_PGXCNodeType type, GTM_PGXCNodePort port,
+ char *node_name, char *datafolder, GlobalTransactionId *xmin);
int node_register_internal(GTM_Conn *conn, GTM_PGXCNodeType type, const char *host, GTM_PGXCNodePort port, char *node_name,
- char *datafolder, GTM_PGXCNodeStatus status);
-int bkup_node_register(GTM_Conn *conn, GTM_PGXCNodeType type, GTM_PGXCNodePort port, char *node_name, char *datafolder);
+ char *datafolder, GTM_PGXCNodeStatus status,
+ GlobalTransactionId *xmin);
int bkup_node_register_internal(GTM_Conn *conn, GTM_PGXCNodeType type, const char *host, GTM_PGXCNodePort port,
- char *node_name, char *datafolder, GTM_PGXCNodeStatus status);
+ char *node_name, char *datafolder,
+ GTM_PGXCNodeStatus status,
+ GlobalTransactionId xmin);
int node_unregister(GTM_Conn *conn, GTM_PGXCNodeType type, const char *node_name);
int bkup_node_unregister(GTM_Conn *conn, GTM_PGXCNodeType type, const char * node_name);
@@ -271,6 +278,10 @@ int backend_disconnect(GTM_Conn *conn, bool is_postmaster, GTM_PGXCNodeType type
char *node_get_local_addr(GTM_Conn *conn, char *buf, size_t buflen, int *rc);
int register_session(GTM_Conn *conn, const char *coord_name, int coord_procid,
int coord_backendid);
+int report_global_xmin(GTM_Conn *conn, const char *node_name,
+ GTM_PGXCNodeType type, GlobalTransactionId *gxid,
+ GlobalTransactionId *global_xmin,
+ bool isIdle, int *errcode);
/*
* Sequence Management API
diff --git a/src/include/gtm/gtm_gxid.h b/src/include/gtm/gtm_gxid.h
new file mode 100644
index 0000000000..7fe3cf732c
--- /dev/null
+++ b/src/include/gtm/gtm_gxid.h
@@ -0,0 +1,47 @@
+#ifndef _GTM_GXID_H
+#define _GTM_GXID_H
+
+/* ----------------
+ * Special transaction ID values
+ *
+ * BootstrapGlobalTransactionId is the XID for "bootstrap" operations, and
+ * FrozenGlobalTransactionId is used for very old tuples. Both should
+ * always be considered valid.
+ *
+ * FirstNormalGlobalTransactionId is the first "normal" transaction id.
+ * Note: if you need to change it, you must change pg_class.h as well.
+ * ----------------
+ */
+#define BootstrapGlobalTransactionId ((GlobalTransactionId) 1)
+#define FrozenGlobalTransactionId ((GlobalTransactionId) 2)
+#define FirstNormalGlobalTransactionId ((GlobalTransactionId) 3)
+#define MaxGlobalTransactionId ((GlobalTransactionId) 0xFFFFFFFF)
+
+/* ----------------
+ * transaction ID manipulation macros
+ * ----------------
+ */
+#define GlobalTransactionIdIsNormal(xid) ((xid) >= FirstNormalGlobalTransactionId)
+#define GlobalTransactionIdEquals(id1, id2) ((id1) == (id2))
+#define GlobalTransactionIdStore(xid, dest) (*(dest) = (xid))
+#define StoreInvalidGlobalTransactionId(dest) (*(dest) = InvalidGlobalTransactionId)
+
+/* advance a transaction ID variable, handling wraparound correctly */
+#define GlobalTransactionIdAdvance(dest) \
+ do { \
+ (dest)++; \
+ if ((dest) < FirstNormalGlobalTransactionId) \
+ (dest) = FirstNormalGlobalTransactionId; \
+ } while(0)
+
+/* back up a transaction ID variable, handling wraparound correctly */
+#define GlobalTransactionIdRetreat(dest) \
+ do { \
+ (dest)--; \
+ } while ((dest) < FirstNormalGlobalTransactionId)
+
+extern bool GlobalTransactionIdPrecedes(GlobalTransactionId id1, GlobalTransactionId id2);
+extern bool GlobalTransactionIdPrecedesOrEquals(GlobalTransactionId id1, GlobalTransactionId id2);
+extern bool GlobalTransactionIdFollows(GlobalTransactionId id1, GlobalTransactionId id2);
+extern bool GlobalTransactionIdFollowsOrEquals(GlobalTransactionId id1, GlobalTransactionId id2);
+#endif
diff --git a/src/include/gtm/gtm_msg.h b/src/include/gtm/gtm_msg.h
index f4854cbf5d..51677b6518 100644
--- a/src/include/gtm/gtm_msg.h
+++ b/src/include/gtm/gtm_msg.h
@@ -33,6 +33,8 @@ typedef enum GTM_MessageType
MSG_NODE_UNREGISTER, /* Unregister a PGXC Node with GTM */
MSG_BKUP_NODE_UNREGISTER, /* Backup of MSG_NODE_UNREGISTER */
MSG_REGISTER_SESSION, /* Register distributed session with GTM */
+ MSG_REPORT_XMIN, /* Report RecentGlobalXmin to GTM */
+ MSG_BKUP_REPORT_XMIN,
MSG_NODE_LIST, /* Get node list */
MSG_NODE_BEGIN_REPLICATION_INIT,
MSG_NODE_END_REPLICATION_INIT,
@@ -109,6 +111,7 @@ typedef enum GTM_ResultType
NODE_REGISTER_RESULT,
NODE_UNREGISTER_RESULT,
REGISTER_SESSION_RESULT,
+ REPORT_XMIN_RESULT,
NODE_LIST_RESULT,
NODE_BEGIN_REPLICATION_INIT_RESULT,
NODE_END_REPLICATION_INIT_RESULT,
diff --git a/src/include/gtm/gtm_time.h b/src/include/gtm/gtm_time.h
index 9d5bac1f9c..9bcb8e5f79 100644
--- a/src/include/gtm/gtm_time.h
+++ b/src/include/gtm/gtm_time.h
@@ -33,5 +33,10 @@
#endif
GTM_Timestamp GTM_TimestampGetCurrent(void);
+void GTM_TimestampDifference(GTM_Timestamp start_time, GTM_Timestamp stop_time,
+ long *secs, int *microsecs);
+bool GTM_TimestampDifferenceExceeds(GTM_Timestamp start_time,
+ GTM_Timestamp stop_time,
+ int msec);
#endif
diff --git a/src/include/gtm/gtm_txn.h b/src/include/gtm/gtm_txn.h
index d220e81df1..b48e26f7a8 100644
--- a/src/include/gtm/gtm_txn.h
+++ b/src/include/gtm/gtm_txn.h
@@ -16,48 +16,11 @@
#include "gtm/libpq-be.h"
#include "gtm/gtm_c.h"
+#include "gtm/gtm_gxid.h"
#include "gtm/gtm_lock.h"
#include "gtm/gtm_list.h"
#include "gtm/stringinfo.h"
-/* ----------------
- * Special transaction ID values
- *
- * BootstrapGlobalTransactionId is the XID for "bootstrap" operations, and
- * FrozenGlobalTransactionId is used for very old tuples. Both should
- * always be considered valid.
- *
- * FirstNormalGlobalTransactionId is the first "normal" transaction id.
- * Note: if you need to change it, you must change pg_class.h as well.
- * ----------------
- */
-#define BootstrapGlobalTransactionId ((GlobalTransactionId) 1)
-#define FrozenGlobalTransactionId ((GlobalTransactionId) 2)
-#define FirstNormalGlobalTransactionId ((GlobalTransactionId) 3)
-#define MaxGlobalTransactionId ((GlobalTransactionId) 0xFFFFFFFF)
-
-/* ----------------
- * transaction ID manipulation macros
- * ----------------
- */
-#define GlobalTransactionIdIsNormal(xid) ((xid) >= FirstNormalGlobalTransactionId)
-#define GlobalTransactionIdEquals(id1, id2) ((id1) == (id2))
-#define GlobalTransactionIdStore(xid, dest) (*(dest) = (xid))
-#define StoreInvalidGlobalTransactionId(dest) (*(dest) = InvalidGlobalTransactionId)
-
-/* advance a transaction ID variable, handling wraparound correctly */
-#define GlobalTransactionIdAdvance(dest) \
- do { \
- (dest)++; \
- if ((dest) < FirstNormalGlobalTransactionId) \
- (dest) = FirstNormalGlobalTransactionId; \
- } while(0)
-
-/* back up a transaction ID variable, handling wraparound correctly */
-#define GlobalTransactionIdRetreat(dest) \
- do { \
- (dest)--; \
- } while ((dest) < FirstNormalGlobalTransactionId)
typedef int XidStatus;
@@ -71,15 +34,12 @@ typedef int XidStatus;
extern bool GlobalTransactionIdDidCommit(GlobalTransactionId transactionId);
extern bool GlobalTransactionIdDidAbort(GlobalTransactionId transactionId);
extern void GlobalTransactionIdAbort(GlobalTransactionId transactionId);
-extern bool GlobalTransactionIdPrecedes(GlobalTransactionId id1, GlobalTransactionId id2);
-extern bool GlobalTransactionIdPrecedesOrEquals(GlobalTransactionId id1, GlobalTransactionId id2);
-extern bool GlobalTransactionIdFollows(GlobalTransactionId id1, GlobalTransactionId id2);
-extern bool GlobalTransactionIdFollowsOrEquals(GlobalTransactionId id1, GlobalTransactionId id2);
/* in transam/varsup.c */
extern GlobalTransactionId GTM_GetGlobalTransactionId(GTM_TransactionHandle handle);
extern GlobalTransactionId GTM_GetGlobalTransactionIdMulti(GTM_TransactionHandle handle[], int txn_count);
extern GlobalTransactionId ReadNewGlobalTransactionId(void);
+extern GlobalTransactionId GTM_GetLatestCompletedXID(void);
extern void SetGlobalTransactionIdLimit(GlobalTransactionId oldest_datfrozenxid);
extern void SetNextGlobalTransactionId(GlobalTransactionId gxid);
extern void GTM_SetShuttingDown(void);
@@ -246,6 +206,7 @@ void ProcessGetGIDDataTransactionCommand(Port *myport, StringInfo message);
void ProcessGetGXIDTransactionCommand(Port *myport, StringInfo message);
void ProcessGXIDListCommand(Port *myport, StringInfo message);
void ProcessGetNextGXIDTransactionCommand(Port *myport, StringInfo message);
+void ProcessReportXminCommand(Port *myport, StringInfo message, bool is_backup);
void ProcessBeginTransactionGetGXIDAutovacuumCommand(Port *myport, StringInfo message);
void ProcessBkupBeginTransactionGetGXIDAutovacuumCommand(Port *myport, StringInfo message);
diff --git a/src/include/gtm/register.h b/src/include/gtm/register.h
index 8fbaac19c4..734c12cabf 100644
--- a/src/include/gtm/register.h
+++ b/src/include/gtm/register.h
@@ -58,6 +58,16 @@ typedef struct GTM_PGXCNodeInfo
char *ipaddress; /* IP address of the nodes */
char *datafolder; /* Data folder of the node */
GTM_PGXCNodeStatus status; /* Node status */
+ bool excluded; /*
+ * Has the node timed out and be
+ * excluded from xmin computation?
+ */
+ bool idle; /* Has the node been idle since
+ * last report
+ */
+ GlobalTransactionId reported_xmin; /* Last reported xmin */
+ GTM_Timestamp reported_xmin_time; /* Time when last report was
+ received */
int max_sessions;
int num_sessions;
GTM_PGXCSession *sessions;
@@ -77,6 +87,7 @@ int Recovery_PGXCNodeRegister(GTM_PGXCNodeType type,
GTM_PGXCNodePort port,
char *proxyname,
GTM_PGXCNodeStatus status,
+ GlobalTransactionId *xmin,
char *ipaddress,
char *datafolder,
bool in_recovery,
@@ -103,4 +114,7 @@ void ProcessPGXCNodeList(Port *myport, StringInfo message);
void ProcessGTMBeginBackup(Port *myport, StringInfo message);
void ProcessGTMEndBackup(Port *myport, StringInfo message);
+void GTM_InitNodeManager(void);
+GlobalTransactionId GTM_HandleGlobalXmin(GTM_PGXCNodeType type, char *node_name,
+ GlobalTransactionId *reported_xmin, bool remoteIdle, int *errcode);
#endif /* GTM_NODE_H */
diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h
index 6c47f8611d..b9e5919580 100644
--- a/src/include/miscadmin.h
+++ b/src/include/miscadmin.h
@@ -403,6 +403,7 @@ typedef enum
WalReceiverProcess,
#ifdef PGXC
PoolerProcess,
+ ClusterMonitorProcess,
#endif
NUM_AUXPROCTYPES /* Must be last! */
@@ -416,6 +417,7 @@ extern AuxProcType MyAuxProcType;
#define AmCheckpointerProcess() (MyAuxProcType == CheckpointerProcess)
#define AmWalWriterProcess() (MyAuxProcType == WalWriterProcess)
#define AmWalReceiverProcess() (MyAuxProcType == WalReceiverProcess)
+#define AmClusterMonitorProcess() (MyAuxProcType == ClusterMonitorProcess)
/*****************************************************************************
diff --git a/src/include/pgxc/pgxc.h b/src/include/pgxc/pgxc.h
index 9cd3995b2f..af68918b5d 100644
--- a/src/include/pgxc/pgxc.h
+++ b/src/include/pgxc/pgxc.h
@@ -19,8 +19,8 @@
#ifndef PGXC_H
#define PGXC_H
-#include "storage/lwlock.h"
#include "postgres.h"
+#include "storage/lwlock.h"
extern bool isPGXCCoordinator;
extern bool isPGXCDataNode;
diff --git a/src/include/postmaster/clustermon.h b/src/include/postmaster/clustermon.h
new file mode 100644
index 0000000000..0c97fa9fdd
--- /dev/null
+++ b/src/include/postmaster/clustermon.h
@@ -0,0 +1,42 @@
+/*-------------------------------------------------------------------------
+ *
+ * clustermon.h
+ * header file for cluster monitor process
+ *
+ *
+ * Portions Copyright (c) 2015, 2ndQuadrant Ltd
+ * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ * Portions Copyright (c) 2010-2012 Postgres-XC Development Group
+ *
+ * src/include/postmaster/autovacuum.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef CLUSTERMON_H
+#define CLUSTERMON_H
+
+typedef struct
+{
+ slock_t mutex;
+ GlobalTransactionId reported_recent_global_xmin;
+ GlobalTransactionId gtm_recent_global_xmin;
+} ClusterMonitorCtlData;
+
+extern Size ClusterMonitorShmemSize(void);
+
+/* Status inquiry functions */
+extern bool IsClusterMonitorProcess(void);
+
+/* Functions to start cluster monitor process, called from postmaster */
+extern int StartClusterMonitor(void);
+GlobalTransactionId ClusterMonitorGetGlobalXmin(void);
+void ClusterMonitorSetGlobalXmin(GlobalTransactionId xmin);
+
+#ifdef EXEC_BACKEND
+extern void ClusterMonitorIAm(void);
+#endif
+
+extern int ClusterMonitorInit(void);
+
+#endif /* CLUSTERMON_H */
diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h
index d3053baf62..a63cb067aa 100644
--- a/src/include/storage/procarray.h
+++ b/src/include/storage/procarray.h
@@ -68,12 +68,16 @@ extern Snapshot GetSnapshotData(Snapshot snapshot);
extern bool ProcArrayInstallImportedXmin(TransactionId xmin,
TransactionId sourcexid);
extern bool ProcArrayInstallRestoredXmin(TransactionId xmin, PGPROC *proc);
+void ProcArrayCheckXminConsistency(TransactionId global_xmin);
extern RunningTransactions GetRunningTransactionData(void);
extern bool TransactionIdIsInProgress(TransactionId xid);
extern bool TransactionIdIsActive(TransactionId xid);
extern TransactionId GetOldestXmin(Relation rel, bool ignoreVacuum);
+extern TransactionId GetOldestXminInternal(Relation rel, bool ignoreVacuum,
+ bool computeLocal, bool *isIdle, TransactionId lastGlobalXmin,
+ TransactionId lastReportedXmin);
extern TransactionId GetOldestActiveTransactionId(void);
extern TransactionId GetOldestSafeDecodingTransactionId(void);