diff options
Diffstat (limited to 'src')
41 files changed, 1608 insertions, 584 deletions
diff --git a/src/backend/access/heap/pruneheap.c b/src/backend/access/heap/pruneheap.c index 563e5c353c..0d605f5728 100644 --- a/src/backend/access/heap/pruneheap.c +++ b/src/backend/access/heap/pruneheap.c @@ -99,7 +99,8 @@ heap_page_prune_opt(Relation relation, Buffer buffer) else OldestXmin = RecentGlobalDataXmin; - Assert(TransactionIdIsValid(OldestXmin)); + if (!TransactionIdIsValid(OldestXmin)) + return; /* * Let's see if we really need pruning. diff --git a/src/backend/access/transam/gtm.c b/src/backend/access/transam/gtm.c index 91d5fe7182..50b6646c72 100644 --- a/src/backend/access/transam/gtm.c +++ b/src/backend/access/transam/gtm.c @@ -21,6 +21,7 @@ #include "pgxc/pgxc.h" #include "gtm/gtm_c.h" #include "postmaster/autovacuum.h" +#include "postmaster/clustermon.h" #include "storage/backendid.h" #include "tcop/tcopprot.h" #include "utils/guc.h" @@ -98,6 +99,8 @@ InitGTM(void) elog(DEBUG1, "Autovacuum worker: connection established to GTM with string %s", conn_str); else if (IsAutoVacuumLauncherProcess()) elog(DEBUG1, "Autovacuum launcher: connection established to GTM with string %s", conn_str); + else if (IsClusterMonitorProcess()) + elog(DEBUG1, "Cluster monitor: connection established to GTM with string %s", conn_str); else elog(DEBUG1, "Postmaster child: connection established to GTM with string %s", conn_str); } @@ -133,6 +136,8 @@ CloseGTM(void) elog(DEBUG1, "Autovacuum worker: connection to GTM closed"); else if (IsAutoVacuumLauncherProcess()) elog(DEBUG1, "Autovacuum launcher: connection to GTM closed"); + else if (IsClusterMonitorProcess()) + elog(DEBUG1, "Cluster monitor: connection to GTM closed"); else elog(DEBUG1, "Postmaster child: connection to GTM closed"); } @@ -607,7 +612,7 @@ RenameSequenceGTM(char *seqname, const char *newseqname) * Connection for registering is just used once then closed */ int -RegisterGTM(GTM_PGXCNodeType type, GTM_PGXCNodePort port, char *datafolder) +RegisterGTM(GTM_PGXCNodeType type, GlobalTransactionId *xmin) { 
int ret; @@ -616,7 +621,7 @@ RegisterGTM(GTM_PGXCNodeType type, GTM_PGXCNodePort port, char *datafolder) if (!conn) return EOF; - ret = node_register(conn, type, port, PGXCNodeName, datafolder); + ret = node_register(conn, type, 0, PGXCNodeName, "", xmin); /* If something went wrong, retry once */ if (ret < 0) @@ -624,7 +629,8 @@ RegisterGTM(GTM_PGXCNodeType type, GTM_PGXCNodePort port, char *datafolder) CloseGTM(); InitGTM(); if (conn) - ret = node_register(conn, type, port, PGXCNodeName, datafolder); + ret = node_register(conn, type, 0, PGXCNodeName, "", + xmin); } return ret; @@ -681,3 +687,20 @@ ReportBarrierGTM(char *barrier_id) return(report_barrier(conn, barrier_id)); } +int +ReportGlobalXmin(GlobalTransactionId *gxid, GlobalTransactionId *global_xmin, + bool isIdle) +{ + int errcode = GTM_ERRCODE_UNKNOWN; + + CheckConnection(); + if (!conn) + return EOF; + + if (report_global_xmin(conn, PGXCNodeName, + IS_PGXC_COORDINATOR ? GTM_NODE_COORDINATOR : GTM_NODE_DATANODE, + gxid, global_xmin, isIdle, &errcode)) + return errcode; + else + return 0; +} diff --git a/src/backend/bootstrap/bootstrap.c b/src/backend/bootstrap/bootstrap.c index e0253a6c00..d2248df540 100644 --- a/src/backend/bootstrap/bootstrap.c +++ b/src/backend/bootstrap/bootstrap.c @@ -318,6 +318,9 @@ AuxiliaryProcessMain(int argc, char *argv[]) case PoolerProcess: statmsg = "pooler process"; break; + case ClusterMonitorProcess: + statmsg = "cluster monitor process"; + break; #endif case StartupProcess: statmsg = "startup process"; @@ -422,6 +425,11 @@ AuxiliaryProcessMain(int argc, char *argv[]) /* don't set signals, pool manager has its own agenda */ PoolManagerInit(); proc_exit(1); /* should never return */ + + case ClusterMonitorProcess: + /* don't set signals, cluster monitor has its own agenda */ + ClusterMonitorInit(); + proc_exit(1); /* should never return */ #endif case CheckerProcess: diff --git a/src/backend/postmaster/Makefile b/src/backend/postmaster/Makefile index 71c23211b2..80f205c22c 
100644 --- a/src/backend/postmaster/Makefile +++ b/src/backend/postmaster/Makefile @@ -12,7 +12,7 @@ subdir = src/backend/postmaster top_builddir = ../../.. include $(top_builddir)/src/Makefile.global -OBJS = autovacuum.o bgworker.o bgwriter.o checkpointer.o fork_process.o \ - pgarch.o pgstat.o postmaster.o startup.o syslogger.o walwriter.o +OBJS = autovacuum.o bgworker.o bgwriter.o checkpointer.o clustermon.o \ + fork_process.o pgarch.o pgstat.o postmaster.o startup.o syslogger.o walwriter.o include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/postmaster/clustermon.c b/src/backend/postmaster/clustermon.c new file mode 100644 index 0000000000..75a68bd73c --- /dev/null +++ b/src/backend/postmaster/clustermon.c @@ -0,0 +1,407 @@ +/*------------------------------------------------------------------------- + * + * clustermon.c + * + * Postgres-XL Cluster Monitor + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + * + * Portions Copyright (c) 2015, 2ndQuadrant Ltd + * Portions Copyright (c) 2012-2014, TransLattice, Inc. 
+ * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * + * IDENTIFICATION + * src/backend/postmaster/clustermon.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include <signal.h> +#include <sys/types.h> +#include <sys/time.h> +#include <unistd.h> + +#include "access/transam.h" +#include "access/xact.h" +#include "gtm/gtm_c.h" +#include "gtm/gtm_gxid.h" +#include "libpq/pqsignal.h" +#include "miscadmin.h" +#include "pgxc/pgxc.h" +#include "postmaster/clustermon.h" +#include "postmaster/fork_process.h" +#include "postmaster/postmaster.h" +#include "storage/proc.h" +#include "storage/spin.h" +#include "tcop/tcopprot.h" +#include "utils/memutils.h" +#include "utils/ps_status.h" +#include "utils/timeout.h" +#include "utils/timestamp.h" + +/* Flags to tell if we are in a clustermon process */ +static bool am_clustermon = false; + +/* Flags set by signal handlers */ +static volatile sig_atomic_t got_SIGHUP = false; +static volatile sig_atomic_t got_SIGUSR2 = false; +static volatile sig_atomic_t got_SIGTERM = false; + +/* Memory context for long-lived data */ +static MemoryContext ClusterMonitorMemCxt; +static ClusterMonitorCtlData *ClusterMonitorCtl = NULL; + +static void cm_sighup_handler(SIGNAL_ARGS); +static void cm_sigterm_handler(SIGNAL_ARGS); +static void ClusterMonitorSetReportedGlobalXmin(GlobalTransactionId xmin); +static GlobalTransactionId ClusterMonitorGetReportedGlobalXmin(void); + +/* PID of clustser monitoring process */ +int ClusterMonitorPid = 0; + +#define CLUSTER_MONITOR_NAPTIME 5 + +/* + * Main loop for the cluster monitor process. + */ +int +ClusterMonitorInit(void) +{ + sigjmp_buf local_sigjmp_buf; + GTM_PGXCNodeType nodetype = IS_PGXC_DATANODE ? 
+ GTM_NODE_DATANODE : + GTM_NODE_COORDINATOR; + GlobalTransactionId oldestXmin; + GlobalTransactionId newOldestXmin; + GlobalTransactionId reportedXmin; + GlobalTransactionId lastGlobalXmin; + int status; + + am_clustermon = true; + + /* Identify myself via ps */ + init_ps_display("cluster monitor process", "", "", ""); + + ereport(LOG, + (errmsg("cluster monitor started"))); + + if (PostAuthDelay) + pg_usleep(PostAuthDelay * 1000000L); + + /* + * Set up signal handlers. We operate on databases much like a regular + * backend, so we use the same signal handling. See equivalent code in + * tcop/postgres.c. + */ + pqsignal(SIGHUP, cm_sighup_handler); + pqsignal(SIGINT, StatementCancelHandler); + pqsignal(SIGTERM, cm_sigterm_handler); + + pqsignal(SIGQUIT, quickdie); + InitializeTimeouts(); /* establishes SIGALRM handler */ + + pqsignal(SIGPIPE, SIG_IGN); + pqsignal(SIGUSR1, procsignal_sigusr1_handler); + pqsignal(SIGFPE, FloatExceptionHandler); + pqsignal(SIGCHLD, SIG_DFL); + + /* + * Create a memory context that we will do all our work in. We do this so + * that we can reset the context during error recovery and thereby avoid + * possible memory leaks. 
+ */ + ClusterMonitorMemCxt = AllocSetContextCreate(TopMemoryContext, + "Cluster Monitor", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); + MemoryContextSwitchTo(ClusterMonitorMemCxt); + + InitPostgres(NULL, InvalidOid, NULL, InvalidOid, NULL); + + SetProcessingMode(NormalProcessing); + + /* + * Start a dummy transaction so that we start computing OldestXmin + * with the current latestCompletedXid, especially when this server is just + * started and joining the cluster + */ + StartTransactionCommand(); + (void) GetTopTransactionId(); + CommitTransactionCommand(); + + /* + * Register this node with the GTM + */ + oldestXmin = InvalidGlobalTransactionId; + if (RegisterGTM(nodetype, &oldestXmin) < 0) + { + UnregisterGTM(nodetype); + oldestXmin = InvalidGlobalTransactionId; + if (RegisterGTM(nodetype, &oldestXmin) < 0) + { + ereport(LOG, + (errcode(ERRCODE_IO_ERROR), + errmsg("Can not register node on GTM"))); + } + } + + /* + * If the registration is successful, GTM would send us back current + * GlobalXmin. Initialise our local state to the same value + */ + ClusterMonitorSetReportedGlobalXmin(oldestXmin); + + /* + * If an exception is encountered, processing resumes here. + * + * This code is a stripped down version of PostgresMain error recovery. + */ + if (sigsetjmp(local_sigjmp_buf, 1) != 0) + { + /* since not using PG_TRY, must reset error stack by hand */ + error_context_stack = NULL; + + /* Prevents interrupts while cleaning up */ + HOLD_INTERRUPTS(); + + /* Forget any pending QueryCancel or timeout request */ + disable_all_timeouts(false); + QueryCancelPending = false; /* second to avoid race condition */ + + /* Report the error to the server log */ + EmitErrorReport(); + + /* + * Now return to normal top-level context and clear ErrorContext for + * next time. 
+ */ + MemoryContextSwitchTo(ClusterMonitorMemCxt); + FlushErrorState(); + + /* Flush any leaked data in the top-level context */ + MemoryContextResetAndDeleteChildren(ClusterMonitorMemCxt); + + /* Now we can allow interrupts again */ + RESUME_INTERRUPTS(); + + /* if in shutdown mode, no need for anything further; just go away */ + if (got_SIGTERM) + goto shutdown; + + /* + * Sleep at least 1 second after any error. We don't want to be + * filling the error logs as fast as we can. + */ + pg_usleep(1000000L); + } + + /* We can now handle ereport(ERROR) */ + PG_exception_stack = &local_sigjmp_buf; + + /* must unblock signals before calling rebuild_database_list */ + PG_SETMASK(&UnBlockSig); + + /* + * Force statement_timeout and lock_timeout to zero to avoid letting these + * settings prevent regular maintenance from being executed. + */ + SetConfigOption("statement_timeout", "0", PGC_SUSET, PGC_S_OVERRIDE); + SetConfigOption("lock_timeout", "0", PGC_SUSET, PGC_S_OVERRIDE); + + /* loop until shutdown request */ + while (!got_SIGTERM) + { + struct timeval nap; + int rc; + bool isIdle; + + /* + * Compute RecentGlobalXmin, report it to the GTM and sleep for the set + * interval. 
Keep doing this forever + */ + isIdle = false; + reportedXmin = ClusterMonitorGetReportedGlobalXmin(); + lastGlobalXmin = ClusterMonitorGetGlobalXmin(); + oldestXmin = GetOldestXminInternal(NULL, false, true, &isIdle, + lastGlobalXmin, reportedXmin); + + if (GlobalTransactionIdPrecedes(oldestXmin, reportedXmin)) + oldestXmin = reportedXmin; + + if (GlobalTransactionIdPrecedes(oldestXmin, lastGlobalXmin)) + oldestXmin = lastGlobalXmin; + + if ((status = ReportGlobalXmin(&oldestXmin, &newOldestXmin, isIdle))) + { + elog(DEBUG2, "Failed to report RecentGlobalXmin to GTM - %d:%d", + status, newOldestXmin); + if (status == GTM_ERRCODE_TOO_OLD_XMIN || + status == GTM_ERRCODE_NODE_EXCLUDED) + elog(PANIC, "Global xmin computation mismatch"); + } + else + { + ClusterMonitorSetReportedGlobalXmin(oldestXmin); + elog(DEBUG2, "Updating global_xmin to %d", newOldestXmin); + if (GlobalTransactionIdIsValid(newOldestXmin)) + ClusterMonitorSetGlobalXmin(newOldestXmin); + } + + /* + * Repeat at every 30 seconds + */ + nap.tv_sec = CLUSTER_MONITOR_NAPTIME; + nap.tv_usec = 0; + + /* + * Wait until naptime expires or we get some type of signal (all the + * signal handlers will wake us by calling SetLatch). + */ + rc = WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, + (nap.tv_sec * 1000L) + (nap.tv_usec / 1000L)); + + ResetLatch(MyLatch); + + /* Process sinval catchup interrupts that happened while sleeping */ + ProcessCatchupInterrupt(); + + /* + * Emergency bailout if postmaster has died. This is to avoid the + * necessity for manual cleanup of all postmaster children. 
+ */ + if (rc & WL_POSTMASTER_DEATH) + proc_exit(1); + + /* the normal shutdown case */ + if (got_SIGTERM) + break; + + if (got_SIGHUP) + { + got_SIGHUP = false; + ProcessConfigFile(PGC_SIGHUP); + } + } + + /* Normal exit from the cluster monitor is here */ +shutdown: + UnregisterGTM(nodetype); + ereport(LOG, + (errmsg("cluster monitor shutting down"))); + + proc_exit(0); /* done */ +} + +/* SIGHUP: set flag to re-read config file at next convenient time */ +static void +cm_sighup_handler(SIGNAL_ARGS) +{ + int save_errno = errno; + + got_SIGHUP = true; + SetLatch(MyLatch); + + errno = save_errno; +} + +/* SIGTERM: time to die */ +static void +cm_sigterm_handler(SIGNAL_ARGS) +{ + int save_errno = errno; + + got_SIGTERM = true; + SetLatch(MyLatch); + + errno = save_errno; +} + + +/* + * IsClusterMonitor functions + * Return whether this is either a cluster monitor process or a worker + * process. + */ +bool +IsClusterMonitorProcess(void) +{ + return am_clustermon; +} + +/* Report shared-memory space needed by ClusterMonitor */ +Size +ClusterMonitorShmemSize(void) +{ + return sizeof (ClusterMonitorCtlData); +} + +void +ClusterMonitorShmemInit(void) +{ + bool found; + + ClusterMonitorCtl = (ClusterMonitorCtlData *) + ShmemInitStruct("Cluster Monitor Ctl", ClusterMonitorShmemSize(), &found); + + if (!found) + { + /* First time through, so initialize */ + MemSet(ClusterMonitorCtl, 0, ClusterMonitorShmemSize()); + SpinLockInit(&ClusterMonitorCtl->mutex); + } +} + +GlobalTransactionId +ClusterMonitorGetGlobalXmin(void) +{ + GlobalTransactionId xmin; + + SpinLockAcquire(&ClusterMonitorCtl->mutex); + xmin = ClusterMonitorCtl->gtm_recent_global_xmin; + SpinLockRelease(&ClusterMonitorCtl->mutex); + + return xmin; +} + +void +ClusterMonitorSetGlobalXmin(GlobalTransactionId xmin) +{ + LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); + ProcArrayCheckXminConsistency(xmin); + + SpinLockAcquire(&ClusterMonitorCtl->mutex); + ClusterMonitorCtl->gtm_recent_global_xmin = xmin; + 
SpinLockRelease(&ClusterMonitorCtl->mutex); + + LWLockRelease(ProcArrayLock); +} + +static void +ClusterMonitorSetReportedGlobalXmin(GlobalTransactionId xmin) +{ + elog(DEBUG2, "ClusterMonitorSetReportedGlobalXmin - old %d, new %d", + ClusterMonitorCtl->reported_recent_global_xmin, + xmin); + SpinLockAcquire(&ClusterMonitorCtl->mutex); + ClusterMonitorCtl->reported_recent_global_xmin = xmin; + SpinLockRelease(&ClusterMonitorCtl->mutex); +} + +static GlobalTransactionId +ClusterMonitorGetReportedGlobalXmin(void) +{ + GlobalTransactionId reported_xmin; + + SpinLockAcquire(&ClusterMonitorCtl->mutex); + reported_xmin = ClusterMonitorCtl->reported_recent_global_xmin; + SpinLockRelease(&ClusterMonitorCtl->mutex); + + return reported_xmin; +} diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c index 58d9c9dfdc..86ebaac5f0 100644 --- a/src/backend/postmaster/postmaster.c +++ b/src/backend/postmaster/postmaster.c @@ -261,6 +261,9 @@ static pid_t StartupPID = 0, #ifdef PGXC /* PGXC_COORD */ PgPoolerPID = 0, #endif /* PGXC_COORD */ +#ifdef XCP + ClusterMonPID = 0, +#endif BgWriterPID = 0, CheckpointerPID = 0, WalWriterPID = 0, @@ -581,6 +584,7 @@ Datum xc_lockForBackupKey1; Datum xc_lockForBackupKey2; #define StartPoolManager() StartChildProcess(PoolerProcess) +#define StartClusterMonitor() StartChildProcess(ClusterMonitorProcess) #endif #define StartupDataBase() StartChildProcess(StartupProcess) @@ -1359,6 +1363,7 @@ PostmasterMain(int argc, char *argv[]) MemoryContextSwitchTo(oldcontext); #endif /* PGXC */ + /* Some workers may be scheduled to start now */ maybe_start_bgworker(); @@ -1781,6 +1786,12 @@ ServerLoop(void) PgPoolerPID = StartPoolManager(); #endif /* PGXC */ +#ifdef XCP + /* If we have lost the cluster monitor, try to start a new one */ + if (ClusterMonPID == 0 && pmState == PM_RUN) + ClusterMonPID = StartClusterMonitor(); +#endif + /* If we have lost the archiver, try to start a new one. 
*/ if (PgArchPID == 0 && PgArchStartupAllowed()) PgArchPID = pgarch_start(); @@ -2482,6 +2493,10 @@ SIGHUP_handler(SIGNAL_ARGS) if (PgPoolerPID != 0) signal_child(PgPoolerPID, SIGHUP); #endif /* PGXC */ +#ifdef XCP + if (ClusterMonPID != 0) + signal_child(ClusterMonPID, SIGHUP); +#endif if (BgWriterPID != 0) signal_child(BgWriterPID, SIGHUP); if (CheckpointerPID != 0) @@ -2571,7 +2586,8 @@ pmdie(SIGNAL_ARGS) /* and the pool manager too */ if (PgPoolerPID != 0) signal_child(PgPoolerPID, SIGTERM); - + if (ClusterMonPID != 0) + signal_child(ClusterMonPID, SIGTERM); #endif /* @@ -2619,6 +2635,9 @@ pmdie(SIGNAL_ARGS) /* and the pool manager too */ if (PgPoolerPID != 0) signal_child(PgPoolerPID, SIGTERM); + /* and the cluster monitor too */ + if (ClusterMonPID != 0) + signal_child(ClusterMonPID, SIGTERM); #endif /* XCP */ SignalUnconnectedWorkers(SIGTERM); if (pmState == PM_RECOVERY) @@ -2649,6 +2668,10 @@ pmdie(SIGNAL_ARGS) /* and the walwriter too */ if (WalWriterPID != 0) signal_child(WalWriterPID, SIGTERM); +#ifdef XCP + if (ClusterMonPID != 0) + signal_child(ClusterMonPID, SIGTERM); +#endif pmState = PM_WAIT_BACKENDS; } @@ -2807,6 +2830,11 @@ reaper(SIGNAL_ARGS) PgPoolerPID = StartPoolManager(); #endif /* PGXC */ +#ifdef XCP + if (ClusterMonPID == 0) + ClusterMonPID = StartClusterMonitor(); +#endif + /* workers may be scheduled to start now */ maybe_start_bgworker(); @@ -2991,6 +3019,16 @@ reaper(SIGNAL_ARGS) } #endif +#ifdef XCP + if (pid == ClusterMonPID) + { + ClusterMonPID = 0; + if (!EXIT_STATUS_0(exitstatus)) + HandleChildCrash(pid, exitstatus, + _("cluster monitor process")); + continue; + } +#endif /* Was it one of our background workers? 
*/ if (CleanupBackgroundWorker(pid, exitstatus)) { @@ -3426,6 +3464,19 @@ HandleChildCrash(int pid, int exitstatus, const char *procname) } #endif /* PGXC */ +#ifdef XCP + if (pid == ClusterMonPID) + ClusterMonPID = 0; + else if (ClusterMonPID != 0 && !FatalError) + { + ereport(DEBUG2, + (errmsg_internal("sending %s to process %d", + (SendStop ? "SIGSTOP" : "SIGQUIT"), + (int) ClusterMonPID))); + signal_child(ClusterMonPID, (SendStop ? SIGSTOP : SIGQUIT)); + } +#endif + /* * Force a power-cycle of the pgarch process too. (This isn't absolutely * necessary, but it seems like a good idea for robustness, and it @@ -3611,6 +3662,9 @@ PostmasterStateMachine(void) #ifdef PGXC PgPoolerPID == 0 && #endif +#ifdef XCP + ClusterMonPID == 0 && +#endif WalReceiverPID == 0 && BgWriterPID == 0 && (CheckpointerPID == 0 || @@ -3711,6 +3765,9 @@ PostmasterStateMachine(void) #ifdef PGXC Assert(PgPoolerPID == 0); #endif +#ifdef XCP + Assert(ClusterMonPID == 0); +#endif Assert(StartupPID == 0); Assert(WalReceiverPID == 0); Assert(BgWriterPID == 0); @@ -3923,6 +3980,10 @@ TerminateChildren(int signal) if (PgPoolerPID != 0) signal_child(PgPoolerPID, SIGQUIT); #endif +#ifdef XCP + if (ClusterMonPID != 0) + signal_child(ClusterMonPID, signal); +#endif if (BgWriterPID != 0) signal_child(BgWriterPID, signal); if (CheckpointerPID != 0) @@ -5334,6 +5395,10 @@ StartChildProcess(AuxProcType type) ereport(LOG, (errmsg("could not fork pool manager process: %m"))); break; + case ClusterMonitorProcess: + ereport(LOG, + (errmsg("could not fork cluster monitor process: %m"))); + break; #endif case StartupProcess: ereport(LOG, diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c index db5f4f605c..3895031901 100644 --- a/src/backend/storage/ipc/ipci.c +++ b/src/backend/storage/ipc/ipci.c @@ -153,6 +153,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port) size = add_size(size, SharedQueueShmemSize()); if (IS_PGXC_COORDINATOR) size = add_size(size, 
ClusterLockShmemSize()); + size = add_size(size, ClusterMonitorShmemSize()); #endif size = add_size(size, BTreeShmemSize()); size = add_size(size, SyncScanShmemSize()); @@ -274,6 +275,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port) SharedQueuesInit(); if (IS_PGXC_COORDINATOR) ClusterLockShmemInit(); + ClusterMonitorShmemInit(); #endif /* diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c index e26ff6c102..48840c24bb 100644 --- a/src/backend/storage/ipc/procarray.c +++ b/src/backend/storage/ipc/procarray.c @@ -1220,10 +1220,39 @@ TransactionIdIsActive(TransactionId xid) TransactionId GetOldestXmin(Relation rel, bool ignoreVacuum) { + return GetOldestXminInternal(rel, ignoreVacuum, false, NULL, + InvalidTransactionId, InvalidTransactionId); +} + +/* + * This implements most of the logic that GetOldestXmin needs. In XL, we don't + * actually compute OldestXmin unless specifically told to do by computeLocal + * argument set to true which GetOldestXmin never done. So we just return the + * value from the shared memory. The OldestXmin itself is always computed by + * the Cluster Monitor process by sending local state information to the GTM, + * which then aggregates information from all the nodes and gives out final + * OldestXmin or GlobalXmin which is consistent across the entire cluster. + * + * In addition, Cluster Monitor also passes the last reported xmin (or the one + * sent back by GTM in case we were idle) and the last received GlobalXmin. We + * must ensure that we don't see an XID or xmin which is beyond these horizons. + * Otherwise it signals problems with the GlobalXmin calculation. This can + * happen because of network disconnects or extreme load on the machine + * (unlikely). In any case, we must restart ourselves to avoid any data + * consistency problem. A more careful approach could involve killing only + * those backends which are running with old xid or xmin. 
We can consider + * implementing it that way in future + */ +TransactionId +GetOldestXminInternal(Relation rel, bool ignoreVacuum, bool computeLocal, + bool *isIdle, TransactionId lastGlobalXmin, + TransactionId lastReportedXmin) +{ ProcArrayStruct *arrayP = procArray; TransactionId result; int index; bool allDbs; + TransactionId xmin; volatile TransactionId replication_slot_xmin = InvalidTransactionId; volatile TransactionId replication_slot_catalog_xmin = InvalidTransactionId; @@ -1235,9 +1264,14 @@ GetOldestXmin(Relation rel, bool ignoreVacuum) */ allDbs = rel == NULL || rel->rd_rel->relisshared; -#ifdef PGXC - if (TransactionIdIsValid(RecentGlobalXmin)) - return RecentGlobalXmin; +#ifdef XCP + if (!computeLocal) + { + xmin = (TransactionId) ClusterMonitorGetGlobalXmin(); + if (!TransactionIdIsValid(xmin)) + xmin = FirstNormalTransactionId; + return xmin; + } #endif /* Cannot look for individual databases during recovery */ @@ -1252,8 +1286,18 @@ GetOldestXmin(Relation rel, bool ignoreVacuum) * additions. */ result = ShmemVariableCache->latestCompletedXid; +#ifdef XCP + if (!TransactionIdIsValid(result)) + result = FirstNormalTransactionId; + else + TransactionIdAdvance(result); + + if (isIdle) + *isIdle = true; +#else Assert(TransactionIdIsNormal(result)); TransactionIdAdvance(result); +#endif for (index = 0; index < arrayP->numProcs; index++) { @@ -1277,6 +1321,9 @@ GetOldestXmin(Relation rel, bool ignoreVacuum) { /* Fetch xid just once - see GetNewTransactionId */ TransactionId xid = pgxact->xid; +#ifdef XCP + TransactionId xmin; +#endif /* First consider the transaction's own Xid, if any */ if (TransactionIdIsNormal(xid) && @@ -1290,10 +1337,49 @@ GetOldestXmin(Relation rel, bool ignoreVacuum) * have an Xmin but not (yet) an Xid; conversely, if it has an * Xid, that could determine some not-yet-set Xmin. 
*/ +#ifdef XCP + xmin = pgxact->xmin; /* Fetch just once */ + if (TransactionIdIsNormal(xmin) && + TransactionIdPrecedes(xmin, result)) + result = xmin; + + /* + * If we found a normal xid or a transaction running with xmin set, + * we are not idle + */ + if (isIdle && + (TransactionIdIsNormal(xmin) || TransactionIdIsNormal(xid))) + *isIdle = false; + + /* + * If we see an xid or an xmin which precedes either the last + * reported xmin or the GlobalXmin calculated by the + * Cluster Monitor process then it signals bad things and we must + * abort and restart the database server + */ + if (TransactionIdIsValid(lastReportedXmin)) + { + if ((TransactionIdIsValid(xmin) && TransactionIdPrecedes(xmin, lastReportedXmin)) || + (TransactionIdIsValid(xid) && TransactionIdPrecedes(xid, + lastReportedXmin))) + elog(PANIC, "Found xid (%d) or xmin (%d) precedes last " + "reported xmin (%d)", xid, xmin, lastReportedXmin); + } + + if (TransactionIdIsValid(lastGlobalXmin)) + { + if ((TransactionIdIsValid(xmin) && TransactionIdPrecedes(xmin, lastGlobalXmin)) || + (TransactionIdIsValid(xid) && TransactionIdPrecedes(xid, + lastGlobalXmin))) + elog(PANIC, "Found xid (%d) or xmin (%d) precedes " + "global xmin (%d)", xid, xmin, lastGlobalXmin); + } +#else xid = pgxact->xmin; /* Fetch just once */ if (TransactionIdIsNormal(xid) && TransactionIdPrecedes(xid, result)) result = xid; +#endif } } @@ -1433,6 +1519,9 @@ GetSnapshotData(Snapshot snapshot) bool suboverflowed = false; volatile TransactionId replication_slot_xmin = InvalidTransactionId; volatile TransactionId replication_slot_catalog_xmin = InvalidTransactionId; +#ifdef XCP + TransactionId clustermon_xmin; +#endif #ifdef PGXC /* PGXC_DATANODE */ /* @@ -1659,6 +1748,12 @@ GetSnapshotData(Snapshot snapshot) if (TransactionIdPrecedes(xmin, globalxmin)) globalxmin = xmin; +#ifdef XCP + clustermon_xmin = ClusterMonitorGetGlobalXmin(); + if (TransactionIdPrecedes(clustermon_xmin, globalxmin)) + globalxmin = clustermon_xmin; +#endif + /* 
Update global variables too */ RecentGlobalXmin = globalxmin - vacuum_defer_cleanup_age; if (!TransactionIdIsNormal(RecentGlobalXmin)) @@ -3129,10 +3224,7 @@ GetSnapshotDataFromGTM(Snapshot snapshot) * establishment or auto-analyze scans. Nevertheless, we MUST fix this * before going to production release */ - if (IS_PGXC_LOCAL_COORDINATOR) - gtm_snapshot = GetSnapshotGTM(GetCurrentTransactionId(), canbe_grouped); - else - gtm_snapshot = GetSnapshotGTM(GetCurrentTransactionIdIfAny(), canbe_grouped); + gtm_snapshot = GetSnapshotGTM(GetCurrentTransactionIdIfAny(), canbe_grouped); if (!gtm_snapshot) @@ -3141,7 +3233,13 @@ GetSnapshotDataFromGTM(Snapshot snapshot) errmsg("GTM error, could not obtain snapshot. Current XID = %d, Autovac = %d", GetCurrentTransactionId(), IsAutoVacuumWorkerProcess()))); else { - RecentGlobalXmin = gtm_snapshot->sn_recent_global_xmin; + /* + * Set RecentGlobalXmin by copying from the shared memory state + * maintained by the Clutser Monitor + */ + RecentGlobalXmin = ClusterMonitorGetGlobalXmin(); + if (!TransactionIdIsValid(RecentGlobalXmin)) + RecentGlobalXmin = FirstNormalTransactionId; /* * XXX Is it ok to set RecentGlobalDataXmin same as RecentGlobalXmin ? */ @@ -3161,6 +3259,7 @@ GetSnapshotFromGlobalSnapshot(Snapshot snapshot) { int index; ProcArrayStruct *arrayP = procArray; + TransactionId global_xmin; snapshot->xmin = globalSnapshot.gxmin; snapshot->xmax = globalSnapshot.gxmax; @@ -3208,15 +3307,6 @@ GetSnapshotFromGlobalSnapshot(Snapshot snapshot) snapshot->max_xcnt = globalSnapshot.gxcnt; } - memcpy(snapshot->xip, globalSnapshot.gxip, - globalSnapshot.gxcnt * sizeof(TransactionId)); - snapshot->curcid = GetCurrentCommandId(false); - - if (!TransactionIdIsValid(MyPgXact->xmin)) - MyPgXact->xmin = TransactionXmin = globalSnapshot.gxmin; - - RecentXmin = globalSnapshot.gxmin; - /* PGXCTODO - set this until we handle subtransactions. 
*/ snapshot->subxcnt = 0; @@ -3235,64 +3325,37 @@ GetSnapshotFromGlobalSnapshot(Snapshot snapshot) LWLockAcquire(ProcArrayLock, LW_SHARED); /* - * Spin over analyzeProcArray and add these local analyze XIDs to the - * local snapshot. + * Once we have a SHARED lock on the ProcArrayLock, fetch the + * GlobalXmin and ensure that the snapshot we are dealing with isn't + * too old. Since such a snapshot may need to see rows that have + * already been removed by the server + * + * These scenarios are not very likely to happen because the + * ClusterMonitor will ensure that GlobalXmins are reported to GTM in + * time and the GlobalXmin on the GTM can't advance past the reported + * xmins. But in some cases where a node fails to report its GlobalXmin + * and gets excluded from the list of nodes on GTM, the GlobalXmin will + * be advanced. Usually such node will shoot itself in the head + * and rejoin the cluster, but if at all it sends a snapshot to us, we + * should protect ourselves from using it */ - for (index = 0; index < arrayP->numProcs; index++) - { - int pgprocno = arrayP->pgprocnos[index]; - volatile PGPROC *proc = &allProcs[pgprocno]; - volatile PGXACT *pgxact = &allPgXact[pgprocno]; + global_xmin = ClusterMonitorGetGlobalXmin(); + if (!TransactionIdIsValid(global_xmin)) + global_xmin = FirstNormalTransactionId; - /* Do that only for an autovacuum process */ - if (pgxact->vacuumFlags & PROC_IS_AUTOVACUUM) - { - TransactionId xid; - - /* Update globalxmin to be the smallest valid xmin */ - xid = pgxact->xmin; /* fetch just once */ + if (TransactionIdPrecedes(globalSnapshot.gxmin, global_xmin)) + elog(ERROR, "Snapshot too old - RecentGlobalXmin has already " + "advanced past the snapshot xmin"); - if (TransactionIdIsNormal(xid) && - TransactionIdPrecedes(xid, RecentGlobalXmin)) - RecentGlobalXmin = xid; + memcpy(snapshot->xip, globalSnapshot.gxip, + globalSnapshot.gxcnt * sizeof(TransactionId)); + snapshot->curcid = GetCurrentCommandId(false); - /* Fetch xid just 
once - see GetNewTransactionId */ - xid = pgxact->xid; + if (!TransactionIdIsValid(MyPgXact->xmin)) + MyPgXact->xmin = TransactionXmin = globalSnapshot.gxmin; - /* - * If the transaction has been assigned an xid < xmax we add it to the - * snapshot, and update xmin if necessary. There's no need to store - * XIDs >= xmax, since we'll treat them as running anyway. We don't - * bother to examine their subxids either. - * - * We don't include our own XID (if any) in the snapshot, but we must - * include it into xmin. - */ - if (TransactionIdIsNormal(xid)) - { - if (TransactionIdFollowsOrEquals(xid, snapshot->xmax)) - continue; - if (proc != MyProc) - { - if (snapshot->xcnt >= snapshot->max_xcnt) - { - snapshot->max_xcnt += arrayP->numProcs; - - snapshot->xip = (TransactionId *) - realloc(snapshot->xip, snapshot->max_xcnt * sizeof(TransactionId)); - if (snapshot->xip == NULL) - ereport(ERROR, - (errcode(ERRCODE_OUT_OF_MEMORY), - errmsg("out of memory"))); - } - snapshot->xip[snapshot->xcnt++] = xid; - elog(DEBUG1, "Adding Analyze for xid %d to snapshot", pgxact->xid); - } - if (TransactionIdPrecedes(xid, snapshot->xmin)) - snapshot->xmin = xid; - } - } - } + RecentXmin = globalSnapshot.gxmin; + RecentGlobalXmin = global_xmin; /* * XXX Is it ok to set RecentGlobalDataXmin same as RecentGlobalXmin ? @@ -4283,3 +4346,43 @@ KnownAssignedXidsReset(void) LWLockRelease(ProcArrayLock); } + +/* + * Do a consistency check on the running processes. Cluster Monitor uses this + * API to check if some transaction has started with an xid or xmin lower than + * the GlobalXmin reported by the GTM. This can only happen under extreme + * conditions and we must take necessary steps to safe-guard against such + * anomalies. 
+ */ +void +ProcArrayCheckXminConsistency(TransactionId global_xmin) +{ + ProcArrayStruct *arrayP = procArray; + int index; + + for (index = 0; index < arrayP->numProcs; index++) + { + int pgprocno = arrayP->pgprocnos[index]; + volatile PGPROC *proc = &allProcs[pgprocno]; + volatile PGXACT *pgxact = &allPgXact[pgprocno]; + TransactionId xid; + + xid = pgxact->xid; /* fetch just once */ + if (!TransactionIdIsNormal(xid)) + continue; + + if (!TransactionIdFollowsOrEquals(xid, global_xmin)) + elog(PANIC, "xmin consistency check failed - found %d xid in " + "PGPROC %d ahead of GlobalXmin %d", xid, pgprocno, + global_xmin); + + xid = pgxact->xmin; /* fetch just once */ + if (!TransactionIdIsNormal(xid)) + continue; + + if (!TransactionIdFollowsOrEquals(xid, global_xmin)) + elog(PANIC, "xmin consistency check failed - found %d xmin in " + "PGPROC %d ahead of GlobalXmin %d", xid, pgprocno, + global_xmin); + } +} diff --git a/src/backend/utils/init/postinit.c b/src/backend/utils/init/postinit.c index 6fc55751b5..e33bc65a89 100644 --- a/src/backend/utils/init/postinit.c +++ b/src/backend/utils/init/postinit.c @@ -657,7 +657,7 @@ InitPostgres(const char *in_dbname, Oid dboid, const char *username, before_shmem_exit(ShutdownPostgres, 0); /* The autovacuum launcher is done here */ - if (IsAutoVacuumLauncherProcess()) + if (IsAutoVacuumLauncherProcess() || IsClusterMonitorProcess()) return; /* diff --git a/src/gtm/client/fe-protocol.c b/src/gtm/client/fe-protocol.c index 64a88cb540..bc0a72ea93 100644 --- a/src/gtm/client/fe-protocol.c +++ b/src/gtm/client/fe-protocol.c @@ -512,14 +512,6 @@ gtmpqParseSuccess(GTM_Conn *conn, GTM_Result *result) break; } - if (gtmpqGetnchar((char *)&result->gr_snapshot.sn_recent_global_xmin, - sizeof (GlobalTransactionId), conn)) - { - result->gr_status = GTM_RESULT_ERROR; - break; - } - - if (gtmpqGetInt((int *)&result->gr_snapshot.sn_xcnt, sizeof (int32), conn)) { @@ -739,6 +731,13 @@ gtmpqParseSuccess(GTM_Conn *conn, GTM_Result *result) break; 
} result->gr_resdata.grd_node.node_name[result->gr_resdata.grd_node.len] = '\0'; + + if (result->gr_type == NODE_UNREGISTER_RESULT) + break; + + if (gtmpqGetInt((char *)&result->gr_resdata.grd_node.xmin, sizeof + (GlobalTransactionId), conn)) + result->gr_status = GTM_RESULT_ERROR; break; case NODE_LIST_RESULT: @@ -789,6 +788,28 @@ gtmpqParseSuccess(GTM_Conn *conn, GTM_Result *result) } case BARRIER_RESULT: break; + + case REPORT_XMIN_RESULT: + if (gtmpqGetnchar((char *)&result->gr_resdata.grd_report_xmin.reported_xmin, + sizeof (GlobalTransactionId), conn)) + { + result->gr_status = GTM_RESULT_ERROR; + break; + } + if (gtmpqGetnchar((char *)&result->gr_resdata.grd_report_xmin.global_xmin, + sizeof (GlobalTransactionId), conn)) + { + result->gr_status = GTM_RESULT_ERROR; + break; + } + if (gtmpqGetnchar((char *)&result->gr_resdata.grd_report_xmin.errcode, + sizeof (int), conn)) + { + result->gr_status = GTM_RESULT_ERROR; + break; + } + break; + default: printfGTMPQExpBuffer(&conn->errorMessage, "unexpected result type from server; result typr was \"%d\"\n", diff --git a/src/gtm/client/gtm_client.c b/src/gtm/client/gtm_client.c index 5058e73b0e..1f1f6f50fd 100644 --- a/src/gtm/client/gtm_client.c +++ b/src/gtm/client/gtm_client.c @@ -28,7 +28,6 @@ #endif #include <time.h> - #include "gtm/gtm_c.h" #include "gtm/gtm_ip.h" @@ -40,6 +39,7 @@ #include "gtm/gtm_serialize.h" #include "gtm/register.h" #include "gtm/assert.h" +#include "pgxc/pgxc.h" extern bool Backup_synchronously; @@ -77,7 +77,9 @@ static int alter_sequence_internal(GTM_Conn *conn, GTM_SequenceKey key, GTM_Sequ GTM_Sequence minval, GTM_Sequence maxval, GTM_Sequence startval, GTM_Sequence lastval, bool cycle, bool is_restart, bool is_backup); static int node_register_worker(GTM_Conn *conn, GTM_PGXCNodeType type, const char *host, GTM_PGXCNodePort port, - char *node_name, char *datafolder, GTM_PGXCNodeStatus status, bool is_backup); + char *node_name, char *datafolder, + GTM_PGXCNodeStatus status, bool 
is_backup, + GlobalTransactionId *xmin); static int node_unregister_worker(GTM_Conn *conn, GTM_PGXCNodeType type, const char * node_name, bool is_backup); static int report_barrier_internal(GTM_Conn *conn, char *barrier_id, bool is_backup); /* @@ -1654,7 +1656,8 @@ int node_register(GTM_Conn *conn, GTM_PGXCNodeType type, GTM_PGXCNodePort port, char *node_name, - char *datafolder) + char *datafolder, + GlobalTransactionId *xmin) { char host[1024]; int rc; @@ -1665,7 +1668,8 @@ int node_register(GTM_Conn *conn, return -1; } - return node_register_worker(conn, type, host, port, node_name, datafolder, NODE_CONNECTED, false); + return node_register_worker(conn, type, host, port, node_name, datafolder, + NODE_CONNECTED, false, xmin); } int node_register_internal(GTM_Conn *conn, @@ -1674,27 +1678,11 @@ int node_register_internal(GTM_Conn *conn, GTM_PGXCNodePort port, char *node_name, char *datafolder, - GTM_PGXCNodeStatus status) -{ - return node_register_worker(conn, type, host, port, node_name, datafolder, status, false); -} - -int bkup_node_register(GTM_Conn *conn, - GTM_PGXCNodeType type, - GTM_PGXCNodePort port, - char *node_name, - char *datafolder) + GTM_PGXCNodeStatus status, + GlobalTransactionId *xmin) { - char host[1024]; - int rc; - - node_get_local_addr(conn, host, sizeof(host), &rc); - if (rc != 0) - { - return -1; - } - - return node_register_worker(conn, type, host, port, node_name, datafolder, NODE_CONNECTED, true); + return node_register_worker(conn, type, host, port, node_name, datafolder, + status, false, xmin); } int bkup_node_register_internal(GTM_Conn *conn, @@ -1703,9 +1691,11 @@ int bkup_node_register_internal(GTM_Conn *conn, GTM_PGXCNodePort port, char *node_name, char *datafolder, - GTM_PGXCNodeStatus status) + GTM_PGXCNodeStatus status, + GlobalTransactionId xmin) { - return node_register_worker(conn, type, host, port, node_name, datafolder, status, true); + return node_register_worker(conn, type, host, port, node_name, datafolder, + status, 
true, &xmin); } static int node_register_worker(GTM_Conn *conn, @@ -1715,7 +1705,8 @@ static int node_register_worker(GTM_Conn *conn, char *node_name, char *datafolder, GTM_PGXCNodeStatus status, - bool is_backup) + bool is_backup, + GlobalTransactionId *xmin) { GTM_Result *res = NULL; time_t finish_time; @@ -1754,7 +1745,9 @@ static int node_register_worker(GTM_Conn *conn, /* Data Folder (var-len) */ gtmpqPutnchar(datafolder, strlen(datafolder), conn) || /* Node Status */ - gtmpqPutInt(status, sizeof(GTM_PGXCNodeStatus), conn)) + gtmpqPutInt(status, sizeof(GTM_PGXCNodeStatus), conn) || + /* Recent Xmin */ + gtmpqPutnchar((char *)xmin, sizeof (GlobalTransactionId), conn)) { goto send_failed; } @@ -1790,6 +1783,8 @@ static int node_register_worker(GTM_Conn *conn, { Assert(res->gr_resdata.grd_node.type == type); Assert((strcmp(res->gr_resdata.grd_node.node_name,node_name) == 0)); + if (xmin) + *xmin = res->gr_resdata.grd_node.xmin; } return res->gr_status; @@ -2217,7 +2212,6 @@ snapshot_get_multi(GTM_Conn *conn, int txn_count, GlobalTransactionId *gxid, memcpy(status_out, &res->gr_resdata.grd_txn_rc_multi.status, sizeof(int) * (*txn_count_out)); memcpy(xmin_out, &res->gr_snapshot.sn_xmin, sizeof(GlobalTransactionId)); memcpy(xmax_out, &res->gr_snapshot.sn_xmax, sizeof(GlobalTransactionId)); - memcpy(recent_global_xmin_out, &res->gr_snapshot.sn_recent_global_xmin, sizeof(GlobalTransactionId)); memcpy(xcnt_out, &res->gr_snapshot.sn_xcnt, sizeof(int32)); } @@ -2430,3 +2424,64 @@ send_failed: conn->result->gr_status = GTM_RESULT_COMM_ERROR; return -1; } + +int +report_global_xmin(GTM_Conn *conn, const char *node_name, + GTM_PGXCNodeType type, GlobalTransactionId *gxid, + GlobalTransactionId *global_xmin, + bool isIdle, int *errcode) +{ + GTM_Result *res = NULL; + time_t finish_time; + + if (gtmpqPutMsgStart('C', true, conn) || + gtmpqPutInt(MSG_REPORT_XMIN, sizeof (GTM_MessageType), conn) || + gtmpqPutnchar((char *)gxid, sizeof(GlobalTransactionId), conn) || + 
gtmpqPutc(isIdle, conn) || + gtmpqPutInt(type, sizeof (GTM_PGXCNodeType), conn) || + gtmpqPutInt(strlen(node_name), sizeof (GTM_StrLen), conn) || + gtmpqPutnchar(node_name, strlen(node_name), conn)) + { + goto send_failed; + } + + /* Finish the message. */ + if (gtmpqPutMsgEnd(conn)) + { + goto send_failed; + } + + /* Flush to ensure backend gets it. */ + if (gtmpqFlush(conn)) + { + goto send_failed; + } + + finish_time = time(NULL) + CLIENT_GTM_TIMEOUT; + if (gtmpqWaitTimed(true, false, conn, finish_time) || + gtmpqReadData(conn) < 0) + { + goto receive_failed; + } + + if ((res = GTMPQgetResult(conn)) == NULL) + { + goto receive_failed; + } + + if (res->gr_status == GTM_RESULT_OK) + { + *gxid = res->gr_resdata.grd_report_xmin.reported_xmin; + *global_xmin = res->gr_resdata.grd_report_xmin.global_xmin; + *errcode = res->gr_resdata.grd_report_xmin.errcode; + } + return res->gr_status; + +receive_failed: +send_failed: + conn->result = makeEmptyResultIfIsNull(conn->result); + conn->result->gr_status = GTM_RESULT_COMM_ERROR; + return -1; + +} + diff --git a/src/gtm/common/Makefile b/src/gtm/common/Makefile index 31e0c25ff9..cb58dc998e 100644 --- a/src/gtm/common/Makefile +++ b/src/gtm/common/Makefile @@ -23,7 +23,7 @@ LDFLAGS=-L$(top_builddir)/common -L$(top_builddir)/libpq LIBS=-lpthread OBJS = gtm_opt_handler.o aset.o mcxt.o gtm_utils.o elog.o assert.o stringinfo.o gtm_lock.o \ - gtm_list.o gtm_serialize.o gtm_serialize_debug.o + gtm_list.o gtm_serialize.o gtm_serialize_debug.o gtm_time.o gtm_gxid.o all:all-lib diff --git a/src/gtm/common/gtm_gxid.c b/src/gtm/common/gtm_gxid.c new file mode 100644 index 0000000000..6debb3e259 --- /dev/null +++ b/src/gtm/common/gtm_gxid.c @@ -0,0 +1,67 @@ +#include "gtm/gtm_c.h" +#include "gtm/gtm_gxid.h" + +/* + * GlobalTransactionIdPrecedes --- is id1 logically < id2? 
+ */ +bool +GlobalTransactionIdPrecedes(GlobalTransactionId id1, GlobalTransactionId id2) +{ + /* + * If either ID is a permanent XID then we can just do unsigned + * comparison. If both are normal, do a modulo-2^31 comparison. + */ + int32 diff; + + if (!GlobalTransactionIdIsNormal(id1) || !GlobalTransactionIdIsNormal(id2)) + return (id1 < id2); + + diff = (int32) (id1 - id2); + return (diff < 0); +} + +/* + * GlobalTransactionIdPrecedesOrEquals --- is id1 logically <= id2? + */ +bool +GlobalTransactionIdPrecedesOrEquals(GlobalTransactionId id1, GlobalTransactionId id2) +{ + int32 diff; + + if (!GlobalTransactionIdIsNormal(id1) || !GlobalTransactionIdIsNormal(id2)) + return (id1 <= id2); + + diff = (int32) (id1 - id2); + return (diff <= 0); +} + +/* + * GlobalTransactionIdFollows --- is id1 logically > id2? + */ +bool +GlobalTransactionIdFollows(GlobalTransactionId id1, GlobalTransactionId id2) +{ + int32 diff; + + if (!GlobalTransactionIdIsNormal(id1) || !GlobalTransactionIdIsNormal(id2)) + return (id1 > id2); + + diff = (int32) (id1 - id2); + return (diff > 0); +} + +/* + * GlobalTransactionIdFollowsOrEquals --- is id1 logically >= id2? 
+ */ +bool +GlobalTransactionIdFollowsOrEquals(GlobalTransactionId id1, GlobalTransactionId id2) +{ + int32 diff; + + if (!GlobalTransactionIdIsNormal(id1) || !GlobalTransactionIdIsNormal(id2)) + return (id1 >= id2); + + diff = (int32) (id1 - id2); + return (diff >= 0); +} + diff --git a/src/gtm/common/gtm_serialize.c b/src/gtm/common/gtm_serialize.c index caa1c1c0f2..cabdcb5941 100644 --- a/src/gtm/common/gtm_serialize.c +++ b/src/gtm/common/gtm_serialize.c @@ -86,10 +86,6 @@ gtm_serialize_snapshotdata(GTM_SnapshotData *data, char *buf, size_t buflen) memcpy(buf + len, &(data->sn_xmax), sizeof(GlobalTransactionId)); len += sizeof(GlobalTransactionId); - /* GTM_SnapshotData.sn_recent_global_xmin */ - memcpy(buf + len, &(data->sn_recent_global_xmin), sizeof(GlobalTransactionId)); - len += sizeof(GlobalTransactionId); - /* GTM_SnapshotData.sn_xcnt */ memcpy(buf + len, &(data->sn_xcnt), sizeof(uint32)); len += sizeof(uint32); @@ -130,10 +126,6 @@ gtm_deserialize_snapshotdata(GTM_SnapshotData *data, const char *buf, size_t buf memcpy(&(data->sn_xmax), buf + len, sizeof(GlobalTransactionId)); len += sizeof(GlobalTransactionId); - /* GTM_SnapshotData.sn_recent_global_xmin */ - memcpy(&(data->sn_recent_global_xmin), buf + len, sizeof(GlobalTransactionId)); - len += sizeof(GlobalTransactionId); - /* GTM_SnapshotData.sn_xcnt */ memcpy(&(data->sn_xcnt), buf + len, sizeof(uint32)); len += sizeof(uint32); @@ -209,7 +201,6 @@ gtm_serialize_transactioninfo(GTM_TransactionInfo *data, char *buf, size_t bufle int len = 0; char *buf2; int i; - int namelen; /* size check */ if (gtm_get_transactioninfo_size(data) > buflen) @@ -671,7 +662,10 @@ gtm_get_pgxcnodeinfo_size(GTM_PGXCNodeInfo *data) len += sizeof(GTM_PGXCNodeStatus); /* status */ - len += sizeof(uint32); /* max_sessions */ + len += sizeof(bool); /* excluded ?*/ + len += sizeof(GlobalTransactionId); /* xmin */ + len += sizeof(GTM_Timestamp); /* reported timestamp */ + len += sizeof(uint32); /* num_sessions */ if 
(data->num_sessions > 0) /* sessions */ len += (data->num_sessions * sizeof(GTM_PGXCSession)); @@ -762,6 +756,15 @@ gtm_serialize_pgxcnodeinfo(GTM_PGXCNodeInfo *data, char *buf, size_t buflen) memcpy(buf + len, &(data->status), sizeof(GTM_PGXCNodeStatus)); len += sizeof(GTM_PGXCNodeStatus); + memcpy(buf + len, &(data->excluded), sizeof(bool)); + len += sizeof(bool); + + memcpy(buf + len, &(data->reported_xmin), sizeof(GlobalTransactionId)); + len += sizeof(GlobalTransactionId); + + memcpy(buf + len, &(data->reported_xmin_time), sizeof(GTM_Timestamp)); + len += sizeof(GTM_Timestamp); + /* GTM_PGXCNodeInfo.sessions */ len_wk = data->max_sessions; memcpy(buf + len, &len_wk, sizeof(uint32)); @@ -901,6 +904,18 @@ gtm_deserialize_pgxcnodeinfo(GTM_PGXCNodeInfo *data, const char *buf, size_t buf memcpy(&(data->status), buf + len, sizeof(GTM_PGXCNodeStatus)); len += sizeof(GTM_PGXCNodeStatus); + /* GTM_PGXCNodeInfo.excluded */ + memcpy(&(data->excluded), buf + len, sizeof (bool)); + len += sizeof (bool); + + /* GTM_PGXCNodeInfo.reported_xmin */ + memcpy(&(data->reported_xmin), buf + len, sizeof (GlobalTransactionId)); + len += sizeof (GlobalTransactionId); + + /* GTM_PGXCNodeInfo.reported_xmin_time */ + memcpy(&(data->reported_xmin_time), buf + len, sizeof (GTM_Timestamp)); + len += sizeof (GTM_Timestamp); + /* GTM_PGXCNodeInfo.sessions */ memcpy(&len_wk, buf + len, sizeof(uint32)); len += sizeof(uint32); diff --git a/src/gtm/common/gtm_serialize_debug.c b/src/gtm/common/gtm_serialize_debug.c index a5d6e47425..d688211ff4 100644 --- a/src/gtm/common/gtm_serialize_debug.c +++ b/src/gtm/common/gtm_serialize_debug.c @@ -49,7 +49,6 @@ dump_transactioninfo_elog(GTM_TransactionInfo *txn) elog(LOG, " sn_xmin: %d", txn->gti_current_snapshot.sn_xmin); elog(LOG, " sn_xmax: %d", txn->gti_current_snapshot.sn_xmax); - elog(LOG, " sn_recent_global_xmin: %d", txn->gti_current_snapshot.sn_recent_global_xmin); elog(LOG, " sn_xcnt: %d", txn->gti_current_snapshot.sn_xcnt); /* Print all the 
GXIDs in snapshot */ diff --git a/src/gtm/common/gtm_time.c b/src/gtm/common/gtm_time.c new file mode 100644 index 0000000000..d121822743 --- /dev/null +++ b/src/gtm/common/gtm_time.c @@ -0,0 +1,96 @@ +/*------------------------------------------------------------------------- + * + * gtm_time.c + * Timestamp handling on GTM + * + * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * Portions Copyright (c) 2010-2012 Postgres-XC Development Group + * + * + * IDENTIFICATION + * $PostgreSQL$ + * + *------------------------------------------------------------------------- + */ + +#include "gtm/gtm.h" +#include "gtm/gtm_c.h" +#include "gtm/gtm_time.h" +#include <time.h> +#include <sys/time.h> + +GTM_Timestamp +GTM_TimestampGetCurrent(void) +{ + struct timeval tp; + GTM_Timestamp result; + + gettimeofday(&tp, NULL); + + result = (GTM_Timestamp) tp.tv_sec - + ((GTM_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY); + +#ifdef HAVE_INT64_TIMESTAMP + result = (result * USECS_PER_SEC) + tp.tv_usec; +#else + result = result + (tp.tv_usec / 1000000.0); +#endif + + return result; +} + +/* + * GTM_TimestampDifference -- convert the difference between two timestamps + * into integer seconds and microseconds + * + * Both inputs must be ordinary finite timestamps (in current usage, + * they'll be results from GTM_TimestampGetCurrent()). + * + * We expect start_time <= stop_time. If not, we return zeroes; for current + * callers there is no need to be tense about which way division rounds on + * negative inputs. 
+ */ +void +GTM_TimestampDifference(GTM_Timestamp start_time, GTM_Timestamp stop_time, + long *secs, int *microsecs) +{ + GTM_Timestamp diff = stop_time - start_time; + + if (diff <= 0) + { + *secs = 0; + *microsecs = 0; + } + else + { +#ifdef HAVE_INT64_TIMESTAMP + *secs = (long) (diff / USECS_PER_SEC); + *microsecs = (int) (diff % USECS_PER_SEC); +#else + *secs = (long) diff; + *microsecs = (int) ((diff - *secs) * 1000000.0); +#endif + } +} + +/* + * GTM_TimestampDifferenceExceeds -- report whether the difference between two + * timestamps is >= a threshold (expressed in milliseconds) + * + * Both inputs must be ordinary finite timestamps (in current usage, + * they'll be results from GTM_TimestampGetCurrent()). + */ +bool +GTM_TimestampDifferenceExceeds(GTM_Timestamp start_time, + GTM_Timestamp stop_time, + int msec) +{ + GTM_Timestamp diff = stop_time - start_time; + +#ifdef HAVE_INT64_TIMESTAMP + return (diff >= msec * INT64CONST(1000)); +#else + return (diff * 1000.0 >= msec); +#endif +} diff --git a/src/gtm/common/gtm_utils.c b/src/gtm/common/gtm_utils.c index 96c6e95b20..56d09f01ba 100644 --- a/src/gtm/common/gtm_utils.c +++ b/src/gtm/common/gtm_utils.c @@ -37,6 +37,7 @@ static struct enum_name message_name_tab[] = {MSG_NODE_UNREGISTER, "MSG_NODE_UNREGISTER"}, {MSG_BKUP_NODE_UNREGISTER, "MSG_BKUP_NODE_UNREGISTER"}, {MSG_REGISTER_SESSION, "MSG_REGISTER_SESSION"}, + {MSG_REPORT_XMIN, "MSG_REPORT_XMIN"}, {MSG_NODE_LIST, "MSG_NODE_LIST"}, {MSG_NODE_BEGIN_REPLICATION_INIT, "MSG_NODE_BEGIN_REPLICATION_INIT"}, {MSG_NODE_END_REPLICATION_INIT, "MSG_NODE_END_REPLICATION_INIT"}, @@ -138,6 +139,7 @@ static struct enum_name result_name_tab[] = {TXN_GET_STATUS_RESULT, "TXN_GET_STATUS_RESULT"}, {TXN_GET_ALL_PREPARED_RESULT, "TXN_GET_ALL_PREPARED_RESULT"}, {TXN_BEGIN_GETGXID_AUTOVACUUM_RESULT, "TXN_BEGIN_GETGXID_AUTOVACUUM_RESULT"}, + {REPORT_XMIN_RESULT, "REPORT_XMIN_RESULT"}, {RESULT_TYPE_COUNT, "RESULT_TYPE_COUNT"}, {-1, NULL} }; diff --git 
a/src/gtm/main/Makefile b/src/gtm/main/Makefile index 3e76afa50f..208b2297c7 100644 --- a/src/gtm/main/Makefile +++ b/src/gtm/main/Makefile @@ -15,7 +15,7 @@ ifneq ($(PORTNAME), win32) override CFLAGS += $(PTHREAD_CFLAGS) endif -OBJS=main.o gtm_thread.o gtm_txn.o gtm_seq.o gtm_snap.o gtm_time.o gtm_standby.o gtm_opt.o gtm_backup.o +OBJS=main.o gtm_thread.o gtm_txn.o gtm_seq.o gtm_snap.o gtm_standby.o gtm_opt.o gtm_backup.o OTHERS= ../libpq/libpqcomm.a ../path/libgtmpath.a ../recovery/libgtmrecovery.a ../client/libgtmclient.a ../common/libgtm.a ../../port/libpgport.a diff --git a/src/gtm/main/gtm_snap.c b/src/gtm/main/gtm_snap.c index 9a9a78bfd0..f4ab3a127b 100644 --- a/src/gtm/main/gtm_snap.c +++ b/src/gtm/main/gtm_snap.c @@ -184,12 +184,10 @@ GTM_GetTransactionSnapshot(GTM_TransactionHandle handle[], int txn_count, int *s if (GlobalTransactionIdPrecedes(xmin, globalxmin)) globalxmin = xmin; - GTMTransactions.gt_recent_global_xmin = globalxmin; snapshot->sn_xmin = xmin; snapshot->sn_xmax = xmax; snapshot->sn_xcnt = count; - snapshot->sn_recent_global_xmin = globalxmin; /* * Now, before the proc array lock is released, set the xmin in the txninfo @@ -235,7 +233,6 @@ GTM_GetTransactionSnapshot(GTM_TransactionHandle handle[], int txn_count, int *s mysnap->sn_xmin = snapshot->sn_xmin; mysnap->sn_xmax = snapshot->sn_xmax; mysnap->sn_xcnt = snapshot->sn_xcnt; - mysnap->sn_recent_global_xmin = snapshot->sn_recent_global_xmin; memcpy(mysnap->sn_xip, snapshot->sn_xip, sizeof (GlobalTransactionId) * snapshot->sn_xcnt); } @@ -257,7 +254,6 @@ GTM_GetTransactionSnapshot(GTM_TransactionHandle handle[], int txn_count, int *s mysnap->sn_xmin = snapshot->sn_xmin; mysnap->sn_xmax = snapshot->sn_xmax; mysnap->sn_xcnt = snapshot->sn_xcnt; - mysnap->sn_recent_global_xmin = snapshot->sn_recent_global_xmin; memcpy(mysnap->sn_xip, snapshot->sn_xip, sizeof (GlobalTransactionId) * snapshot->sn_xcnt); } @@ -269,9 +265,9 @@ GTM_GetTransactionSnapshot(GTM_TransactionHandle handle[], int 
txn_count, int *s GTM_RWLockRelease(&GTMTransactions.gt_TransArrayLock); - elog(DEBUG1, "GTM_GetTransactionSnapshot: (%u:%u:%u:%u)", + elog(DEBUG1, "GTM_GetTransactionSnapshot: (%u:%u:%u)", snapshot->sn_xmin, snapshot->sn_xmax, - snapshot->sn_xcnt, snapshot->sn_recent_global_xmin); + snapshot->sn_xcnt); return snapshot; } @@ -338,7 +334,6 @@ ProcessGetSnapshotCommand(Port *myport, StringInfo message, bool get_gxid) pq_sendbytes(&buf, (char *)&status, sizeof(int) * txn_count); pq_sendbytes(&buf, (char *)&snapshot->sn_xmin, sizeof (GlobalTransactionId)); pq_sendbytes(&buf, (char *)&snapshot->sn_xmax, sizeof (GlobalTransactionId)); - pq_sendbytes(&buf, (char *)&snapshot->sn_recent_global_xmin, sizeof (GlobalTransactionId)); pq_sendint(&buf, snapshot->sn_xcnt, sizeof (int)); pq_sendbytes(&buf, (char *)snapshot->sn_xip, sizeof(GlobalTransactionId) * snapshot->sn_xcnt); @@ -404,7 +399,6 @@ ProcessGetSnapshotCommandMulti(Port *myport, StringInfo message) pq_sendbytes(&buf, (char *)status, sizeof(int) * txn_count); pq_sendbytes(&buf, (char *)&snapshot->sn_xmin, sizeof (GlobalTransactionId)); pq_sendbytes(&buf, (char *)&snapshot->sn_xmax, sizeof (GlobalTransactionId)); - pq_sendbytes(&buf, (char *)&snapshot->sn_recent_global_xmin, sizeof (GlobalTransactionId)); pq_sendint(&buf, snapshot->sn_xcnt, sizeof (int)); pq_sendbytes(&buf, (char *)snapshot->sn_xip, sizeof(GlobalTransactionId) * snapshot->sn_xcnt); diff --git a/src/gtm/main/gtm_standby.c b/src/gtm/main/gtm_standby.c index 6b30fd77bd..0455455323 100644 --- a/src/gtm/main/gtm_standby.c +++ b/src/gtm/main/gtm_standby.c @@ -164,8 +164,6 @@ gtm_standby_restore_gxid(void) txn.gt_transactions_array[i].gti_current_snapshot.sn_xmin; GTMTransactions.gt_transactions_array[handle].gti_current_snapshot.sn_xmax = txn.gt_transactions_array[i].gti_current_snapshot.sn_xmax; - GTMTransactions.gt_transactions_array[handle].gti_current_snapshot.sn_recent_global_xmin = - 
txn.gt_transactions_array[i].gti_current_snapshot.sn_recent_global_xmin; GTMTransactions.gt_transactions_array[handle].gti_current_snapshot.sn_xcnt = txn.gt_transactions_array[i].gti_current_snapshot.sn_xcnt; GTMTransactions.gt_transactions_array[handle].gti_current_snapshot.sn_xip = @@ -225,6 +223,7 @@ gtm_standby_restore_node(void) data[i].type, data[i].nodename, data[i].datafolder); if (Recovery_PGXCNodeRegister(data[i].type, data[i].nodename, data[i].port, data[i].proxyname, data[i].status, + &data[i].reported_xmin, data[i].ipaddress, data[i].datafolder, true, -1 /* dummy socket */) != 0) { @@ -265,7 +264,8 @@ gtm_standby_register_self(const char *node_name, int port, const char *datadir) standbyDataDir= (char *)datadir; rc = node_register_internal(GTM_ActiveConn, GTM_NODE_GTM, standbyHostName, standbyPortNumber, - standbyNodeName, standbyDataDir, NODE_DISCONNECTED); + standbyNodeName, standbyDataDir, + NODE_DISCONNECTED, InvalidGlobalTransactionId); if (rc < 0) { elog(DEBUG1, "Failed to register a standby-GTM status."); @@ -297,7 +297,8 @@ gtm_standby_activate_self(void) } rc = node_register_internal(GTM_ActiveConn, GTM_NODE_GTM, standbyHostName, standbyPortNumber, - standbyNodeName, standbyDataDir, NODE_CONNECTED); + standbyNodeName, standbyDataDir, + NODE_CONNECTED, InvalidGlobalTransactionId); if (rc < 0) { diff --git a/src/gtm/main/gtm_time.c b/src/gtm/main/gtm_time.c deleted file mode 100644 index b2c252446c..0000000000 --- a/src/gtm/main/gtm_time.c +++ /dev/null @@ -1,41 +0,0 @@ -/*------------------------------------------------------------------------- - * - * gtm_time.c - * Timestamp handling on GTM - * - * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group - * Portions Copyright (c) 1994, Regents of the University of California - * Portions Copyright (c) 2010-2012 Postgres-XC Development Group - * - * - * IDENTIFICATION - * $PostgreSQL$ - * - *------------------------------------------------------------------------- - */ - 
-#include "gtm/gtm.h" -#include "gtm/gtm_c.h" -#include "gtm/gtm_time.h" -#include <time.h> -#include <sys/time.h> - -GTM_Timestamp -GTM_TimestampGetCurrent(void) -{ - struct timeval tp; - GTM_Timestamp result; - - gettimeofday(&tp, NULL); - - result = (GTM_Timestamp) tp.tv_sec - - ((GTM_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY); - -#ifdef HAVE_INT64_TIMESTAMP - result = (result * USECS_PER_SEC) + tp.tv_usec; -#else - result = result + (tp.tv_usec / 1000000.0); -#endif - - return result; -} diff --git a/src/gtm/main/gtm_txn.c b/src/gtm/main/gtm_txn.c index 249c74d189..7514f95359 100644 --- a/src/gtm/main/gtm_txn.c +++ b/src/gtm/main/gtm_txn.c @@ -477,70 +477,6 @@ GlobalTransactionIdDidAbort(GlobalTransactionId transactionId) return false; } -/* - * GlobalTransactionIdPrecedes --- is id1 logically < id2? - */ -bool -GlobalTransactionIdPrecedes(GlobalTransactionId id1, GlobalTransactionId id2) -{ - /* - * If either ID is a permanent XID then we can just do unsigned - * comparison. If both are normal, do a modulo-2^31 comparison. - */ - int32 diff; - - if (!GlobalTransactionIdIsNormal(id1) || !GlobalTransactionIdIsNormal(id2)) - return (id1 < id2); - - diff = (int32) (id1 - id2); - return (diff < 0); -} - -/* - * GlobalTransactionIdPrecedesOrEquals --- is id1 logically <= id2? - */ -bool -GlobalTransactionIdPrecedesOrEquals(GlobalTransactionId id1, GlobalTransactionId id2) -{ - int32 diff; - - if (!GlobalTransactionIdIsNormal(id1) || !GlobalTransactionIdIsNormal(id2)) - return (id1 <= id2); - - diff = (int32) (id1 - id2); - return (diff <= 0); -} - -/* - * GlobalTransactionIdFollows --- is id1 logically > id2? - */ -bool -GlobalTransactionIdFollows(GlobalTransactionId id1, GlobalTransactionId id2) -{ - int32 diff; - - if (!GlobalTransactionIdIsNormal(id1) || !GlobalTransactionIdIsNormal(id2)) - return (id1 > id2); - - diff = (int32) (id1 - id2); - return (diff > 0); -} - -/* - * GlobalTransactionIdFollowsOrEquals --- is id1 logically >= id2? 
- */ -bool -GlobalTransactionIdFollowsOrEquals(GlobalTransactionId id1, GlobalTransactionId id2) -{ - int32 diff; - - if (!GlobalTransactionIdIsNormal(id1) || !GlobalTransactionIdIsNormal(id2)) - return (id1 >= id2); - - diff = (int32) (id1 - id2); - return (diff >= 0); -} - /* * Set that the transaction is doing vacuum @@ -2684,6 +2620,64 @@ ProcessGetNextGXIDTransactionCommand(Port *myport, StringInfo message) return; } +void +ProcessReportXminCommand(Port *myport, StringInfo message, bool is_backup) +{ + StringInfoData buf; + GlobalTransactionId gxid; + GTM_StrLen nodelen; + char node_name[NI_MAXHOST]; + GTM_PGXCNodeType type; + GlobalTransactionId global_xmin; + int errcode; + bool remoteIdle; + + const char *data = pq_getmsgbytes(message, sizeof (gxid)); + + if (data == NULL) + ereport(ERROR, + (EPROTO, + errmsg("Message does not contain valid GXID"))); + memcpy(&gxid, data, sizeof (gxid)); + + /* Read the flag telling whether the reporting node is idle */ + remoteIdle = pq_getmsgbyte(message); + + /* Read Node Type */ + type = pq_getmsgint(message, sizeof (GTM_PGXCNodeType)); + + /* get node name; the length comes off the wire, so reject anything that would overflow node_name[] (terminator included) */ + nodelen = pq_getmsgint(message, sizeof (GTM_StrLen)); + if (nodelen >= sizeof (node_name)) + ereport(ERROR, + (EPROTO, + errmsg("Message contains invalid node name length"))); + memcpy(node_name, (char *)pq_getmsgbytes(message, nodelen), nodelen); + node_name[nodelen] = '\0'; + pq_getmsgend(message); + + global_xmin = GTM_HandleGlobalXmin(type, node_name, &gxid, remoteIdle, + &errcode); + + { + /* + * Send a SUCCESS message back to the client + */ + pq_beginmessage(&buf, 'S'); + pq_sendint(&buf, REPORT_XMIN_RESULT, 4); + if (myport->remote_type == GTM_NODE_GTM_PROXY) + { + GTM_ProxyMsgHeader proxyhdr; + proxyhdr.ph_conid = myport->conn_id; + pq_sendbytes(&buf, (char *)&proxyhdr, sizeof (GTM_ProxyMsgHeader)); + } + pq_sendbytes(&buf, (char *)&gxid, sizeof (GlobalTransactionId)); + pq_sendbytes(&buf, (char *)&global_xmin, sizeof (GlobalTransactionId)); + pq_sendbytes(&buf, (char *)&errcode, sizeof (errcode)); + pq_endmessage(myport, &buf); + pq_flush(myport); + } +} /* * Mark GTM as shutting down. 
This point onwards no new GXID are issued to @@ -2754,6 +2744,12 @@ void GTM_WriteRestorePointXid(FILE *f) fprintf(f, "%u\n", GTMTransactions.gt_backedUpXid); } +GlobalTransactionId +GTM_GetLatestCompletedXID(void) +{ + return GTMTransactions.gt_latestCompletedXid; +} + /* * TODO */ diff --git a/src/gtm/main/main.c b/src/gtm/main/main.c index a995fe9beb..0f3d89f08c 100644 --- a/src/gtm/main/main.c +++ b/src/gtm/main/main.c @@ -233,7 +233,7 @@ BaseInit() GTM_InitTxnManager(); GTM_InitSeqManager(); - + GTM_InitNodeManager(); } static void @@ -1337,6 +1337,10 @@ ProcessCommand(Port *myport, StringInfo input_message) case MSG_TXN_GET_GID_DATA: case MSG_TXN_GET_NEXT_GXID: case MSG_TXN_GXID_LIST: +#ifdef XCP + case MSG_REPORT_XMIN: + case MSG_BKUP_REPORT_XMIN: +#endif ProcessTransactionCommand(myport, mtype, input_message); break; @@ -1667,6 +1671,14 @@ ProcessTransactionCommand(Port *myport, GTM_MessageType mtype, StringInfo messag ProcessGXIDListCommand(myport, message); break; + case MSG_REPORT_XMIN: + ProcessReportXminCommand(myport, message, false); + break; + + case MSG_BKUP_REPORT_XMIN: + ProcessReportXminCommand(myport, message, true); + break; + default: Assert(0); /* Shouldn't come here.. 
keep compiler quite */ } diff --git a/src/gtm/proxy/Makefile b/src/gtm/proxy/Makefile index c1ab2018d8..fe350113a9 100644 --- a/src/gtm/proxy/Makefile +++ b/src/gtm/proxy/Makefile @@ -17,7 +17,7 @@ endif OBJS=proxy_main.o proxy_thread.o proxy_utils.o gtm_proxy_opt.o -OTHERS= ../libpq/libpqcomm.a ../path/libgtmpath.a ../recovery/libgtmrecovery.a ../client/libgtmclient.a ../common/libgtm.a +OTHERS= ../libpq/libpqcomm.a ../path/libgtmpath.a ../client/libgtmclient.a ../common/libgtm.a LDFLAGS=-L$(top_builddir)/common -L$(top_builddir)/libpq diff --git a/src/gtm/proxy/proxy_main.c b/src/gtm/proxy/proxy_main.c index 041c5be392..9765a29e3b 100644 --- a/src/gtm/proxy/proxy_main.c +++ b/src/gtm/proxy/proxy_main.c @@ -155,8 +155,6 @@ static void ProcessCommand(GTMProxy_ConnectionInfo *conninfo, GTM_Conn *gtm_conn, StringInfo input_message); static GTM_Conn *HandleGTMError(GTM_Conn *gtm_conn); static GTM_Conn *HandlePostCommand(GTMProxy_ConnectionInfo *conninfo, GTM_Conn *gtm_conn); -static void ProcessPGXCNodeCommand(GTMProxy_ConnectionInfo *conninfo, - GTM_Conn *gtm_conn, GTM_MessageType mtype, StringInfo message); static void ProcessTransactionCommand(GTMProxy_ConnectionInfo *conninfo, GTM_Conn *gtm_conn, GTM_MessageType mtype, StringInfo message); static void ProcessSnapshotCommand(GTMProxy_ConnectionInfo *conninfo, @@ -274,9 +272,6 @@ BaseInit() GTM_RWLockInit(&ReconnectControlLock); - /* Save Node Register File in register.c */ - Recovery_SaveRegisterFileName(GTMProxyDataDir); - /* Register Proxy on GTM */ RegisterProxy(false); @@ -524,9 +519,6 @@ GTMProxy_SigleHandler(int signal) /* Unregister Proxy on GTM */ UnregisterProxy(); - /* Rewrite Register Information (clean up unregister records) */ - Recovery_SaveRegisterInfo(); - /* * XXX We should do a clean shutdown here. 
*/ @@ -1595,13 +1587,10 @@ ProcessCommand(GTMProxy_ConnectionInfo *conninfo, GTM_Conn *gtm_conn, case MSG_BARRIER: case MSG_TXN_COMMIT: case MSG_REGISTER_SESSION: - GTMProxy_ProxyCommand(conninfo, gtm_conn, mtype, input_message); - break; - - + case MSG_REPORT_XMIN: case MSG_NODE_REGISTER: case MSG_NODE_UNREGISTER: - ProcessPGXCNodeCommand(conninfo, gtm_conn, mtype, input_message); + GTMProxy_ProxyCommand(conninfo, gtm_conn, mtype, input_message); break; case MSG_TXN_BEGIN: @@ -1740,6 +1729,7 @@ IsProxiedMessage(GTM_MessageType mtype) case MSG_NODE_REGISTER: case MSG_NODE_UNREGISTER: case MSG_REGISTER_SESSION: + case MSG_REPORT_XMIN: case MSG_SNAPSHOT_GXID_GET: case MSG_SEQUENCE_INIT: case MSG_SEQUENCE_GET_CURRENT: @@ -1916,7 +1906,6 @@ ProcessResponse(GTMProxy_ThreadInfo *thrinfo, GTMProxy_CommandInfo *cmdinfo, pq_sendbytes(&buf, (char *)&status, sizeof (status)); pq_sendbytes(&buf, (char *)&res->gr_snapshot.sn_xmin, sizeof (GlobalTransactionId)); pq_sendbytes(&buf, (char *)&res->gr_snapshot.sn_xmax, sizeof (GlobalTransactionId)); - pq_sendbytes(&buf, (char *)&res->gr_snapshot.sn_recent_global_xmin, sizeof (GlobalTransactionId)); pq_sendint(&buf, res->gr_snapshot.sn_xcnt, sizeof (int)); pq_sendbytes(&buf, (char *)res->gr_snapshot.sn_xip, sizeof(GlobalTransactionId) * res->gr_snapshot.sn_xcnt); @@ -1943,6 +1932,7 @@ ProcessResponse(GTMProxy_ThreadInfo *thrinfo, GTMProxy_CommandInfo *cmdinfo, case MSG_NODE_REGISTER: case MSG_NODE_UNREGISTER: case MSG_REGISTER_SESSION: + case MSG_REPORT_XMIN: case MSG_SNAPSHOT_GXID_GET: case MSG_SEQUENCE_INIT: case MSG_SEQUENCE_GET_CURRENT: @@ -2122,151 +2112,6 @@ ReadCommand(GTMProxy_ConnectionInfo *conninfo, StringInfo inBuf) } static void -ProcessPGXCNodeCommand(GTMProxy_ConnectionInfo *conninfo, GTM_Conn *gtm_conn, - GTM_MessageType mtype, StringInfo message) -{ - GTMProxy_CommandData cmd_data; - - /* - * For Node registering, proxy number is also saved and registered on GTM with node. 
- * So get and modify the register message in consequence. - */ - switch(mtype) - { - case MSG_NODE_REGISTER: - { - int len; - MemoryContext oldContext; - char remote_host[NI_MAXHOST]; - char remote_port[NI_MAXSERV]; - - /* Get Remote IP and port from Conn structure to register */ - remote_host[0] = '\0'; - remote_port[0] = '\0'; - - if (gtm_getnameinfo_all(&conninfo->con_port->raddr.addr, - conninfo->con_port->raddr.salen, - remote_host, sizeof(remote_host), - remote_port, sizeof(remote_port), - NI_NUMERICSERV)) - { - int ret = gtm_getnameinfo_all(&conninfo->con_port->raddr.addr, - conninfo->con_port->raddr.salen, - remote_host, sizeof(remote_host), - remote_port, sizeof(remote_port), - NI_NUMERICHOST | NI_NUMERICSERV); - - if (ret) - ereport(WARNING, - (errmsg_internal("gtm_getnameinfo_all() failed"))); - } - - /* Get the node type */ - memcpy(&cmd_data.cd_reg.type, pq_getmsgbytes(message, sizeof (GTM_PGXCNodeType)), sizeof (GTM_PGXCNodeType)); - - /* Then obtain the node name */ - len = pq_getmsgint(message, sizeof(GTM_StrLen)); - cmd_data.cd_reg.nodename = palloc(len + 1); - memcpy(cmd_data.cd_reg.nodename, (char *)pq_getmsgbytes(message, len), len); - cmd_data.cd_reg.nodename[len] = '\0'; - - /* - * Now we have to waste the following host information. It is taken from - * the address field in the conn. 
- */ - len = pq_getmsgint(message, sizeof(GTM_StrLen)); - cmd_data.cd_reg.ipaddress = palloc(len + 1); - memcpy(cmd_data.cd_reg.ipaddress, (char *)pq_getmsgbytes(message, len), len); - cmd_data.cd_reg.ipaddress[len] = '\0'; - - /* Then the next is the port number */ - memcpy(&cmd_data.cd_reg.port, - pq_getmsgbytes(message, - sizeof (GTM_PGXCNodePort)), - sizeof (GTM_PGXCNodePort)); - - /* Proxy name */ - len = pq_getmsgint(message, sizeof(GTM_StrLen)); - cmd_data.cd_reg.gtm_proxy_nodename = palloc(len + 1); - memcpy(cmd_data.cd_reg.gtm_proxy_nodename, (char *)pq_getmsgbytes(message, len), len); - cmd_data.cd_reg.gtm_proxy_nodename[len] = '\0'; - - /* get data folder data */ - len = pq_getmsgint(message, sizeof (int)); - cmd_data.cd_reg.datafolder = palloc(len + 1); - memcpy(cmd_data.cd_reg.datafolder, (char *)pq_getmsgbytes(message, len), len); - cmd_data.cd_reg.datafolder[len] = '\0'; - - /* Now we have one more data to waste, "status" */ - cmd_data.cd_reg.status = pq_getmsgint(message, sizeof(GTM_PGXCNodeStatus)); - pq_getmsgend(message); - - /* Copy also remote host address in data to be proxied */ - cmd_data.cd_reg.ipaddress = (char *) palloc(strlen(remote_host) + 1); - memcpy(cmd_data.cd_reg.ipaddress, remote_host, strlen(remote_host)); - cmd_data.cd_reg.ipaddress[strlen(remote_host)] = '\0'; - - /* Registering has to be saved where it can be seen by all the threads */ - oldContext = MemoryContextSwitchTo(TopMostMemoryContext); - - /* Register Node also on Proxy */ - if (Recovery_PGXCNodeRegister(cmd_data.cd_reg.type, - cmd_data.cd_reg.nodename, - cmd_data.cd_reg.port, - GTMProxyNodeName, - NODE_CONNECTED, - remote_host, - cmd_data.cd_reg.datafolder, - false, - conninfo->con_port->sock)) - { - ereport(ERROR, - (EINVAL, - errmsg("Failed to Register node"))); - } - - MemoryContextSwitchTo(oldContext); - - GTMProxy_ProxyPGXCNodeCommand(conninfo, gtm_conn, mtype, cmd_data); - break; - } - case MSG_NODE_UNREGISTER: - { - int len; - MemoryContext oldContext; - char 
*nodename; - - memcpy(&cmd_data.cd_reg.type, pq_getmsgbytes(message, sizeof (GTM_PGXCNodeType)), sizeof (GTM_PGXCNodeType)); - len = pq_getmsgint(message, sizeof(GTM_StrLen)); - nodename = palloc(len + 1); - memcpy(nodename, pq_getmsgbytes(message, len), len); - nodename[len] = '\0'; /* Need null-terminate */ - cmd_data.cd_reg.nodename = nodename; - pq_getmsgend(message); - - /* Unregistering has to be saved in a place where it can be seen by all the threads */ - oldContext = MemoryContextSwitchTo(TopMostMemoryContext); - - /* - * Unregister node. Ignore any error here, otherwise we enter - * endless loop trying to execute command again and again - */ - Recovery_PGXCNodeUnregister(cmd_data.cd_reg.type, - cmd_data.cd_reg.nodename, - false, - conninfo->con_port->sock); - MemoryContextSwitchTo(oldContext); - - GTMProxy_ProxyPGXCNodeCommand(conninfo, gtm_conn, mtype, cmd_data); - break; - } - default: - Assert(0); /* Shouldn't come here.. Keep compiler quiet */ - } - return; -} - -static void ProcessTransactionCommand(GTMProxy_ConnectionInfo *conninfo, GTM_Conn *gtm_conn, GTM_MessageType mtype, StringInfo message) { @@ -2394,86 +2239,6 @@ GTMProxy_ProxyCommand(GTMProxy_ConnectionInfo *conninfo, GTM_Conn *gtm_conn, /* - * Proxy the incoming message to the GTM server after adding our own identifier - * to it. Add also in the registration message the GTM Proxy number and rebuilt message. 
- */ -static void GTMProxy_ProxyPGXCNodeCommand(GTMProxy_ConnectionInfo *conninfo,GTM_Conn *gtm_conn, GTM_MessageType mtype, GTMProxy_CommandData cmd_data) -{ - GTMProxy_CommandInfo *cmdinfo; - GTMProxy_ThreadInfo *thrinfo = GetMyThreadInfo; - GTM_ProxyMsgHeader proxyhdr; - - proxyhdr.ph_conid = conninfo->con_id; - - switch(mtype) - { - case MSG_NODE_REGISTER: - /* Rebuild the message */ - if (gtmpqPutMsgStart('C', true, gtm_conn) || - /* GTM Proxy Header */ - gtmpqPutnchar((char *)&proxyhdr, sizeof (GTM_ProxyMsgHeader), gtm_conn) || - /* Message Type */ - gtmpqPutInt(MSG_NODE_REGISTER, sizeof (GTM_MessageType), gtm_conn) || - /* Node Type to Register */ - gtmpqPutnchar((char *)&cmd_data.cd_reg.type, sizeof(GTM_PGXCNodeType), gtm_conn) || - /* Node Name (length) */ - gtmpqPutInt(strlen(cmd_data.cd_reg.nodename), sizeof (GTM_StrLen), gtm_conn) || - /* Node Name (var-len) */ - gtmpqPutnchar(cmd_data.cd_reg.nodename, strlen(cmd_data.cd_reg.nodename), gtm_conn) || - /* Host Name (length) */ - gtmpqPutInt(strlen(cmd_data.cd_reg.ipaddress), sizeof (GTM_StrLen), gtm_conn) || - /* Host Name (var-len) */ - gtmpqPutnchar(cmd_data.cd_reg.ipaddress, strlen(cmd_data.cd_reg.ipaddress), gtm_conn) || - /* Port Number */ - gtmpqPutnchar((char *)&cmd_data.cd_reg.port, sizeof(GTM_PGXCNodePort), gtm_conn) || - /* Proxy Name (empty string if connected to GTM directly) */ - gtmpqPutInt(strlen(cmd_data.cd_reg.gtm_proxy_nodename), 4, gtm_conn) || - /* Proxy Name name (var-len) */ - gtmpqPutnchar(cmd_data.cd_reg.gtm_proxy_nodename, strlen(cmd_data.cd_reg.gtm_proxy_nodename), gtm_conn) || - /* Data Folder length */ - gtmpqPutInt(strlen(cmd_data.cd_reg.datafolder), 4, gtm_conn) || - /* Data folder name (var-len) */ - gtmpqPutnchar(cmd_data.cd_reg.datafolder, strlen(cmd_data.cd_reg.datafolder), gtm_conn) || - /* Node Status */ - gtmpqPutInt(cmd_data.cd_reg.status, sizeof(GTM_PGXCNodeStatus), gtm_conn)) - - elog(ERROR, "Error proxing data"); - break; - - case MSG_NODE_UNREGISTER: - if 
(gtmpqPutMsgStart('C', true, gtm_conn) || - gtmpqPutnchar((char *)&proxyhdr, sizeof (GTM_ProxyMsgHeader), gtm_conn) || - gtmpqPutInt(MSG_NODE_UNREGISTER, sizeof (GTM_MessageType), gtm_conn) || - gtmpqPutnchar((char *)&cmd_data.cd_reg.type, sizeof(GTM_PGXCNodeType), gtm_conn) || - /* Node Name (length) */ - gtmpqPutInt(strlen(cmd_data.cd_reg.nodename), sizeof (GTM_StrLen), gtm_conn) || - /* Node Name (var-len) */ - gtmpqPutnchar(cmd_data.cd_reg.nodename, strlen(cmd_data.cd_reg.nodename), gtm_conn)) - elog(ERROR, "Error proxing data"); - break; - - default: - Assert(0); /* Shouldn't come here.. Keep compiler quiet */ - } - - /* - * Add the message to the pending command list - */ - cmdinfo = palloc0(sizeof (GTMProxy_CommandInfo)); - cmdinfo->ci_mtype = mtype; - cmdinfo->ci_conn = conninfo; - cmdinfo->ci_res_index = 0; - thrinfo->thr_processed_commands = gtm_lappend(thrinfo->thr_processed_commands, cmdinfo); - - /* Finish the message. */ - if (gtmpqPutMsgEnd(gtm_conn)) - elog(ERROR, "Error finishing the message"); - - return; -} - - -/* * Record the incoming message as per its type. After all messages of this type * are collected, they will be sent in a single message to the GTM server. */ @@ -2576,9 +2341,6 @@ GTMProxy_HandleDisconnect(GTMProxy_ConnectionInfo *conninfo, GTM_Conn *gtm_conn) GTM_ProxyMsgHeader proxyhdr; int namelen; - /* Mark node as disconnected if it is a postmaster backend */ - Recovery_PGXCNodeDisconnect(conninfo->con_port); - proxyhdr.ph_conid = conninfo->con_id; /* Start the message. 
*/ if (gtmpqPutMsgStart('C', true, gtm_conn) || @@ -3200,6 +2962,7 @@ RegisterProxy(bool is_reconnect) char proxyname[] = ""; time_t finish_time; MemoryContext old_mcxt = NULL; + GlobalTransactionId xmin = InvalidGlobalTransactionId; if (is_reconnect) { @@ -3240,7 +3003,8 @@ RegisterProxy(bool is_reconnect) gtmpqPutnchar(proxyname, (int)strlen(proxyname), master_conn) || gtmpqPutInt((int)strlen(GTMProxyDataDir), 4, master_conn) || gtmpqPutnchar(GTMProxyDataDir, strlen(GTMProxyDataDir), master_conn)|| - gtmpqPutInt(NODE_CONNECTED, sizeof(GTM_PGXCNodeStatus), master_conn)) + gtmpqPutInt(NODE_CONNECTED, sizeof(GTM_PGXCNodeStatus), master_conn) || + gtmpqPutnchar((char *)&xmin, sizeof (GlobalTransactionId), master_conn)) goto failed; /* Finish the message. */ diff --git a/src/gtm/recovery/register_common.c b/src/gtm/recovery/register_common.c index 50c0ee3538..1cbdc2e104 100644 --- a/src/gtm/recovery/register_common.c +++ b/src/gtm/recovery/register_common.c @@ -29,6 +29,8 @@ #include "gtm/gtm_client.h" #include "gtm/gtm_serialize.h" #include "gtm/gtm_standby.h" +#include "gtm/gtm_time.h" +#include "gtm/gtm_txn.h" #include "gtm/libpq.h" #include "gtm/libpq-int.h" #include "gtm/pqformat.h" @@ -45,7 +47,6 @@ typedef struct GTM_NodeInfoHashBucket { gtm_List *nhb_list; - GTM_RWLock nhb_lock; } GTM_PGXCNodeInfoHashBucket; static char GTMPGXCNodeFile[GTM_NODE_FILE_MAX_PATH]; @@ -53,11 +54,16 @@ static char GTMPGXCNodeFile[GTM_NODE_FILE_MAX_PATH]; /* Lock access of record file when necessary */ static GTM_RWLock RegisterFileLock; +/* Lock to control registration/unregistration of nodes */ +static GTM_RWLock PGXCNodesLock; + static int NodeRegisterMagic = 0xeaeaeaea; static int NodeUnregisterMagic = 0xebebebeb; static int NodeEndMagic = 0xefefefef; static GTM_PGXCNodeInfoHashBucket GTM_PGXCNodes[NODE_HASH_TABLE_SIZE]; +static GlobalTransactionId GTM_GlobalXmin = FirstNormalGlobalTransactionId; +static GTM_Timestamp GTM_GlobalXminComputedTime; static GTM_PGXCNodeInfo 
*pgxcnode_find_info(GTM_PGXCNodeType type, char *node_name); static uint32 pgxcnode_gethash(char *nodename); @@ -76,12 +82,11 @@ pgxcnode_get_all(GTM_PGXCNodeInfo **data, size_t maxlen) int node = 0; int i; + GTM_RWLockAcquire(&PGXCNodesLock, GTM_LOCKMODE_READ); + for (i = 0; i < NODE_HASH_TABLE_SIZE; i++) { bucket = &GTM_PGXCNodes[i]; - - GTM_RWLockAcquire(&bucket->nhb_lock, GTM_LOCKMODE_READ); - gtm_foreach(elem, bucket->nhb_list) { GTM_PGXCNodeInfo *curr_nodeinfo = NULL; @@ -96,9 +101,8 @@ pgxcnode_get_all(GTM_PGXCNodeInfo **data, size_t maxlen) if (node == maxlen) break; } - - GTM_RWLockRelease(&bucket->nhb_lock); } + GTM_RWLockRelease(&PGXCNodesLock); return node; } @@ -111,12 +115,10 @@ pgxcnode_find_by_type(GTM_PGXCNodeType type, GTM_PGXCNodeInfo **data, size_t max int node = 0; int i; + GTM_RWLockAcquire(&PGXCNodesLock, GTM_LOCKMODE_READ); for (i = 0; i < NODE_HASH_TABLE_SIZE; i++) { bucket = &GTM_PGXCNodes[i]; - - GTM_RWLockAcquire(&bucket->nhb_lock, GTM_LOCKMODE_READ); - gtm_foreach(elem, bucket->nhb_list) { GTM_PGXCNodeInfo *cur = NULL; @@ -133,9 +135,8 @@ pgxcnode_find_by_type(GTM_PGXCNodeType type, GTM_PGXCNodeInfo **data, size_t max if (node == maxlen) break; } - - GTM_RWLockRelease(&bucket->nhb_lock); } + GTM_RWLockRelease(&PGXCNodesLock); return node; } @@ -153,8 +154,7 @@ pgxcnode_find_info(GTM_PGXCNodeType type, char *node_name) bucket = &GTM_PGXCNodes[hash]; - GTM_RWLockAcquire(&bucket->nhb_lock, GTM_LOCKMODE_READ); - + GTM_RWLockAcquire(&PGXCNodesLock, GTM_LOCKMODE_READ); gtm_foreach(elem, bucket->nhb_list) { curr_nodeinfo = (GTM_PGXCNodeInfo *) gtm_lfirst(elem); @@ -164,7 +164,7 @@ pgxcnode_find_info(GTM_PGXCNodeType type, char *node_name) curr_nodeinfo = NULL; } - GTM_RWLockRelease(&bucket->nhb_lock); + GTM_RWLockRelease(&PGXCNodesLock); return curr_nodeinfo; } @@ -213,13 +213,13 @@ pgxcnode_remove_info(GTM_PGXCNodeInfo *nodeinfo) bucket = &GTM_PGXCNodes[hash]; - GTM_RWLockAcquire(&bucket->nhb_lock, GTM_LOCKMODE_WRITE); + 
GTM_RWLockAcquire(&PGXCNodesLock, GTM_LOCKMODE_WRITE); GTM_RWLockAcquire(&nodeinfo->node_lock, GTM_LOCKMODE_WRITE); bucket->nhb_list = gtm_list_delete(bucket->nhb_list, nodeinfo); GTM_RWLockRelease(&nodeinfo->node_lock); - GTM_RWLockRelease(&bucket->nhb_lock); + GTM_RWLockRelease(&PGXCNodesLock); return 0; } @@ -236,7 +236,12 @@ pgxcnode_add_info(GTM_PGXCNodeInfo *nodeinfo) bucket = &GTM_PGXCNodes[hash]; - GTM_RWLockAcquire(&bucket->nhb_lock, GTM_LOCKMODE_WRITE); + GTM_RWLockAcquire(&PGXCNodesLock, GTM_LOCKMODE_WRITE); + + elog(LOG, "Nodeinfo->reported_xmin - %d:%d", nodeinfo->reported_xmin, + GTM_GlobalXmin); + if (!GlobalTransactionIdIsValid(nodeinfo->reported_xmin)) + nodeinfo->reported_xmin = GTM_GlobalXmin; gtm_foreach(elem, bucket->nhb_list) { @@ -249,7 +254,7 @@ pgxcnode_add_info(GTM_PGXCNodeInfo *nodeinfo) { if (curr_nodeinfo->status == NODE_CONNECTED) { - GTM_RWLockRelease(&bucket->nhb_lock); + GTM_RWLockRelease(&PGXCNodesLock); ereport(LOG, (EEXIST, errmsg("Node with the given ID number already exists - %s %d:%d", @@ -296,7 +301,7 @@ pgxcnode_add_info(GTM_PGXCNodeInfo *nodeinfo) /* Set socket number with the new one */ curr_nodeinfo->socket = nodeinfo->socket; - GTM_RWLockRelease(&bucket->nhb_lock); + GTM_RWLockRelease(&PGXCNodesLock); return 0; } } @@ -306,7 +311,7 @@ pgxcnode_add_info(GTM_PGXCNodeInfo *nodeinfo) * Safe to add the structure to the list */ bucket->nhb_list = gtm_lappend(bucket->nhb_list, nodeinfo); - GTM_RWLockRelease(&bucket->nhb_lock); + GTM_RWLockRelease(&PGXCNodesLock); return 0; } @@ -380,6 +385,7 @@ Recovery_PGXCNodeRegister(GTM_PGXCNodeType type, GTM_PGXCNodePort port, char *proxyname, GTM_PGXCNodeStatus status, + GlobalTransactionId *xmin, char *ipaddress, char *datafolder, bool in_recovery, @@ -408,6 +414,8 @@ Recovery_PGXCNodeRegister(GTM_PGXCNodeType type, nodeinfo->ipaddress = pgxcnode_copy_char(ipaddress); nodeinfo->status = status; nodeinfo->socket = socket; + nodeinfo->reported_xmin = xmin ? *xmin : InvalidGlobalTransactionId; + nodeinfo->reported_xmin_time = 
GTM_TimestampGetCurrent(); elog(DEBUG1, "Recovery_PGXCNodeRegister Request info: type=%d, nodename=%s, port=%d," \ "datafolder=%s, ipaddress=%s, status=%d", @@ -427,6 +435,9 @@ Recovery_PGXCNodeRegister(GTM_PGXCNodeType type, if (!in_recovery && errcode == 0) Recovery_RecordRegisterInfo(nodeinfo, true); + if (xmin) + *xmin = nodeinfo->reported_xmin; + return errcode; } @@ -460,12 +471,11 @@ Recovery_SaveRegisterInfo(void) return; } + GTM_RWLockAcquire(&PGXCNodesLock, GTM_LOCKMODE_READ); for (hash = 0; hash < NODE_HASH_TABLE_SIZE; hash++) { bucket = &GTM_PGXCNodes[hash]; - GTM_RWLockAcquire(&bucket->nhb_lock, GTM_LOCKMODE_READ); - /* Write one by one information about registered nodes */ gtm_foreach(elem, bucket->nhb_list) { @@ -536,9 +546,8 @@ Recovery_SaveRegisterInfo(void) GTM_RWLockRelease(&nodeinfo->node_lock); } - - GTM_RWLockRelease(&bucket->nhb_lock); } + GTM_RWLockRelease(&PGXCNodesLock); close(ctlfd); @@ -745,11 +754,20 @@ Recovery_PGXCNodeRegisterCoordProcess(char *coord_node, int coord_procid, */ nodeinfo = pgxcnode_find_info(GTM_NODE_COORDINATOR, coord_node); - if (nodeinfo == NULL) + while (nodeinfo == NULL) { - if (Recovery_PGXCNodeRegister(GTM_NODE_COORDINATOR, coord_node, 0, NULL, - NODE_CONNECTED, NULL, NULL, false, 0)) - return 0; + GlobalTransactionId xmin = InvalidGlobalTransactionId; + int errcode = Recovery_PGXCNodeRegister(GTM_NODE_COORDINATOR, coord_node, 0, NULL, + NODE_CONNECTED, + &xmin, + NULL, NULL, false, 0); + + /* + * If another thread registers before we get a chance, just look for + * the nodeinfo again + */ + if (errcode != 0 && errcode != EEXIST) + return 0; nodeinfo = pgxcnode_find_info(GTM_NODE_COORDINATOR, coord_node); } @@ -887,3 +905,320 @@ retry: elog(DEBUG1, "MSG_BACKEND_DISCONNECT rc=%d done.", _rc); } } + +void +GTM_InitNodeManager(void) +{ + int ii; + + for (ii = 0; ii < NODE_HASH_TABLE_SIZE; ii++) + { + GTM_PGXCNodes[ii].nhb_list = gtm_NIL; + } + + GTM_RWLockInit(&RegisterFileLock); + GTM_RWLockInit(&PGXCNodesLock); +} + 
+/* + * Set to 120 seconds, but should be a few multiple for cluster monitor naptime + */ +#define GTM_REPORT_XMIN_DELAY_THRESHOLD (120 * 1000) + +GlobalTransactionId +GTM_HandleGlobalXmin(GTM_PGXCNodeType type, char *node_name, + GlobalTransactionId *reported_xmin, bool remoteIdle, int *errcode) +{ + GTM_PGXCNodeInfo *all_nodes[MAX_NODES]; + int num_nodes; + GTM_Timestamp current_time; + GTM_PGXCNodeInfo *mynodeinfo; + int ii; + GlobalTransactionId global_xmin; + GlobalTransactionId non_idle_global_xmin; + GlobalTransactionId idle_global_xmin; + bool excludeSelf = false; + + *errcode = 0; + + elog(LOG, "node_name: %s, remoteIdle: %d, reported_xmin: %d, global_xmin: %d", + node_name, remoteIdle, *reported_xmin, + GTM_GlobalXmin); + + /* + * Hold the PGXCNodesLock in READ mode until we are done with the + * GlobalXmin calculation. We don't want any new node to join the cluster, + * but its OK for other nodes to report and do these computation in + * parallel. If the other guy beats us and advances the GlobalXmin beyond + * what we compute, we accept that calculation + */ + GTM_RWLockAcquire(&PGXCNodesLock, GTM_LOCKMODE_READ); + + mynodeinfo = pgxcnode_find_info(type, node_name); + if (mynodeinfo == NULL) + { + *errcode = GTM_ERRCODE_NODE_NOT_REGISTERED; + GTM_RWLockRelease(&PGXCNodesLock); + elog(LOG, "GTM_ERRCODE_NODE_NOT_REGISTERED - node_name %s", node_name); + return InvalidGlobalTransactionId; + } + + GTM_RWLockAcquire(&mynodeinfo->node_lock, GTM_LOCKMODE_WRITE); + + /* + * If we were excluded from the GlobalXmin calculation because we failed to + * report our status for GTM_REPORT_XMIN_DELAY_THRESHOLD seconds, we can + * only join the cluster back iff the GlobalXmin hasn't advanced beyond + * what we'd last reported. Otherwise its possible that some nodes are way + * ahead of us. 
So we must give up and restart all over again (this is done + * via PANIC in Cluster Monitor process on the remote side). + * + * The GTM_REPORT_XMIN_DELAY_THRESHOLD is many times higher than the + * naptime used by Cluster Monitor. So unless there was a network outage or + * the remote node got seriously busy such that the Cluster Monitor did not + * get an opportunity to report xmin in a timely fashion, we shouldn't get + * into this situation often. + * + * The exception to this rule is that if the remote node is idle, then we + * actually ignore the xmin reported by it and instead calculate a new xmin + * for it and send it back in response. The remote node will still do a + * final sanity check and either accept that xmin or kill itself via PANIC + * mechanism. + */ + if ((mynodeinfo->excluded) && + GlobalTransactionIdPrecedes(mynodeinfo->reported_xmin, + GTM_GlobalXmin) && !remoteIdle) + { + *errcode = GTM_ERRCODE_NODE_EXCLUDED; + GTM_RWLockRelease(&mynodeinfo->node_lock); + GTM_RWLockRelease(&PGXCNodesLock); + elog(LOG, "GTM_ERRCODE_NODE_EXCLUDED - node_name %s", node_name); + return InvalidGlobalTransactionId; + } + + /* + * The remote node must not report a xmin which precedes the xmin it had + * reported in the past. If it ever happens, send an error back and let the + * remote node restart itself + */ + if (!remoteIdle && GlobalTransactionIdPrecedes(*reported_xmin, mynodeinfo->reported_xmin)) + { + *errcode = GTM_ERRCODE_TOO_OLD_XMIN; + GTM_RWLockRelease(&mynodeinfo->node_lock); + GTM_RWLockRelease(&PGXCNodesLock); + elog(LOG, "GTM_ERRCODE_TOO_OLD_XMIN - node_name %s", node_name); + return InvalidGlobalTransactionId; + } + + elog(DEBUG1, "node_name: %s, remoteIdle: %d, reported_xmin: %d, nodeinfo->reported_xmin: %d", + mynodeinfo->nodename, remoteIdle, *reported_xmin, + mynodeinfo->reported_xmin); + + /* + * If the remote node is idle, there is a danger that it may keep reporting + * a very old xmin (usually capped by latestCompletedXid). 
To handle such + * cases, which can be quite common in a large cluster, we check if the + * remote node has reported idle status and the reported xmin is same as + * what it reported in the last cycle and mark such node as "idle". + * Xmin reported by such a node is ignored and we compute xmin for it + * locally, here on the GTM. + * + * There are two strategies we follow: + * + * 1. We compute the lower bound of xmins reported by all non-idle remote + * nodes and assign that to this guy. This assumes that there is zero + * chance that a currently active (non-idle) node will send something to + * this guy which is older than the xmin computed + * + * 2. If all nodes are reporting their status as idle, we compute the lower + * bound of xmins reported by all idle nodes. This guarantees that the + * GlobalXmin can advance to a reasonable point even when all nodes have + * turned idle. + * + * In any case, the remote node will do its own sanity check before + * accepting the xmin computed by us and bail out if it doesn't agree with + * that. + */ + if (remoteIdle && + GlobalTransactionIdEquals(mynodeinfo->reported_xmin, + *reported_xmin)) + mynodeinfo->idle = true; + else + { + mynodeinfo->idle = false; + mynodeinfo->reported_xmin = *reported_xmin; + } + mynodeinfo->excluded = false; + mynodeinfo->reported_xmin_time = current_time = GTM_TimestampGetCurrent(); + + GTM_RWLockRelease(&mynodeinfo->node_lock); + + /* Compute both, idle as well as non-idle xmin */ + non_idle_global_xmin = InvalidGlobalTransactionId; + idle_global_xmin = InvalidGlobalTransactionId; + + num_nodes = pgxcnode_get_all(all_nodes, MAX_NODES); + + elog(DEBUG1, "num_nodes - %d", num_nodes); + + for (ii = 0; ii < num_nodes; ii++) + { + GlobalTransactionId xid; + GTM_PGXCNodeInfo *nodeinfo = all_nodes[ii]; + + elog(DEBUG1, "nodeinfo %p, type: %d, exclude %c, idle %c, xmin %d, time %lld", + nodeinfo, nodeinfo->type, nodeinfo->excluded ? 'T' : 'F', + nodeinfo->idle ? 
'T' : 'F', + nodeinfo->reported_xmin, nodeinfo->reported_xmin_time); + + if (nodeinfo->excluded) + continue; + + if (nodeinfo->type != GTM_NODE_COORDINATOR && nodeinfo->type != + GTM_NODE_DATANODE) + continue; + + if (GTM_TimestampDifferenceExceeds(nodeinfo->reported_xmin_time, + current_time, GTM_REPORT_XMIN_DELAY_THRESHOLD)) + { + elog(LOG, "Timediff exceeds threshold - %ld:%ld - excluding the " + "node from GlobalXmin calculation", + nodeinfo->reported_xmin_time, current_time); + + GTM_RWLockAcquire(&nodeinfo->node_lock, GTM_LOCKMODE_WRITE); + if (GTM_TimestampDifferenceExceeds(nodeinfo->reported_xmin_time, + current_time, GTM_REPORT_XMIN_DELAY_THRESHOLD)) + { + nodeinfo->excluded = true; + GTM_RWLockRelease(&nodeinfo->node_lock); + continue; + } + GTM_RWLockRelease(&nodeinfo->node_lock); + } + + /* + * If the remote node is idle, don't include its reported xmin in the + * calculation which could be quite stale + */ + if (mynodeinfo->idle && (nodeinfo == mynodeinfo)) + continue; + + /* + * Now grab the lock on the nodeinfo so that no further changes are + * possible to its state. + */ + GTM_RWLockAcquire(&nodeinfo->node_lock, GTM_LOCKMODE_READ); + + /* + * Just check again if the excluded state hasn't changed. Shouldn't + * happen too often anyways + */ + if (nodeinfo->excluded) + { + GTM_RWLockRelease(&nodeinfo->node_lock); + continue; + } + + xid = nodeinfo->reported_xmin; + if (!nodeinfo->idle) + { + if (!GlobalTransactionIdIsValid(non_idle_global_xmin)) + non_idle_global_xmin = xid; + else if (GlobalTransactionIdPrecedes(xid, non_idle_global_xmin)) + non_idle_global_xmin = xid; + } + else + { + if (!GlobalTransactionIdIsValid(idle_global_xmin)) + idle_global_xmin = xid; + else if (GlobalTransactionIdPrecedes(xid, idle_global_xmin)) + idle_global_xmin = xid; + } + GTM_RWLockRelease(&nodeinfo->node_lock); + } + + /* + * If the remote node is idle, a new xmin might have been computed for it + * by us. 
We first try for non_idle_global_xmin, but if all nodes are idle, + * we use the idle_global_xmin + */ + if (mynodeinfo && mynodeinfo->idle) + { + GTM_RWLockAcquire(&mynodeinfo->node_lock, GTM_LOCKMODE_WRITE); + if (GlobalTransactionIdIsValid(non_idle_global_xmin)) + { + if (GlobalTransactionIdFollows(non_idle_global_xmin, + mynodeinfo->reported_xmin)) + *reported_xmin = mynodeinfo->reported_xmin = non_idle_global_xmin; + } + else if (GlobalTransactionIdIsValid(idle_global_xmin)) + { + if (GlobalTransactionIdFollows(idle_global_xmin, + mynodeinfo->reported_xmin)) + *reported_xmin = mynodeinfo->reported_xmin = idle_global_xmin; + } + mynodeinfo->reported_xmin_time = current_time; + GTM_RWLockRelease(&mynodeinfo->node_lock); + } + + /* + * Now all nodes that must be excluded from GlobalXmin computation have + * been marked correctly and xmin computed and set for an idle remote node, + * if so. Lets compute the GlobalXmin + */ + + /* + * GlobalXmin is capped by the latestCompletedXid. Since any future + * additions/changes can't cross this horizon, it seems appropriate to use + * this as upper bound for GlobalXmin computation + */ + global_xmin = GTMTransactions.gt_latestCompletedXid; + if (!GlobalTransactionIdIsValid(global_xmin)) + global_xmin = FirstNormalGlobalTransactionId; + else + GlobalTransactionIdAdvance(global_xmin); + + for (ii = 0; ii < num_nodes; ii++) + { + GlobalTransactionId xid; + GTM_PGXCNodeInfo *nodeinfo = all_nodes[ii]; + + if (nodeinfo->excluded) + continue; + + if (nodeinfo->type != GTM_NODE_COORDINATOR && nodeinfo->type != + GTM_NODE_DATANODE) + continue; + + /* Fetch once */ + xid = nodeinfo->reported_xmin; + if (!GlobalTransactionIdIsValid(global_xmin)) + global_xmin = xid; + else if (GlobalTransactionIdPrecedes(xid, global_xmin)) + global_xmin = xid; + } + GTM_RWLockRelease(&PGXCNodesLock); + + /* + * Now update the GTM_GlobalXmin and also record the time when its updated + * but iff someone else has not beaten us in the calculation 
already, which + * is possible because we did the calculation holding only a READ lock on + * PGXCNodesLock + */ + if (GlobalTransactionIdIsValid(global_xmin)) + { + GTM_RWLockAcquire(&PGXCNodesLock, GTM_LOCKMODE_WRITE); + if (GlobalTransactionIdPrecedes(GTM_GlobalXmin, global_xmin)) + { + GTM_GlobalXmin = global_xmin; + GTM_GlobalXminComputedTime = current_time; + } + else + global_xmin = GTM_GlobalXmin; + GTM_RWLockRelease(&PGXCNodesLock); + } + + + elog(DEBUG1, "GTM_HandleGlobalXmin - %d", global_xmin); + return global_xmin; +} diff --git a/src/gtm/recovery/register_gtm.c b/src/gtm/recovery/register_gtm.c index 371b4c97f8..780dcd3b47 100644 --- a/src/gtm/recovery/register_gtm.c +++ b/src/gtm/recovery/register_gtm.c @@ -63,6 +63,7 @@ ProcessPGXCNodeRegister(Port *myport, StringInfo message, bool is_backup) int len; StringInfoData buf; GTM_PGXCNodeStatus status; + GlobalTransactionId xmin; /* Read Node Type */ memcpy(&type, pq_getmsgbytes(message, sizeof (GTM_PGXCNodeType)), @@ -107,10 +108,13 @@ ProcessPGXCNodeRegister(Port *myport, StringInfo message, bool is_backup) status = pq_getmsgint(message, sizeof (GTM_PGXCNodeStatus)); + memcpy(&xmin, pq_getmsgbytes(message, sizeof (GlobalTransactionId)), + sizeof (GlobalTransactionId)); + elog(DEBUG1, "ProcessPGXCNodeRegister: ipaddress = \"%s\", node name = \"%s\", proxy name = \"%s\", " - "datafolder \"%s\", status = %d", - ipaddress, node_name, proxyname, datafolder, status); + "datafolder \"%s\", status = %d, xmin = %d", + ipaddress, node_name, proxyname, datafolder, status, xmin); if ((type!=GTM_NODE_GTM_PROXY) && (type!=GTM_NODE_GTM_PROXY_POSTMASTER) && @@ -162,7 +166,7 @@ ProcessPGXCNodeRegister(Port *myport, StringInfo message, bool is_backup) } if (Recovery_PGXCNodeRegister(type, node_name, port, - proxyname, status, + proxyname, status, &xmin, ipaddress, datafolder, false, myport->sock)) { ereport(ERROR, @@ -203,7 +207,8 @@ ProcessPGXCNodeRegister(Port *myport, StringInfo message, bool is_backup) port, 
node_name, datafolder, - status); + status, + xmin); elog(DEBUG1, "node_register_internal() returns rc %d.", _rc); @@ -235,6 +240,7 @@ ProcessPGXCNodeRegister(Port *myport, StringInfo message, bool is_backup) pq_sendint(&buf, strlen(node_name), 4); /* Node name (var-len) */ pq_sendbytes(&buf, node_name, strlen(node_name)); + pq_sendint(&buf, xmin, sizeof (GlobalTransactionId)); pq_endmessage(myport, &buf); if (myport->remote_type != GTM_NODE_GTM_PROXY) diff --git a/src/gtm/test/test_serialize.c b/src/gtm/test/test_serialize.c index 8d0bde372e..c1049b8862 100644 --- a/src/gtm/test/test_serialize.c +++ b/src/gtm/test/test_serialize.c @@ -42,12 +42,11 @@ test_snapshotdata_1(void) data->sn_xmin = 128; data->sn_xmax = 256; - data->sn_recent_global_xmin = 512; data->sn_xcnt = 1024; data->sn_xip = 2048; - printf("sn_xmin=%d, sn_xmax=%d, sn_recent_global_xmin=%d, sn_xcnt=%d, sn_xip=%d\n", - data->sn_xmin, data->sn_xmax, data->sn_recent_global_xmin, data->sn_xcnt, data->sn_xip); + printf("sn_xmin=%d, sn_xmax=%d, sn_xcnt=%d, sn_xip=%d\n", + data->sn_xmin, data->sn_xmax, data->sn_xcnt, data->sn_xip); /* serialize */ buflen = sizeof(GTM_SnapshotData); @@ -61,8 +60,8 @@ test_snapshotdata_1(void) /* deserialize */ data2 = (GTM_SnapshotData *)malloc(sizeof(GTM_SnapshotData)); gtm_deserialize_snapshotdata(data2, buf, buflen); - printf("sn_xmin=%d, sn_xmax=%d, sn_recent_global_xmin=%d, sn_xcnt=%d, sn_xip=%d\n", - data2->sn_xmin, data2->sn_xmax, data2->sn_recent_global_xmin, data2->sn_xcnt, data2->sn_xip); + printf("sn_xmin=%d, sn_xmax=%d, sn_xcnt=%d, sn_xip=%d\n", + data2->sn_xmin, data2->sn_xmax, data2->sn_xcnt, data2->sn_xip); TEARDOWN(); @@ -88,7 +87,6 @@ build_dummy_gtm_transactioninfo() data->gti_current_snapshot.sn_xmin = 128; data->gti_current_snapshot.sn_xmax = 256; - data->gti_current_snapshot.sn_recent_global_xmin = 512; data->gti_current_snapshot.sn_xcnt = 1024; data->gti_current_snapshot.sn_xip = 2048; @@ -149,7 +147,6 @@ test_transactioninfo_1(void) 
_ASSERT(data2->gti_coordcount==5); _ASSERT(data2->gti_current_snapshot.sn_xmin==128); _ASSERT(data2->gti_current_snapshot.sn_xmax==256); - _ASSERT(data2->gti_current_snapshot.sn_recent_global_xmin==512); _ASSERT(data2->gti_current_snapshot.sn_xcnt==1024); _ASSERT(data2->gti_current_snapshot.sn_xip==2048); diff --git a/src/include/access/gtm.h b/src/include/access/gtm.h index 548441ce65..27383d8cac 100644 --- a/src/include/access/gtm.h +++ b/src/include/access/gtm.h @@ -44,7 +44,7 @@ extern int CommitPreparedTranGTM(GlobalTransactionId gxid, extern GTM_Snapshot GetSnapshotGTM(GlobalTransactionId gxid, bool canbe_grouped); /* Node registration APIs with GTM */ -extern int RegisterGTM(GTM_PGXCNodeType type, GTM_PGXCNodePort port, char *datafolder); +extern int RegisterGTM(GTM_PGXCNodeType type, GlobalTransactionId *xmin); extern int UnregisterGTM(GTM_PGXCNodeType type); /* Sequence interface APIs with GTM */ @@ -62,4 +62,6 @@ extern int DropSequenceGTM(char *name, GTM_SequenceKeyType type); extern int RenameSequenceGTM(char *seqname, const char *newseqname); /* Barrier */ extern int ReportBarrierGTM(char *barrier_id); +extern int ReportGlobalXmin(GlobalTransactionId *gxid, + GlobalTransactionId *global_xmin, bool isIdle); #endif /* ACCESS_GTM_H */ diff --git a/src/include/access/xact.h b/src/include/access/xact.h index caa9d37137..f64c2110bc 100644 --- a/src/include/access/xact.h +++ b/src/include/access/xact.h @@ -405,6 +405,7 @@ extern bool IsSendCommandId(void); extern void SetSendCommandId(bool status); extern bool IsPGXCNodeXactReadOnly(void); extern bool IsPGXCNodeXactDatanodeDirect(void); +extern void TransactionRecordXidWait(TransactionId xid); #endif extern int xactGetCommittedChildren(TransactionId **ptr); diff --git a/src/include/gtm/gtm_c.h b/src/include/gtm/gtm_c.h index ee19587ac4..2efe7e20c9 100644 --- a/src/include/gtm/gtm_c.h +++ b/src/include/gtm/gtm_c.h @@ -110,7 +110,6 @@ typedef struct GTM_SnapshotData { GlobalTransactionId sn_xmin; 
GlobalTransactionId sn_xmax; - GlobalTransactionId sn_recent_global_xmin; uint32 sn_xcnt; GlobalTransactionId *sn_xip; } GTM_SnapshotData; @@ -151,4 +150,9 @@ typedef enum GTM_PortLastCall #define _(x) gettext(x) +#define GTM_ERRCODE_TOO_OLD_XMIN 1 +#define GTM_ERRCODE_NODE_NOT_REGISTERED 2 +#define GTM_ERRCODE_NODE_EXCLUDED 3 +#define GTM_ERRCODE_UNKNOWN 4 + #endif /* GTM_C_H */ diff --git a/src/include/gtm/gtm_client.h b/src/include/gtm/gtm_client.h index b21f253143..d21ba44297 100644 --- a/src/include/gtm/gtm_client.h +++ b/src/include/gtm/gtm_client.h @@ -112,6 +112,7 @@ typedef union GTM_ResultData GTM_PGXCNodeType type; /* NODE_REGISTER */ size_t len; char *node_name; /* NODE_UNREGISTER */ + GlobalTransactionId xmin; } grd_node; struct @@ -120,6 +121,13 @@ typedef union GTM_ResultData GTM_PGXCNodeInfo *nodeinfo[MAX_NODES]; } grd_node_list; + struct + { + GlobalTransactionId reported_xmin; + GlobalTransactionId global_xmin; + int errcode; + } grd_report_xmin; /* REPORT_XMIN */ + /* * TODO * TXN_GET_STATUS @@ -252,18 +260,17 @@ int node_register(GTM_Conn *conn, GTM_PGXCNodeType type, GTM_PGXCNodePort port, char *node_name, - char *datafolder); -int bkup_node_register(GTM_Conn *conn, - GTM_PGXCNodeType type, - GTM_PGXCNodePort port, - char *node_name, - char *datafolder); -int node_register(GTM_Conn *conn, GTM_PGXCNodeType type, GTM_PGXCNodePort port, char *node_name, char *datafolder); + char *datafolder, + GlobalTransactionId *xmin); +int node_register(GTM_Conn *conn, GTM_PGXCNodeType type, GTM_PGXCNodePort port, + char *node_name, char *datafolder, GlobalTransactionId *xmin); int node_register_internal(GTM_Conn *conn, GTM_PGXCNodeType type, const char *host, GTM_PGXCNodePort port, char *node_name, - char *datafolder, GTM_PGXCNodeStatus status); -int bkup_node_register(GTM_Conn *conn, GTM_PGXCNodeType type, GTM_PGXCNodePort port, char *node_name, char *datafolder); + char *datafolder, GTM_PGXCNodeStatus status, + GlobalTransactionId *xmin); int 
bkup_node_register_internal(GTM_Conn *conn, GTM_PGXCNodeType type, const char *host, GTM_PGXCNodePort port, - char *node_name, char *datafolder, GTM_PGXCNodeStatus status); + char *node_name, char *datafolder, + GTM_PGXCNodeStatus status, + GlobalTransactionId xmin); int node_unregister(GTM_Conn *conn, GTM_PGXCNodeType type, const char *node_name); int bkup_node_unregister(GTM_Conn *conn, GTM_PGXCNodeType type, const char * node_name); @@ -271,6 +278,10 @@ int backend_disconnect(GTM_Conn *conn, bool is_postmaster, GTM_PGXCNodeType type char *node_get_local_addr(GTM_Conn *conn, char *buf, size_t buflen, int *rc); int register_session(GTM_Conn *conn, const char *coord_name, int coord_procid, int coord_backendid); +int report_global_xmin(GTM_Conn *conn, const char *node_name, + GTM_PGXCNodeType type, GlobalTransactionId *gxid, + GlobalTransactionId *global_xmin, + bool isIdle, int *errcode); /* * Sequence Management API diff --git a/src/include/gtm/gtm_gxid.h b/src/include/gtm/gtm_gxid.h new file mode 100644 index 0000000000..7fe3cf732c --- /dev/null +++ b/src/include/gtm/gtm_gxid.h @@ -0,0 +1,47 @@ +#ifndef _GTM_GXID_H +#define _GTM_GXID_H + +/* ---------------- + * Special transaction ID values + * + * BootstrapGlobalTransactionId is the XID for "bootstrap" operations, and + * FrozenGlobalTransactionId is used for very old tuples. Both should + * always be considered valid. + * + * FirstNormalGlobalTransactionId is the first "normal" transaction id. + * Note: if you need to change it, you must change pg_class.h as well. 
+ * ---------------- + */ +#define BootstrapGlobalTransactionId ((GlobalTransactionId) 1) +#define FrozenGlobalTransactionId ((GlobalTransactionId) 2) +#define FirstNormalGlobalTransactionId ((GlobalTransactionId) 3) +#define MaxGlobalTransactionId ((GlobalTransactionId) 0xFFFFFFFF) + +/* ---------------- + * transaction ID manipulation macros + * ---------------- + */ +#define GlobalTransactionIdIsNormal(xid) ((xid) >= FirstNormalGlobalTransactionId) +#define GlobalTransactionIdEquals(id1, id2) ((id1) == (id2)) +#define GlobalTransactionIdStore(xid, dest) (*(dest) = (xid)) +#define StoreInvalidGlobalTransactionId(dest) (*(dest) = InvalidGlobalTransactionId) + +/* advance a transaction ID variable, handling wraparound correctly */ +#define GlobalTransactionIdAdvance(dest) \ + do { \ + (dest)++; \ + if ((dest) < FirstNormalGlobalTransactionId) \ + (dest) = FirstNormalGlobalTransactionId; \ + } while(0) + +/* back up a transaction ID variable, handling wraparound correctly */ +#define GlobalTransactionIdRetreat(dest) \ + do { \ + (dest)--; \ + } while ((dest) < FirstNormalGlobalTransactionId) + +extern bool GlobalTransactionIdPrecedes(GlobalTransactionId id1, GlobalTransactionId id2); +extern bool GlobalTransactionIdPrecedesOrEquals(GlobalTransactionId id1, GlobalTransactionId id2); +extern bool GlobalTransactionIdFollows(GlobalTransactionId id1, GlobalTransactionId id2); +extern bool GlobalTransactionIdFollowsOrEquals(GlobalTransactionId id1, GlobalTransactionId id2); +#endif diff --git a/src/include/gtm/gtm_msg.h b/src/include/gtm/gtm_msg.h index f4854cbf5d..51677b6518 100644 --- a/src/include/gtm/gtm_msg.h +++ b/src/include/gtm/gtm_msg.h @@ -33,6 +33,8 @@ typedef enum GTM_MessageType MSG_NODE_UNREGISTER, /* Unregister a PGXC Node with GTM */ MSG_BKUP_NODE_UNREGISTER, /* Backup of MSG_NODE_UNREGISTER */ MSG_REGISTER_SESSION, /* Register distributed session with GTM */ + MSG_REPORT_XMIN, /* Report RecentGlobalXmin to GTM */ + MSG_BKUP_REPORT_XMIN, MSG_NODE_LIST, /* 
Get node list */ MSG_NODE_BEGIN_REPLICATION_INIT, MSG_NODE_END_REPLICATION_INIT, @@ -109,6 +111,7 @@ typedef enum GTM_ResultType NODE_REGISTER_RESULT, NODE_UNREGISTER_RESULT, REGISTER_SESSION_RESULT, + REPORT_XMIN_RESULT, NODE_LIST_RESULT, NODE_BEGIN_REPLICATION_INIT_RESULT, NODE_END_REPLICATION_INIT_RESULT, diff --git a/src/include/gtm/gtm_time.h b/src/include/gtm/gtm_time.h index 9d5bac1f9c..9bcb8e5f79 100644 --- a/src/include/gtm/gtm_time.h +++ b/src/include/gtm/gtm_time.h @@ -33,5 +33,10 @@ #endif GTM_Timestamp GTM_TimestampGetCurrent(void); +void GTM_TimestampDifference(GTM_Timestamp start_time, GTM_Timestamp stop_time, + long *secs, int *microsecs); +bool GTM_TimestampDifferenceExceeds(GTM_Timestamp start_time, + GTM_Timestamp stop_time, + int msec); #endif diff --git a/src/include/gtm/gtm_txn.h b/src/include/gtm/gtm_txn.h index d220e81df1..b48e26f7a8 100644 --- a/src/include/gtm/gtm_txn.h +++ b/src/include/gtm/gtm_txn.h @@ -16,48 +16,11 @@ #include "gtm/libpq-be.h" #include "gtm/gtm_c.h" +#include "gtm/gtm_gxid.h" #include "gtm/gtm_lock.h" #include "gtm/gtm_list.h" #include "gtm/stringinfo.h" -/* ---------------- - * Special transaction ID values - * - * BootstrapGlobalTransactionId is the XID for "bootstrap" operations, and - * FrozenGlobalTransactionId is used for very old tuples. Both should - * always be considered valid. - * - * FirstNormalGlobalTransactionId is the first "normal" transaction id. - * Note: if you need to change it, you must change pg_class.h as well. 
- * ---------------- - */ -#define BootstrapGlobalTransactionId ((GlobalTransactionId) 1) -#define FrozenGlobalTransactionId ((GlobalTransactionId) 2) -#define FirstNormalGlobalTransactionId ((GlobalTransactionId) 3) -#define MaxGlobalTransactionId ((GlobalTransactionId) 0xFFFFFFFF) - -/* ---------------- - * transaction ID manipulation macros - * ---------------- - */ -#define GlobalTransactionIdIsNormal(xid) ((xid) >= FirstNormalGlobalTransactionId) -#define GlobalTransactionIdEquals(id1, id2) ((id1) == (id2)) -#define GlobalTransactionIdStore(xid, dest) (*(dest) = (xid)) -#define StoreInvalidGlobalTransactionId(dest) (*(dest) = InvalidGlobalTransactionId) - -/* advance a transaction ID variable, handling wraparound correctly */ -#define GlobalTransactionIdAdvance(dest) \ - do { \ - (dest)++; \ - if ((dest) < FirstNormalGlobalTransactionId) \ - (dest) = FirstNormalGlobalTransactionId; \ - } while(0) - -/* back up a transaction ID variable, handling wraparound correctly */ -#define GlobalTransactionIdRetreat(dest) \ - do { \ - (dest)--; \ - } while ((dest) < FirstNormalGlobalTransactionId) typedef int XidStatus; @@ -71,15 +34,12 @@ typedef int XidStatus; extern bool GlobalTransactionIdDidCommit(GlobalTransactionId transactionId); extern bool GlobalTransactionIdDidAbort(GlobalTransactionId transactionId); extern void GlobalTransactionIdAbort(GlobalTransactionId transactionId); -extern bool GlobalTransactionIdPrecedes(GlobalTransactionId id1, GlobalTransactionId id2); -extern bool GlobalTransactionIdPrecedesOrEquals(GlobalTransactionId id1, GlobalTransactionId id2); -extern bool GlobalTransactionIdFollows(GlobalTransactionId id1, GlobalTransactionId id2); -extern bool GlobalTransactionIdFollowsOrEquals(GlobalTransactionId id1, GlobalTransactionId id2); /* in transam/varsup.c */ extern GlobalTransactionId GTM_GetGlobalTransactionId(GTM_TransactionHandle handle); extern GlobalTransactionId GTM_GetGlobalTransactionIdMulti(GTM_TransactionHandle handle[], int txn_count); 
extern GlobalTransactionId ReadNewGlobalTransactionId(void); +extern GlobalTransactionId GTM_GetLatestCompletedXID(void); extern void SetGlobalTransactionIdLimit(GlobalTransactionId oldest_datfrozenxid); extern void SetNextGlobalTransactionId(GlobalTransactionId gxid); extern void GTM_SetShuttingDown(void); @@ -246,6 +206,7 @@ void ProcessGetGIDDataTransactionCommand(Port *myport, StringInfo message); void ProcessGetGXIDTransactionCommand(Port *myport, StringInfo message); void ProcessGXIDListCommand(Port *myport, StringInfo message); void ProcessGetNextGXIDTransactionCommand(Port *myport, StringInfo message); +void ProcessReportXminCommand(Port *myport, StringInfo message, bool is_backup); void ProcessBeginTransactionGetGXIDAutovacuumCommand(Port *myport, StringInfo message); void ProcessBkupBeginTransactionGetGXIDAutovacuumCommand(Port *myport, StringInfo message); diff --git a/src/include/gtm/register.h b/src/include/gtm/register.h index 8fbaac19c4..734c12cabf 100644 --- a/src/include/gtm/register.h +++ b/src/include/gtm/register.h @@ -58,6 +58,16 @@ typedef struct GTM_PGXCNodeInfo char *ipaddress; /* IP address of the nodes */ char *datafolder; /* Data folder of the node */ GTM_PGXCNodeStatus status; /* Node status */ + bool excluded; /* + * Has the node timed out and been + * excluded from xmin computation? 
+ */ + bool idle; /* Has the node been idle since + * last report + */ + GlobalTransactionId reported_xmin; /* Last reported xmin */ + GTM_Timestamp reported_xmin_time; /* Time when last report was + received */ int max_sessions; int num_sessions; GTM_PGXCSession *sessions; @@ -77,6 +87,7 @@ int Recovery_PGXCNodeRegister(GTM_PGXCNodeType type, GTM_PGXCNodePort port, char *proxyname, GTM_PGXCNodeStatus status, + GlobalTransactionId *xmin, char *ipaddress, char *datafolder, bool in_recovery, @@ -103,4 +114,7 @@ void ProcessPGXCNodeList(Port *myport, StringInfo message); void ProcessGTMBeginBackup(Port *myport, StringInfo message); void ProcessGTMEndBackup(Port *myport, StringInfo message); +void GTM_InitNodeManager(void); +GlobalTransactionId GTM_HandleGlobalXmin(GTM_PGXCNodeType type, char *node_name, + GlobalTransactionId *reported_xmin, bool remoteIdle, int *errcode); #endif /* GTM_NODE_H */ diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h index 6c47f8611d..b9e5919580 100644 --- a/src/include/miscadmin.h +++ b/src/include/miscadmin.h @@ -403,6 +403,7 @@ typedef enum WalReceiverProcess, #ifdef PGXC PoolerProcess, + ClusterMonitorProcess, #endif NUM_AUXPROCTYPES /* Must be last! 
*/ @@ -416,6 +417,7 @@ extern AuxProcType MyAuxProcType; #define AmCheckpointerProcess() (MyAuxProcType == CheckpointerProcess) #define AmWalWriterProcess() (MyAuxProcType == WalWriterProcess) #define AmWalReceiverProcess() (MyAuxProcType == WalReceiverProcess) +#define AmClusterMonitorProcess() (MyAuxProcType == ClusterMonitorProcess) /***************************************************************************** diff --git a/src/include/pgxc/pgxc.h b/src/include/pgxc/pgxc.h index 9cd3995b2f..af68918b5d 100644 --- a/src/include/pgxc/pgxc.h +++ b/src/include/pgxc/pgxc.h @@ -19,8 +19,8 @@ #ifndef PGXC_H #define PGXC_H -#include "storage/lwlock.h" #include "postgres.h" +#include "storage/lwlock.h" extern bool isPGXCCoordinator; extern bool isPGXCDataNode; diff --git a/src/include/postmaster/clustermon.h b/src/include/postmaster/clustermon.h new file mode 100644 index 0000000000..0c97fa9fdd --- /dev/null +++ b/src/include/postmaster/clustermon.h @@ -0,0 +1,42 @@ +/*------------------------------------------------------------------------- + * + * clustermon.h + * header file for cluster monitor process + * + * + * Portions Copyright (c) 2015, 2ndQuadrant Ltd + * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * Portions Copyright (c) 2010-2012 Postgres-XC Development Group + * + * src/include/postmaster/clustermon.h + * + *------------------------------------------------------------------------- + */ +#ifndef CLUSTERMON_H +#define CLUSTERMON_H + +typedef struct +{ + slock_t mutex; + GlobalTransactionId reported_recent_global_xmin; + GlobalTransactionId gtm_recent_global_xmin; +} ClusterMonitorCtlData; + +extern Size ClusterMonitorShmemSize(void); + +/* Status inquiry functions */ +extern bool IsClusterMonitorProcess(void); + +/* Functions to start cluster monitor process, called from postmaster */ +extern int StartClusterMonitor(void); +GlobalTransactionId 
ClusterMonitorGetGlobalXmin(void); +void ClusterMonitorSetGlobalXmin(GlobalTransactionId xmin); + +#ifdef EXEC_BACKEND +extern void ClusterMonitorIAm(void); +#endif + +extern int ClusterMonitorInit(void); + +#endif /* CLUSTERMON_H */ diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h index d3053baf62..a63cb067aa 100644 --- a/src/include/storage/procarray.h +++ b/src/include/storage/procarray.h @@ -68,12 +68,16 @@ extern Snapshot GetSnapshotData(Snapshot snapshot); extern bool ProcArrayInstallImportedXmin(TransactionId xmin, TransactionId sourcexid); extern bool ProcArrayInstallRestoredXmin(TransactionId xmin, PGPROC *proc); +void ProcArrayCheckXminConsistency(TransactionId global_xmin); extern RunningTransactions GetRunningTransactionData(void); extern bool TransactionIdIsInProgress(TransactionId xid); extern bool TransactionIdIsActive(TransactionId xid); extern TransactionId GetOldestXmin(Relation rel, bool ignoreVacuum); +extern TransactionId GetOldestXminInternal(Relation rel, bool ignoreVacuum, + bool computeLocal, bool *isIdle, TransactionId lastGlobalXmin, + TransactionId lastReportedXmin); extern TransactionId GetOldestActiveTransactionId(void); extern TransactionId GetOldestSafeDecodingTransactionId(void); |