diff options
| -rw-r--r-- | src/backend/utils/mmgr/mcxt.c | 8 | ||||
| -rw-r--r-- | src/gtm/client/fe-protocol.c | 9 | ||||
| -rw-r--r-- | src/gtm/client/gtm_client.c | 38 | ||||
| -rw-r--r-- | src/gtm/common/gtm_serialize.c | 23 | ||||
| -rw-r--r-- | src/gtm/common/mcxt.c | 8 | ||||
| -rw-r--r-- | src/gtm/gtm_ctl/Makefile | 8 | ||||
| -rw-r--r-- | src/gtm/main/Makefile | 7 | ||||
| -rw-r--r-- | src/gtm/main/gtm_opt.c | 35 | ||||
| -rw-r--r-- | src/gtm/main/gtm_standby.c | 69 | ||||
| -rw-r--r-- | src/gtm/main/gtm_thread.c | 40 | ||||
| -rw-r--r-- | src/gtm/main/main.c | 121 | ||||
| -rw-r--r-- | src/gtm/proxy/Makefile | 6 | ||||
| -rw-r--r-- | src/gtm/proxy/gtm_proxy_opt.c | 41 | ||||
| -rw-r--r-- | src/gtm/recovery/Makefile | 4 | ||||
| -rw-r--r-- | src/gtm/recovery/register_common.c (renamed from src/gtm/recovery/register.c) | 315 | ||||
| -rw-r--r-- | src/gtm/recovery/register_gtm.c | 453 | ||||
| -rw-r--r-- | src/include/gen_alloc.h | 6 | ||||
| -rw-r--r-- | src/include/gtm/gtm.h | 5 | ||||
| -rw-r--r-- | src/include/gtm/gtm_c.h | 13 | ||||
| -rw-r--r-- | src/include/gtm/gtm_client.h | 6 | ||||
| -rw-r--r-- | src/include/gtm/gtm_msg.h | 4 | ||||
| -rw-r--r-- | src/include/gtm/gtm_opt.h | 26 | ||||
| -rw-r--r-- | src/include/gtm/gtm_standby.h | 8 | ||||
| -rw-r--r-- | src/include/gtm/gtm_txn.h | 22 | ||||
| -rw-r--r-- | src/include/gtm/register.h | 3 |
25 files changed, 877 insertions, 401 deletions
diff --git a/src/backend/utils/mmgr/mcxt.c b/src/backend/utils/mmgr/mcxt.c index 887241be0a..92473a3b43 100644 --- a/src/backend/utils/mmgr/mcxt.c +++ b/src/backend/utils/mmgr/mcxt.c @@ -739,10 +739,16 @@ void *current_memcontext() return((void *)CurrentMemoryContext); } +void *allocTopCxt(size_t s) +{ + return MemoryContextAlloc(TopMemoryContext, (Size)s); +} + Gen_Alloc genAlloc_class = {(void *)MemoryContextAlloc, (void *)MemoryContextAllocZero, (void *)repalloc, (void *)pfree, - (void *)current_memcontext}; + (void *)current_memcontext, + (void *)allocTopCxt}; #endif diff --git a/src/gtm/client/fe-protocol.c b/src/gtm/client/fe-protocol.c index 5f04373cfa..1fff790c53 100644 --- a/src/gtm/client/fe-protocol.c +++ b/src/gtm/client/fe-protocol.c @@ -357,6 +357,12 @@ gtmpqParseSuccess(GTM_Conn *conn, GTM_Result *result) case NODE_END_REPLICATION_INIT_RESULT: break; + case BEGIN_BACKUP_RESULT: + break; + + case END_BACKUP_RESULT: + break; + case TXN_BEGIN_RESULT: if (gtmpqGetnchar((char *)&result->gr_resdata.grd_txnhandle, sizeof (GTM_TransactionHandle), conn)) @@ -648,6 +654,9 @@ gtmpqParseSuccess(GTM_Conn *conn, GTM_Result *result) break; } + /* + * I don't understand why malloc() here? Should be palloc()? + */ result->gr_resdata.grd_txn_gid_list.ptr = (char *)malloc(result->gr_resdata.grd_txn_gid_list.len); diff --git a/src/gtm/client/gtm_client.c b/src/gtm/client/gtm_client.c index d830971363..67af8d7eef 100644 --- a/src/gtm/client/gtm_client.c +++ b/src/gtm/client/gtm_client.c @@ -1602,3 +1602,41 @@ send_failed: conn->result->gr_status = GTM_RESULT_COMM_ERROR; return -1; } + +int +set_begin_end_backup(GTM_Conn *conn, bool begin) +{ + GTM_Result *res = NULL; + time_t finish_time; + int ii; + + if (gtmpqPutMsgStart('C', true, conn)) + goto send_failed; + + if(gtmpqPutInt(begin ? MSG_BEGIN_BACKUP : MSG_END_BACKUP, + sizeof(GTM_MessageType), conn)) + goto send_failed; + + if (gtmpqPutMsgEnd(conn)) + goto send_failed; + + if (gtmpqFlush(conn)) + goto send_failed; + + finish_time = time(NULL) + CLIENT_GTM_TIMEOUT; + if (gtmpqWaitTimed(true, false, conn, finish_time) || + gtmpqReadData(conn) < 0) + goto receive_failed; + + if ((res = GTMPQgetResult(conn)) == NULL) + goto receive_failed; + + return res->gr_status; + +receive_failed: +send_failed: + conn->result = makeEmptyResultIfIsNull(conn->result); + conn->result->gr_status = GTM_RESULT_COMM_ERROR; + return -1; +} + diff --git a/src/gtm/common/gtm_serialize.c b/src/gtm/common/gtm_serialize.c index 8ec5215d9f..adef9e4463 100644 --- a/src/gtm/common/gtm_serialize.c +++ b/src/gtm/common/gtm_serialize.c @@ -378,6 +378,22 @@ gtm_deserialize_transactioninfo(GTM_TransactionInfo *data, const char *buf, size len += sizeof(GTM_TransactionStates); /* GTM_TransactionInfo.gti_coordname */ +#if 1 + { + uint32 ll; + + memcpy(&ll, buf+len, sizeof(uint32)); + len += sizeof(uint32); + if (ll > 0) + { + data->gti_coordname = genAllocTop(sizeof(ll+1)); /* Should be allocated at TopMostContext */ + memcpy(data->gti_coordname, buf+len, ll); + data->gti_coordname[ll] = 0; + } + else + data->gti_coordname = NULL; + } +#else if (data->gti_coordname != NULL) { namelen = (uint32)strlen(data->gti_coordname); @@ -392,6 +408,7 @@ gtm_deserialize_transactioninfo(GTM_TransactionInfo *data, const char *buf, size memcpy((char *)buf + len, &namelen, sizeof(uint32)); len += sizeof(uint32); } +#endif /* GTM_TransactionInfo.gti_xmin */ memcpy(&(data->gti_xmin), buf + len, sizeof(GlobalTransactionId)); @@ -414,7 +431,7 @@ gtm_deserialize_transactioninfo(GTM_TransactionInfo *data, const char *buf, size len += sizeof(uint32); if (string_len > 0) { - data->nodestring = (char *)genAlloc(string_len + 1); + data->nodestring = (char *)genAllocTop(string_len + 1); /* Should allocate at TopMostMemoryContext */ memcpy(data->nodestring, buf + len, string_len); data->gti_gid[string_len] = 0; /* null-terminated */ len += string_len; @@ -427,7 +444,7 @@ gtm_deserialize_transactioninfo(GTM_TransactionInfo *data, const char *buf, size len += sizeof(uint32); if (string_len > 0) { - data->gti_gid = (char *)genAlloc(string_len+1); + data->gti_gid = (char *)genAllocTop(string_len+1); /* Should allocate at TopMostMemoryContext */ memcpy(data->gti_gid, buf + len, string_len); data->gti_gid[string_len] = 0; /* null-terminated */ len += string_len; @@ -578,7 +595,7 @@ gtm_serialize_transactions(GTM_Transactions *data, char *buf, size_t buflen) /* * Not to include invalid global transactions. */ - if (data->gt_transactions_array[i].gti_gxid == InvalidGlobalTransactionId) + if (data->gt_transactions_array[i].gti_in_use != TRUE) continue; buflen2 = gtm_get_transactioninfo_size(&data->gt_transactions_array[i]); diff --git a/src/gtm/common/mcxt.c b/src/gtm/common/mcxt.c index e214cfe1aa..db7b3b1429 100644 --- a/src/gtm/common/mcxt.c +++ b/src/gtm/common/mcxt.c @@ -770,9 +770,15 @@ void *current_memcontext(void) return((void *)CurrentMemoryContext); } +void *allocTopMemCxt(size_t s) +{ + return (void *)MemoryContextAlloc(TopMostMemoryContext, (Size)s); +} + Gen_Alloc genAlloc_class = {(void *)MemoryContextAlloc, (void *)MemoryContextAllocZero, (void *)repalloc, (void *)pfree, - (void *)current_memcontext}; + (void *)current_memcontext, + (void *)allocTopMemCxt}; diff --git a/src/gtm/gtm_ctl/Makefile b/src/gtm/gtm_ctl/Makefile index b549d00f11..40dbc4fd51 100644 --- a/src/gtm/gtm_ctl/Makefile +++ b/src/gtm/gtm_ctl/Makefile @@ -3,13 +3,17 @@ top_builddir=../../.. include $(top_builddir)/src/Makefile.global -OBJS=gtm_ctl.o ../common/libgtm.a ../libpq/libpqcomm.a ../client/libgtmclient.a ../path/libgtmpath.a +OBJS=gtm_ctl.o + +OTHERS=../common/libgtm.a ../libpq/libpqcomm.a ../client/libgtmclient.a ../path/libgtmpath.a + LDFLAGS=-L$(top_builddir)/common -L$(top_builddir)/libpq + LIBS=-lpthread gtm_ctl:$(OBJS) - $(CC) $(CFLAGS) $(LDFLAGS) $(LIBS) $^ -o gtm_ctl + $(CC) $(CFLAGS) $(LDFLAGS) $(LIBS) $^ $(OTHERS) -o gtm_ctl all:gtm_ctl diff --git a/src/gtm/main/Makefile b/src/gtm/main/Makefile index f5d1684d37..dec85372ee 100644 --- a/src/gtm/main/Makefile +++ b/src/gtm/main/Makefile @@ -3,13 +3,16 @@ top_builddir=../../.. include $(top_builddir)/src/Makefile.global -OBJS=main.o gtm_thread.o gtm_txn.o gtm_seq.o gtm_snap.o gtm_time.o gtm_standby.o gtm_opt.o ../libpq/libpqcomm.a ../path/libgtmpath.a ../recovery/libgtmrecovery.a ../client/libgtmclient.a ../common/libgtm.a +OBJS=main.o gtm_thread.o gtm_txn.o gtm_seq.o gtm_snap.o gtm_time.o gtm_standby.o gtm_opt.o + +OTHERS= ../libpq/libpqcomm.a ../path/libgtmpath.a ../recovery/libgtmrecovery.a ../client/libgtmclient.a ../common/libgtm.a ../../port/libpgport.a + LDFLAGS=-L$(top_builddir)/common -L$(top_builddir)/libpq LIBS=-lpthread gtm:$(OBJS) - $(CC) $(CFLAGS) $(LDFLAGS) $(LIBS) $^ ../../port/libpgport.a -o gtm + $(CC) $(CFLAGS) $(LDFLAGS) $(LIBS) $^ $(OTHERS) -o gtm all:gtm diff --git a/src/gtm/main/gtm_opt.c b/src/gtm/main/gtm_opt.c index 580a4646cf..c84dbe661f 100644 --- a/src/gtm/main/gtm_opt.c +++ b/src/gtm/main/gtm_opt.c @@ -112,6 +112,11 @@ Config_Type_Names(); * it is not single quoted at dump time. */ +/* + * Definition of option name strings are given in gtm_opt.h, which are shared + * with command line option handling. Naming is GTM_OPTNAME_*. + */ + /******** option records follow ********/ @@ -127,7 +132,7 @@ struct config_bool ConfigureNamesBool[] = struct config_int ConfigureNamesInt[] = { { - {"port", GTMC_STARTUP, + {GTM_OPTNAME_PORT, GTMC_STARTUP, gettext_noop("Listen Port of GTM or GTM standby server."), NULL, 0 @@ -137,7 +142,7 @@ struct config_int ConfigureNamesInt[] = 0, NULL }, { - {"active_port", GTMC_SIGHUP, + {GTM_OPTNAME_ACTIVE_PORT, GTMC_SIGHUP, gettext_noop("GTM server port number when it works as GTM-Standby."), NULL, 0 @@ -147,7 +152,7 @@ struct config_int ConfigureNamesInt[] = 0, NULL }, { - {"keepalives_idle", GTMC_STARTUP, + {GTM_OPTNAME_KEEPALIVES_IDLE, GTMC_STARTUP, gettext_noop("Sets \"keepalives_idle\" option for the connection to GTM."), gettext_noop("This option is effective only when it runs as GTM-Standby."), GTMOPT_UNIT_TIME @@ -157,7 +162,7 @@ struct config_int ConfigureNamesInt[] = 0, NULL }, { - {"keepalives_interval", GTMC_STARTUP, + {GTM_OPTNAME_KEEPALIVES_INTERVAL, GTMC_STARTUP, gettext_noop("Sets \"keepalives_interval\" option fo the connetion to GTM."), gettext_noop("This option is effective only when it runs as GTM-Standby."), GTMOPT_UNIT_TIME @@ -167,7 +172,7 @@ struct config_int ConfigureNamesInt[] = 0, NULL }, { - {"keepalives_count", GTMC_STARTUP, + {GTM_OPTNAME_KEEPALIVES_COUNT, GTMC_STARTUP, gettext_noop("Sets \"keepalives_count\" option to the connection to GTM."), gettext_noop("This option is effective only when it runs as GTM-Standby."), 0 @@ -194,7 +199,7 @@ struct config_real ConfigureNamesReal[] = struct config_string ConfigureNamesString[] = { { - {"data_dir", GTMC_STARTUP, + {GTM_OPTNAME_DATA_DIR, GTMC_STARTUP, gettext_noop("Work directory."), NULL, 0 @@ -206,7 +211,7 @@ struct config_string ConfigureNamesString[] = }, { - {"config_file", GTMC_SIGHUP, + {GTM_OPTNAME_CONFIG_FILE, GTMC_SIGHUP, gettext_noop("Configuration file name."), NULL, 0 @@ -218,7 +223,7 @@ struct config_string ConfigureNamesString[] = }, { - {"nodename", GTMC_STARTUP, + {GTM_OPTNAME_NODENAME, GTMC_STARTUP, gettext_noop("Name of this GTM/GTM-Standby."), NULL, 0 @@ -230,7 +235,7 @@ struct config_string ConfigureNamesString[] = }, { - {"listen_addresses", GTMC_STARTUP, + {GTM_OPTNAME_LISTEN_ADDRESSES, GTMC_STARTUP, gettext_noop("Listen address."), NULL, 0 @@ -241,7 +246,7 @@ struct config_string ConfigureNamesString[] = }, { - {"active_host", GTMC_SIGHUP, + {GTM_OPTNAME_ACTIVE_HOST, GTMC_SIGHUP, gettext_noop("Address of target GTM ACT."), gettext_noop("This parameter is effective only when it runs as GTM-Standby"), 0 @@ -252,7 +257,7 @@ struct config_string ConfigureNamesString[] = }, { - {"log_file", GTMC_SIGHUP, + {GTM_OPTNAME_LOG_FILE, GTMC_SIGHUP, gettext_noop("Log file name."), NULL, 0 @@ -263,7 +268,7 @@ struct config_string ConfigureNamesString[] = }, { - {"error_reporter", GTMC_SIGHUP, + {GTM_OPTNAME_ERROR_REPORTER, GTMC_SIGHUP, gettext_noop("Command to report various errors."), NULL, 0 @@ -274,7 +279,7 @@ struct config_string ConfigureNamesString[] = }, { - {"status_reader", GTMC_SIGHUP, + {GTM_OPTNAME_STATUS_READER, GTMC_SIGHUP, gettext_noop("Command to get status of global XC node status."), gettext_noop("Runs when configuration file is read by SIGHUP"), 0 @@ -294,7 +299,7 @@ struct config_string ConfigureNamesString[] = struct config_enum ConfigureNamesEnum[] = { { - {"log_min_messages", GTMC_SIGHUP, + {GTM_OPTNAME_LOG_MIN_MESSAGES, GTMC_SIGHUP, gettext_noop("Minimum message level to write to the log file."), NULL, 0 @@ -306,7 +311,7 @@ struct config_enum ConfigureNamesEnum[] = }, { - {"startup", GTMC_SIGHUP, + {GTM_OPTNAME_STARTUP, GTMC_SIGHUP, gettext_noop("Specifies startup mode, act or standby."), NULL, 0 diff --git a/src/gtm/main/gtm_standby.c b/src/gtm/main/gtm_standby.c index d18118dc5d..87fad79854 100644 --- a/src/gtm/main/gtm_standby.c +++ b/src/gtm/main/gtm_standby.c @@ -25,29 +25,21 @@ #include "gtm/gtm_utils.h" #include "gtm/register.h" -static GTM_Conn *GTM_ActiveConn = NULL; +GTM_Conn *GTM_ActiveConn = NULL; static char standbyHostName[NI_MAXHOST]; static char standbyNodeName[NI_MAXHOST]; static int standbyPortNumber; static char *standbyDataDir; -static GTM_Conn *gtm_standby_connect_to_standby_int(int *); +static GTM_Conn * gtm_standby_connect_to_standby_int(int *report_needed); +static GTM_Conn *gtm_standby_connectToActiveGTM(void); extern char *NodeName; /* Defined in main.c */ int gtm_standby_start_startup(void) { - char connect_string[1024]; - int active_port = Recovery_StandbyGetActivePort(); - char *active_address = Recovery_StandbyGetActiveAddress(); - - elog(LOG, "Connecting the GTM active on %s:%d...", active_address, active_port); - - sprintf(connect_string, "host=%s port=%d node_name=%s remote_type=%d", - active_address, active_port, NodeName, GTM_NODE_GTM); - - GTM_ActiveConn = PQconnectGTM(connect_string); + GTM_ActiveConn = gtm_standby_connectToActiveGTM(); if (GTM_ActiveConn == NULL) { elog(DEBUG3, "Error in connection"); @@ -67,6 +59,7 @@ gtm_standby_finish_startup(void) elog(LOG, "Closing a startup connection..."); GTMPQfinish(GTM_ActiveConn); + GTM_ActiveConn = NULL; elog(LOG, "A startup connection closed."); return 1; @@ -320,7 +313,7 @@ gtm_standby_activate_self(void) * Returns a pointer to GTM_PGXCNodeInfo on success, * or returns NULL on failure. */ -static GTM_PGXCNodeInfo * +GTM_PGXCNodeInfo * find_standby_node_info(void) { GTM_PGXCNodeInfo *node[1024]; @@ -493,3 +486,53 @@ gtm_standby_check_communication_error(int *retry_count, GTM_Conn *oldconn) } return false; } + +int +gtm_standby_begin_backup(void) +{ + int rc = set_begin_end_backup(GTM_ActiveConn, true); + return (rc ? 0 : 1); +} + +int +gtm_standby_end_backup(void) +{ + int rc = set_begin_end_backup(GTM_ActiveConn, false); + return (rc ? 0 : 1); +} + +extern char *NodeName; /* Defined in main.c */ + +/* ---WIP, Dec.1st, 2011, K.Suzuki --- */ +void +gtm_standby_finishActiveConn(void) +{ + GTM_ActiveConn = gtm_standby_connectToActiveGTM(); + if (GTM_ActiveConn == NULL) + { + elog(DEBUG3, "Error in connection"); + return 0; + } + elog(LOG, "Connection established to the GTM active."); + + /* Unregister self from Active-GTM */ + node_unregister(GTM_ActiveConn, GTM_NODE_GTM, NodeName); + /* Disconnect form Active */ + GTMPQfinish(GTM_ActiveConn); +} + +static GTM_Conn * +gtm_standby_connectToActiveGTM(void) +{ + char connect_string[1024]; + int active_port = Recovery_StandbyGetActivePort(); + char *active_address = Recovery_StandbyGetActiveAddress(); + + /* Need to connect to Active-GTM again here */ + elog(LOG, "Connecting the GTM active on %s:%d...", active_address, active_port); + + sprintf(connect_string, "host=%s port=%d node_name=%s remote_type=%d", + active_address, active_port, NodeName, GTM_NODE_GTM); + + return PQconnectGTM(connect_string); +} diff --git a/src/gtm/main/gtm_thread.c b/src/gtm/main/gtm_thread.c index c041645521..0470bf3b7d 100644 --- a/src/gtm/main/gtm_thread.c +++ b/src/gtm/main/gtm_thread.c @@ -345,3 +345,43 @@ GTM_ThreadMainWrapper(void *argp) return thrinfo; } + +void +GTM_LockAllOtherThreads(void) +{ + GTM_ThreadInfo *my_threadinfo = GetMyThreadInfo; + int ii; + + for (ii = 0; ii < GTMThreads->gt_array_size; ii++) + { + if (GTMThreads->gt_threads[ii] && GTMThreads->gt_threads[ii] != my_threadinfo) + GTM_RWLockAcquire(>MThreads->gt_threads[ii]->thr_lock, GTM_LOCKMODE_WRITE); + } +} + +void +GTM_UnlockAllOtherThreads(void) +{ + GTM_ThreadInfo *my_threadinfo = GetMyThreadInfo; + int ii; + + for (ii = 0; ii < GTMThreads->gt_array_size; ii++) + { + if (GTMThreads->gt_threads[ii] && GTMThreads->gt_threads[ii] != my_threadinfo) + GTM_RWLockRelease(>MThreads->gt_threads[ii]->thr_lock); + } +} + +void +GTM_DoForAllOtherThreads(void (* process_routine)(GTM_ThreadInfo *)) +{ + GTM_ThreadInfo *my_threadinfo = GetMyThreadInfo; + int ii; + + for (ii = 0; ii < GTMThreads->gt_array_size; ii++) + { + if (GTMThreads->gt_threads[ii] && GTMThreads->gt_threads[ii] != my_threadinfo) + (process_routine)(GTMThreads->gt_threads[ii]); + } +} + diff --git a/src/gtm/main/main.c b/src/gtm/main/main.c index 48dcf6fbf8..7062c31f6a 100644 --- a/src/gtm/main/main.c +++ b/src/gtm/main/main.c @@ -126,7 +126,6 @@ MainThreadInit() * use malloc */ thrinfo = (GTM_ThreadInfo *)malloc(sizeof (GTM_ThreadInfo)); - memset(thrinfo, 0, sizeof(GTM_ThreadInfo)); if (thrinfo == NULL) { @@ -136,6 +135,12 @@ MainThreadInit() exit(1); } + memset(thrinfo, 0, sizeof(GTM_ThreadInfo)); + + thrinfo->is_main_thread = true; + GTM_RWLockInit(&thrinfo->thr_lock); + GTM_RWLockAcquire(&thrinfo->thr_lock, GTM_LOCKMODE_WRITE); + if (SetMyThreadInfo(thrinfo)) { fprintf(stderr, "SetMyThreadInfo failed: %d", errno); @@ -407,7 +412,7 @@ main(int argc, char *argv[]) * Setup work directory */ if (data_dir) - SetConfigOption("data_dir", data_dir, GTMC_STARTUP, GTMC_S_OVERRIDE); + SetConfigOption(GTM_OPTNAME_DATA_DIR, data_dir, GTMC_STARTUP, GTMC_S_OVERRIDE); /* * Setup configuration file @@ -424,43 +429,44 @@ main(int argc, char *argv[]) */ if (log_file) { - SetConfigOption("log_file", log_file, GTMC_STARTUP, GTMC_S_OVERRIDE); + SetConfigOption(GTM_OPTNAME_LOG_FILE, log_file, GTMC_STARTUP, GTMC_S_OVERRIDE); free(log_file); log_file = NULL; } if (listen_addresses) { - SetConfigOption("listen_addreses", listen_addresses, GTMC_STARTUP, GTMC_S_OVERRIDE); + SetConfigOption(GTM_OPTNAME_LISTEN_ADDRESSES, + listen_addresses, GTMC_STARTUP, GTMC_S_OVERRIDE); free(listen_addresses); listen_addresses = NULL; } if (node_name) { - SetConfigOption("nodename", node_name, GTMC_STARTUP, GTMC_S_OVERRIDE); + SetConfigOption(GTM_OPTNAME_NODENAME, node_name, GTMC_STARTUP, GTMC_S_OVERRIDE); free(node_name); node_name = NULL; } if (port_number) { - SetConfigOption("port", port_number, GTMC_STARTUP, GTMC_S_OVERRIDE); + SetConfigOption(GTM_OPTNAME_PORT, port_number, GTMC_STARTUP, GTMC_S_OVERRIDE); free(port_number); port_number = NULL; } if (is_standby_mode) { - SetConfigOption("startup", is_standby_mode, GTMC_STARTUP, GTMC_S_OVERRIDE); + SetConfigOption(GTM_OPTNAME_STARTUP, is_standby_mode, GTMC_STARTUP, GTMC_S_OVERRIDE); free(is_standby_mode); is_standby_mode = NULL; } if (dest_addr) { - SetConfigOption("active_host", dest_addr, GTMC_STARTUP, GTMC_S_OVERRIDE); + SetConfigOption(GTM_OPTNAME_ACTIVE_HOST, dest_addr, GTMC_STARTUP, GTMC_S_OVERRIDE); free(dest_addr); dest_addr = NULL; } if (dest_port) { - SetConfigOption("active_port", dest_port, GTMC_STARTUP, GTMC_S_OVERRIDE); + SetConfigOption(GTM_OPTNAME_ACTIVE_PORT, dest_port, GTMC_STARTUP, GTMC_S_OVERRIDE); free(dest_port); dest_port = NULL; } @@ -484,6 +490,23 @@ main(int argc, char *argv[]) progname); exit(1); } + + if (NodeName == NULL || NodeName[0] == 0) + { + write_stderr("Nodename must be specified\n"); + write_stderr("Try \"%s --help\" for more information.\n", + progname); + exit(1); + } + + if (ListenAddresses == NULL || ListenAddresses[0] == 0) + { + write_stderr("Listen_addresses must be specified\n"); + write_stderr("Try \"%s --help\" for more information.\n", + progname); + exit(1); + } + /* * GTM accepts no non-option switch arguments. */ @@ -522,6 +545,11 @@ main(int argc, char *argv[]) */ if (Recovery_IsStandby()) { + if (!gtm_standby_begin_backup()) + { + elog(ERROR, "Failed to set begin_backup satatus to the active-GTM."); + exit(1); + } if (!gtm_standby_restore_next_gxid()) { elog(ERROR, "Failed to restore next/last gxid from the active-GTM."); @@ -570,11 +598,19 @@ main(int argc, char *argv[]) exit(1); } elog(LOG, "Restoring node information from the active-GTM succeeded."); + + if (!gtm_standby_end_backup()) + { + elog(ERROR, "Failed to setup normal standby mode to the active-GTM."); + exit(1); + } + elog(LOG, "Started to run as GTM-Standby."); } else { /* Recover Data of Registered nodes. */ Recovery_RestoreRegisterInfo(); + elog(LOG, "Started to run as GTM-Active."); } /* @@ -729,6 +765,7 @@ ServerLoop(void) * gracefully */ + elog(LOG, "GTM shutting down."); /* * Tell GTM that we are shutting down so that no new GXIDs are * issued this point onwards @@ -746,6 +783,20 @@ ServerLoop(void) GTM_SaveTxnInfo(ctlfd); GTM_SaveSeqInfo(ctlfd); +#if 0 + /* + * This causes another problem. + * Active GTM tries to establish a connection to the standby, + * causing deadlock. + * + * Maybe before this, we should close listening port. + */ + if (Recovery_IsStandby()) + { + gtm_standby_finishActiveConn(); + } +#endif + close(ctlfd); exit(1); @@ -754,11 +805,23 @@ ServerLoop(void) { /* must set timeout each time; some OSes change it! */ struct timeval timeout; + GTM_ThreadInfo *my_threadinfo = GetMyThreadInfo; timeout.tv_sec = 60; timeout.tv_usec = 0; + /* + * Now GTM-Standby can backup current status during this region + */ + GTM_RWLockRelease(&my_threadinfo->thr_lock); + selres = select(nSockets, &rmask, NULL, NULL, &timeout); + + /* + * Prohibit GTM-Standby backup from here. + */ + GTM_RWLockAcquire(&my_threadinfo->thr_lock, GTM_LOCKMODE_WRITE); + } /* @@ -872,7 +935,12 @@ GTM_ThreadMain(void *argp) ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE, false); - + + /* + * Acquire the thread lock to prevent connection from GTM-Standby to update + * GTM-Standby registration. + */ + GTM_RWLockAcquire(&thrinfo->thr_lock, GTM_LOCKMODE_WRITE); { /* @@ -995,10 +1063,37 @@ GTM_ThreadMain(void *argp) resetStringInfo(&input_message); /* + * GTM-Standby registration information can be updated during ReadCommand + * operation. + */ + GTM_RWLockRelease(&thrinfo->thr_lock); + /* * (3) read a command (loop blocks here) */ qtype = ReadCommand(thrinfo->thr_conn->con_port, &input_message); + GTM_RWLockAcquire(&thrinfo->thr_lock, GTM_LOCKMODE_WRITE); + /* + * Check if GTM Standby info is upadted + * Maybe the following lines can be a separate function. At present, this is done only here so + * I'll leave them here. K.Suzuki, Nov.29, 2011 + * Please note that we don't check if it is not in the standby mode to allow cascased standby. + */ + if (GTMThreads->gt_standby_ready && thrinfo->thr_conn->standby == NULL) + { + /* Connect to GTM-Standby */ + thrinfo->thr_conn->standby = gtm_standby_connect_to_standby(); + if (thrinfo->thr_conn->standby == NULL) + GTMThreads->gt_standby_ready = false; /* This will make other threads to disconnect from + * the standby, if needed.*/ + } + else if (GTMThreads->gt_standby_ready == false && thrinfo->thr_conn->standby) + { + /* Disconnect from GTM-Standby */ + gtm_standby_disconnect_from_standby(thrinfo->thr_conn->standby); + thrinfo->thr_conn->standby = NULL; + } + switch(qtype) { case 'C': @@ -1073,7 +1168,11 @@ ProcessCommand(Port *myport, StringInfo input_message) case MSG_NODE_LIST: ProcessPGXCNodeCommand(myport, mtype, input_message); break; - + case MSG_BEGIN_BACKUP: + ProcessGTMBeginBackup(myport, input_message); + break; + case MSG_END_BACKUP: + ProcessGTMEndBackup(myport, input_message); case MSG_NODE_BEGIN_REPLICATION_INIT: case MSG_NODE_END_REPLICATION_INIT: case MSG_TXN_BEGIN: diff --git a/src/gtm/proxy/Makefile b/src/gtm/proxy/Makefile index 4eeb482425..b2faab35eb 100644 --- a/src/gtm/proxy/Makefile +++ b/src/gtm/proxy/Makefile @@ -3,14 +3,16 @@ top_builddir=../../.. include $(top_builddir)/src/Makefile.global -OBJS=proxy_main.o proxy_thread.o proxy_utils.o gtm_proxy_opt.o ../libpq/libpqcomm.a ../path/libgtmpath.a ../recovery/libgtmrecovery.a ../client/libgtmclient.a ../common/libgtm.a +OBJS=proxy_main.o proxy_thread.o proxy_utils.o gtm_proxy_opt.o + +OTHERS= ../libpq/libpqcomm.a ../path/libgtmpath.a ../recovery/libgtmrecovery.a ../client/libgtmclient.a ../common/libgtm.a LDFLAGS=-L$(top_builddir)/common -L$(top_builddir)/libpq LIBS=-lpthread gtm_proxy:$(OBJS) - $(CC) $(CFLAGS) $(LDFLAGS) $(LIBS) $^ ../../port/libpgport_srv.a -o gtm_proxy + $(CC) $(CFLAGS) $(LDFLAGS) $(LIBS) $^ $(OTHERS) ../../port/libpgport_srv.a -o gtm_proxy all:gtm_proxy diff --git a/src/gtm/proxy/gtm_proxy_opt.c b/src/gtm/proxy/gtm_proxy_opt.c index 0492aaece1..8048f39896 100644 --- a/src/gtm/proxy/gtm_proxy_opt.c +++ b/src/gtm/proxy/gtm_proxy_opt.c @@ -136,13 +136,18 @@ Config_Type_Names(); * it is not single quoted at dump time. */ +/* + * Definition of option name strings are given in gtm_opt.h, both for gtm.conf + * and gtm_proxy.conf. They will be used in command line option handling too. + */ + /******** option records follow ********/ struct config_bool ConfigureNamesBool[] = { { - {"err_wait_opt", GTMC_SIGHUP, + {GTM_OPTNAME_ERR_WAIT_OPT, GTMC_SIGHUP, gettext_noop("If GTM_Proxy waits for reconnect when GTM communication error is encountered."), NULL, 0 @@ -160,7 +165,7 @@ struct config_bool ConfigureNamesBool[] = struct config_int ConfigureNamesInt[] = { { - {"port", GTMC_STARTUP, + {GTM_OPTNAME_PORT, GTMC_STARTUP, gettext_noop("Listen Port of GTM_Proxy server."), NULL, 0 @@ -170,7 +175,7 @@ struct config_int ConfigureNamesInt[] = 0, NULL }, { - {"gtm_port", GTMC_SIGHUP, + {GTM_OPTNAME_GTM_PORT, GTMC_SIGHUP, gettext_noop("GTM server port number."), NULL, 0 @@ -180,7 +185,7 @@ struct config_int ConfigureNamesInt[] = 0, NULL }, { - {"keepalives_idle", GTMC_STARTUP, + {GTM_OPTNAME_KEEPALIVES_IDLE, GTMC_STARTUP, gettext_noop("Sets \"keepalives_idle\" option for the connection to GTM."), NULL, GTMOPT_UNIT_TIME @@ -190,7 +195,7 @@ struct config_int ConfigureNamesInt[] = 0, NULL }, { - {"keepalives_interval", GTMC_STARTUP, + {GTM_OPTNAME_KEEPALIVES_INTERVAL, GTMC_STARTUP, gettext_noop("Sets \"keepalives_interval\" option fo the connetion to GTM."), NULL, GTMOPT_UNIT_TIME @@ -200,7 +205,7 @@ struct config_int ConfigureNamesInt[] = 0, NULL }, { - {"keepalives_count", GTMC_STARTUP, + {GTM_OPTNAME_KEEPALIVES_COUNT, GTMC_STARTUP, gettext_noop("Sets \"keepalives_count\" option to the connection to GTM."), NULL, 0 @@ -210,7 +215,7 @@ struct config_int ConfigureNamesInt[] = 0, NULL }, { - {"err_wait_interval", GTMC_SIGHUP, + {GTM_OPTNAME_ERR_WAIT_INTERVAL, GTMC_SIGHUP, gettext_noop("Wait interval to wait for reconnect."), gettext_noop("This parameter determines GTM Proxy behavior when GTM communication error is encountered."), 0 @@ -220,7 +225,7 @@ struct config_int ConfigureNamesInt[] = 0, NULL }, { - {"err_wait_count", GTMC_SIGHUP, + {GTM_OPTNAME_ERR_WAIT_COUNT, GTMC_SIGHUP, gettext_noop("Number of err_wait_interval to wait for reconnect."), gettext_noop("This parameter determines GTM Prox behavior when GTM communication error is encountered."), 0 @@ -230,7 +235,7 @@ struct config_int ConfigureNamesInt[] = 0, NULL }, { - {"worker_threads", GTMC_STARTUP, + {GTM_OPTNAME_WORKER_THREADS, GTMC_STARTUP, gettext_noop("Number of worker thread."), NULL, 0 @@ -257,7 +262,7 @@ struct config_real ConfigureNamesReal[] = struct config_string ConfigureNamesString[] = { { - {"data_dir", GTMC_STARTUP, + {GTM_OPTNAME_DATA_DIR, GTMC_STARTUP, gettext_noop("Work directory."), NULL, 0 @@ -269,7 +274,7 @@ struct config_string ConfigureNamesString[] = }, { - {"config_file", GTMC_SIGHUP, + {GTM_OPTNAME_CONFIG_FILE, GTMC_SIGHUP, gettext_noop("Configuration file name."), NULL, 0 @@ -281,7 +286,7 @@ struct config_string ConfigureNamesString[] = }, { - {"listen_addresses", GTMC_STARTUP, + {GTM_OPTNAME_LISTEN_ADDRESSES, GTMC_STARTUP, gettext_noop("Listen address."), NULL, 0 @@ -292,7 +297,7 @@ struct config_string ConfigureNamesString[] = }, { - {"nodename", GTMC_STARTUP, + {GTM_OPTNAME_NODENAME, GTMC_STARTUP, gettext_noop("My node name."), NULL, 0, @@ -303,7 +308,7 @@ struct config_string ConfigureNamesString[] = }, { - {"gtm_host", GTMC_SIGHUP, + {GTM_OPTNAME_GTM_HOST, GTMC_SIGHUP, gettext_noop("Address of target GTM ACT."), NULL, 0 @@ -314,7 +319,7 @@ struct config_string ConfigureNamesString[] = }, { - {"log_file", GTMC_SIGHUP, + {GTM_OPTNAME_LOG_FILE, GTMC_SIGHUP, gettext_noop("Log file name."), NULL, 0 @@ -325,7 +330,7 @@ struct config_string ConfigureNamesString[] = }, { - {"error_reporter", GTMC_SIGHUP, + {GTM_OPTNAME_ERROR_REPORTER, GTMC_SIGHUP, gettext_noop("Command to report various errors."), NULL, 0 @@ -336,7 +341,7 @@ struct config_string ConfigureNamesString[] = }, { - {"status_reader", GTMC_SIGHUP, + {GTM_OPTNAME_STATUS_READER, GTMC_SIGHUP, gettext_noop("Command to get status of global XC node status."), gettext_noop("Runs when configuration file is read by SIGHUP"), 0 @@ -356,7 +361,7 @@ struct config_string ConfigureNamesString[] = struct config_enum ConfigureNamesEnum[] = { { - {"log_min_messages", GTMC_SIGHUP, + {GTM_OPTNAME_LOG_MIN_MESSAGES, GTMC_SIGHUP, gettext_noop("Minimum message level to write to the log file."), NULL, 0 diff --git a/src/gtm/recovery/Makefile b/src/gtm/recovery/Makefile index 2cceb05695..6dd88a1545 100644 --- a/src/gtm/recovery/Makefile +++ b/src/gtm/recovery/Makefile @@ -8,7 +8,9 @@ NAME=gtmrecovery SO_MAJOR_VERSION= 1 SO_MINOR_VERSION= 0 -OBJS=register.o replication.o standby_utils.o ../client/libgtmclient.a +OBJS=register_common.o register_gtm.o replication.o standby_utils.o + +OTHERS=../client/libgtmclient.a all:all-lib diff --git a/src/gtm/recovery/register.c b/src/gtm/recovery/register_common.c index 5c779391ca..cefcbbe3ee 100644 --- a/src/gtm/recovery/register.c +++ b/src/gtm/recovery/register_common.c @@ -28,6 +28,7 @@ #include "gtm/libpq-int.h" #include "gtm/pqformat.h" #include "gtm/stringinfo.h" +#include "gtm/register.h" #include "gtm/gtm_ip.h" @@ -415,320 +416,7 @@ Recovery_PGXCNodeRegister(GTM_PGXCNodeType type, } -/* - * Process MSG_NODE_REGISTER - */ -void -ProcessPGXCNodeRegister(Port *myport, StringInfo message) -{ - GTM_PGXCNodeType type; - GTM_PGXCNodePort port; - char remote_host[NI_MAXHOST]; - char datafolder[NI_MAXHOST]; - char node_name[NI_MAXHOST]; - char proxyname[NI_MAXHOST]; - char *ipaddress; - MemoryContext oldContext; - int len; - StringInfoData buf; - GTM_PGXCNodeStatus status; - - /* Read Node Type */ - memcpy(&type, pq_getmsgbytes(message, sizeof (GTM_PGXCNodeType)), - sizeof (GTM_PGXCNodeType)); - - /* Read Node name */ - len = pq_getmsgint(message, sizeof (int)); - if (len >= NI_MAXHOST) - ereport(ERROR, - (EINVAL, - errmsg("Invalid name length."))); - - memcpy(node_name, (char *)pq_getmsgbytes(message, len), len); - node_name[len] = '\0'; - - /* Read Host name */ - len = pq_getmsgint(message, sizeof (int)); - memcpy(remote_host, (char *)pq_getmsgbytes(message, len), len); - remote_host[len] = '\0'; - ipaddress = remote_host; - - /* Read Port Number */ - memcpy(&port, pq_getmsgbytes(message, sizeof (GTM_PGXCNodePort)), - sizeof (GTM_PGXCNodePort)); - - /* Read Proxy name (empty string if no proxy used) */ - len = pq_getmsgint(message, sizeof (GTM_StrLen)); - if (len >= NI_MAXHOST) - ereport(ERROR, - (EINVAL, - errmsg("Invalid proxy name length."))); - memcpy(proxyname, (char *)pq_getmsgbytes(message, len), len); - proxyname[len] = '\0'; - - /* - * Finish by reading Data Folder (length and then string) - */ - len = pq_getmsgint(message, sizeof (GTM_StrLen)); - - memcpy(datafolder, (char *)pq_getmsgbytes(message, len), len); - datafolder[len] = '\0'; - - elog(LOG, - "ProcessPGXCNodeRegister: ipaddress = \"%s\", node name = \"%s\", proxy name = \"%s\", " - "datafolder \"%s\"", - ipaddress, node_name, proxyname, datafolder); - - status = pq_getmsgint(message, sizeof (GTM_PGXCNodeStatus)); - - if ((type!=GTM_NODE_GTM_PROXY) && - (type!=GTM_NODE_GTM_PROXY_POSTMASTER) && - (type!=GTM_NODE_COORDINATOR) && - (type!=GTM_NODE_DATANODE) && - (type!=GTM_NODE_GTM) && - (type!=GTM_NODE_DEFAULT)) - ereport(ERROR, - (EINVAL, - errmsg("Unknown node type."))); - elog(LOG, "Node type = %d", type); - - /* - * We must use the TopMostMemoryContext because the Node ID information is - * not bound to a thread and can outlive any of the thread specific - * contextes. - */ - oldContext = MemoryContextSwitchTo(TopMostMemoryContext); - - if (Recovery_PGXCNodeRegister(type, node_name, port, - proxyname, NODE_CONNECTED, - ipaddress, datafolder, false, myport->sock)) - { - ereport(ERROR, - (EINVAL, - errmsg("Failed to Register node"))); - } - - MemoryContextSwitchTo(oldContext); - - pq_getmsgend(message); - - /* - * Send a SUCCESS message back to the client - */ - pq_beginmessage(&buf, 'S'); - pq_sendint(&buf, NODE_REGISTER_RESULT, 4); - if (myport->remote_type == GTM_NODE_GTM_PROXY) - { - GTM_ProxyMsgHeader proxyhdr; - proxyhdr.ph_conid = myport->conn_id; - pq_sendbytes(&buf, (char *)&proxyhdr, sizeof (GTM_ProxyMsgHeader)); - } - pq_sendbytes(&buf, (char *)&type, sizeof(GTM_PGXCNodeType)); - /* Node name length */ - pq_sendint(&buf, strlen(node_name), 4); - /* Node name (var-len) */ - pq_sendbytes(&buf, node_name, strlen(node_name)); - pq_endmessage(myport, &buf); - - if (myport->remote_type != GTM_NODE_GTM_PROXY) - pq_flush(myport); - - if (GetMyThreadInfo->thr_conn->standby) - { - int _rc; - GTM_Conn *oldconn = GetMyThreadInfo->thr_conn->standby; - int count = 0; - - elog(LOG, "calling node_register_internal() for standby GTM %p.", - GetMyThreadInfo->thr_conn->standby); - -retry: - _rc = node_register_internal(GetMyThreadInfo->thr_conn->standby, - type, - ipaddress, - port, - node_name, - datafolder, - status); - - if (gtm_standby_check_communication_error(&count, oldconn)) - goto retry; - - elog(LOG, "node_register_internal() returns rc %d.", _rc); - } -} - -/* - * Process MSG_NODE_UNREGISTER - */ -void -ProcessPGXCNodeUnregister(Port *myport, StringInfo message) -{ - GTM_PGXCNodeType type; - MemoryContext oldContext; - StringInfoData buf; - int len; - char node_name[NI_MAXHOST]; - - /* Read Node Type and number */ - memcpy(&type, pq_getmsgbytes(message, sizeof (GTM_PGXCNodeType)), - sizeof (GTM_PGXCNodeType)); - - /* Read Node name */ - len = pq_getmsgint(message, sizeof (int)); - if (len >= NI_MAXHOST) - ereport(ERROR, - (EINVAL, - errmsg("Invalid node name length"))); - memcpy(node_name, (char *)pq_getmsgbytes(message, len), len); - node_name[len] = '\0'; - - /* - * We must use the TopMostMemoryContext because the Node ID information is - * not bound to a thread and can outlive any of the thread specific - * contextes. - */ - oldContext = MemoryContextSwitchTo(TopMostMemoryContext); - - if (Recovery_PGXCNodeUnregister(type, node_name, false, myport->sock)) - { - ereport(ERROR, - (EINVAL, - errmsg("Failed to Unregister node"))); - } - - MemoryContextSwitchTo(oldContext); - - pq_getmsgend(message); - - /* - * Send a SUCCESS message back to the client - */ - pq_beginmessage(&buf, 'S'); - pq_sendint(&buf, NODE_UNREGISTER_RESULT, 4); - if (myport->remote_type == GTM_NODE_GTM_PROXY) - { - GTM_ProxyMsgHeader proxyhdr; - proxyhdr.ph_conid = myport->conn_id; - pq_sendbytes(&buf, (char *)&proxyhdr, sizeof (GTM_ProxyMsgHeader)); - } - pq_sendbytes(&buf, (char *)&type, sizeof(GTM_PGXCNodeType)); - /* Node name length */ - pq_sendint(&buf, strlen(node_name), 4); - /* Node name (var-len) */ - pq_sendbytes(&buf, node_name, strlen(node_name)); - - pq_endmessage(myport, &buf); - - if (myport->remote_type != GTM_NODE_GTM_PROXY) - pq_flush(myport); - - if (GetMyThreadInfo->thr_conn->standby) - { - int _rc; - GTM_Conn *oldconn = GetMyThreadInfo->thr_conn->standby; - int count = 0; - - elog(LOG, "calling node_unregister() for standby GTM %p.", - GetMyThreadInfo->thr_conn->standby); - -retry: - _rc = node_unregister(GetMyThreadInfo->thr_conn->standby, - type, - node_name); - - if (gtm_standby_check_communication_error(&count, oldconn)) - goto retry; - - elog(LOG, "node_unregister() returns rc %d.", _rc); - } -} - -/* - * Process MSG_NODE_LIST - */ -void -ProcessPGXCNodeList(Port *myport, StringInfo message) -{ - MemoryContext oldContext; - StringInfoData buf; - int num_node = 13; - int i; - - GTM_PGXCNodeInfo *data[MAX_NODES]; - char *s_data[MAX_NODES]; - size_t s_datalen[MAX_NODES]; - - /* - * We must use the TopMostMemoryContext because the Node ID information is - * not bound to a thread and can outlive any of the thread specific - * contextes. - */ - oldContext = MemoryContextSwitchTo(TopMostMemoryContext); - - memset(data, 0, sizeof(GTM_PGXCNodeInfo *) * MAX_NODES); - memset(s_data, 0, sizeof(char *) * MAX_NODES); - - num_node = pgxcnode_get_all(data, MAX_NODES); - - for (i = 0; i < num_node; i++) - { - size_t s_len; - - s_len = gtm_get_pgxcnodeinfo_size(data[i]); - - /* - * Allocate memory blocks for serialized GTM_PGXCNodeInfo data. - */ - s_data[i] = (char *)malloc(s_len+1); - memset(s_data[i], 0, s_len+1); - - s_datalen[i] = gtm_serialize_pgxcnodeinfo(data[i], s_data[i], s_len+1); - - elog(LOG, "gtm_get_pgxcnodeinfo_size: s_len=%ld, s_datalen=%ld", s_len, s_datalen[i]); - } - - MemoryContextSwitchTo(oldContext); - - pq_getmsgend(message); - - /* - * Send a SUCCESS message back to the client - */ - pq_beginmessage(&buf, 'S'); - pq_sendint(&buf, NODE_LIST_RESULT, 4); - if (myport->remote_type == GTM_NODE_GTM_PROXY) - { - GTM_ProxyMsgHeader proxyhdr; - proxyhdr.ph_conid = myport->conn_id; - pq_sendbytes(&buf, (char *)&proxyhdr, sizeof (GTM_ProxyMsgHeader)); - } - pq_sendint(&buf, num_node, sizeof(int)); /* number of nodes */ - - /* - * Send pairs of GTM_PGXCNodeInfo size and serialized GTM_PGXCNodeInfo body. - */ - for (i = 0; i < num_node; i++) - { - pq_sendint(&buf, s_datalen[i], sizeof(int)); - pq_sendbytes(&buf, s_data[i], s_datalen[i]); - } - pq_endmessage(myport, &buf); - - if (myport->remote_type != GTM_NODE_GTM_PROXY) - pq_flush(myport); - - /* - * Release memory blocks for the serialized data. - */ - for (i = 0; i < num_node; i++) - { - free(s_data[i]); - } - - elog(LOG, "ProcessPGXCNodeList() ok."); -} /* * Called at GTM shutdown, rewrite on disk register information @@ -1174,3 +862,4 @@ retry: elog(LOG, "MSG_BACKEND_DISCONNECT rc=%d done.", _rc); } } + diff --git a/src/gtm/recovery/register_gtm.c b/src/gtm/recovery/register_gtm.c new file mode 100644 index 0000000000..7f9b1543e4 --- /dev/null +++ b/src/gtm/recovery/register_gtm.c @@ -0,0 +1,453 @@ +/*------------------------------------------------------------------------- + * + * register.c + * PGXC Node Register on GTM and GTM Proxy, node registering functions + * + * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * Portions Copyright (c) 2010-2011 Nippon Telegraph and Telephone Corporation + * + * + * IDENTIFICATION + * $PostgreSQL$ + * + *------------------------------------------------------------------------- + */ + +#include <fcntl.h> +#include <stdio.h> +#include <sys/stat.h> +#include <unistd.h> + +#include "gtm/elog.h" +#include "gtm/gtm.h" +#include "gtm/gtm_client.h" +#include "gtm/gtm_serialize.h" +#include "gtm/gtm_standby.h" +#include "gtm/libpq.h" +#include "gtm/libpq-int.h" +#include "gtm/pqformat.h" +#include "gtm/stringinfo.h" +#include "gtm/register.h" + +#include "gtm/gtm_ip.h" + +static void finishStandbyConn(GTM_ThreadInfo *thrinfo); + +/* + * Process MSG_NODE_REGISTER + */ +void +ProcessPGXCNodeRegister(Port *myport, StringInfo message) +{ + GTM_PGXCNodeType type; + GTM_PGXCNodePort port; + char remote_host[NI_MAXHOST]; + char datafolder[NI_MAXHOST]; + char node_name[NI_MAXHOST]; + char proxyname[NI_MAXHOST]; + char *ipaddress; + MemoryContext oldContext; + int len; + StringInfoData buf; + GTM_PGXCNodeStatus status; + + /* Read Node Type */ + memcpy(&type, pq_getmsgbytes(message, sizeof (GTM_PGXCNodeType)), + sizeof (GTM_PGXCNodeType)); + + /* Read Node name */ + len = pq_getmsgint(message, sizeof (int)); + if (len >= NI_MAXHOST) + ereport(ERROR, + (EINVAL, + errmsg("Invalid name length."))); + + memcpy(node_name, (char *)pq_getmsgbytes(message, len), len); + node_name[len] = '\0'; + + /* Read Host name */ + len = pq_getmsgint(message, sizeof (int)); + memcpy(remote_host, (char *)pq_getmsgbytes(message, len), len); + remote_host[len] = '\0'; + ipaddress = remote_host; + + /* Read Port Number */ + memcpy(&port, pq_getmsgbytes(message, sizeof (GTM_PGXCNodePort)), + sizeof (GTM_PGXCNodePort)); + + /* Read Proxy name (empty string if no proxy used) */ + len = pq_getmsgint(message, sizeof (GTM_StrLen)); + if (len >= NI_MAXHOST) + ereport(ERROR, + (EINVAL, + errmsg("Invalid proxy name length."))); + memcpy(proxyname, (char *)pq_getmsgbytes(message, len), len); + proxyname[len] = '\0'; + + /* + * Finish by reading Data Folder (length and then string) + */ + len = pq_getmsgint(message, sizeof (GTM_StrLen)); + + memcpy(datafolder, (char *)pq_getmsgbytes(message, len), len); + datafolder[len] = '\0'; + + elog(LOG, + "ProcessPGXCNodeRegister: ipaddress = \"%s\", node name = \"%s\", proxy name = \"%s\", " + "datafolder \"%s\"", + ipaddress, node_name, proxyname, datafolder); + + status = pq_getmsgint(message, sizeof (GTM_PGXCNodeStatus)); + + if ((type!=GTM_NODE_GTM_PROXY) && + (type!=GTM_NODE_GTM_PROXY_POSTMASTER) && + (type!=GTM_NODE_COORDINATOR) && + (type!=GTM_NODE_DATANODE) && + (type!=GTM_NODE_GTM) && + (type!=GTM_NODE_DEFAULT)) + ereport(ERROR, + (EINVAL, + errmsg("Unknown node type."))); + + elog(LOG, "Node type = %d", type); + + /* + * We must use the TopMostMemoryContext because the Node ID information is + * not bound to a thread and can outlive any of the thread specific + * contextes. + */ + oldContext = MemoryContextSwitchTo(TopMostMemoryContext); + + /* + * We don't check if the this is not in standby mode to allow + * cascaded standby. + */ + if (type == GTM_NODE_GTM) + { + elog(DEBUG1, "Registering GTM (Standby). Unregister this first."); + /* + * There's another standby. May be failed one. + * Clean this up. This means that we allow + * only one standby at the same time. + * + * This helps to give up failed standby and connect + * new one, regardless how they stopped. + * + * Be sure that all ther threads are locked by other + * means, typically by receiving MSG_BEGIN_BACKUP. + */ + /* + * First try to unregister GTM which is now connected. We don't care + * if it failed. + */ + Recovery_PGXCNodeUnregister(type, node_name, false, -1); + /* + * Then disconnect the connections to the standby from each thread. + * Please note that we assume only one standby is allowed at the same time. + * Cascade standby may be allowed. + */ + GTM_DoForAllOtherThreads(finishStandbyConn); + + GTMThreads->gt_standby_ready = true; + } + + if (Recovery_PGXCNodeRegister(type, node_name, port, + proxyname, NODE_CONNECTED, + ipaddress, datafolder, false, myport->sock)) + { + ereport(ERROR, + (EINVAL, + errmsg("Failed to Register node"))); + } + + /* + * We don't check if the this is not in standby mode to allow + * cascaded standby. + */ + if (type == GTM_NODE_GTM) + GTMThreads->gt_standby_ready = true; + + MemoryContextSwitchTo(oldContext); + + pq_getmsgend(message); + + /* + * Send a SUCCESS message back to the client + */ + pq_beginmessage(&buf, 'S'); + pq_sendint(&buf, NODE_REGISTER_RESULT, 4); + if (myport->remote_type == GTM_NODE_GTM_PROXY) + { + GTM_ProxyMsgHeader proxyhdr; + proxyhdr.ph_conid = myport->conn_id; + pq_sendbytes(&buf, (char *)&proxyhdr, sizeof (GTM_ProxyMsgHeader)); + } + pq_sendbytes(&buf, (char *)&type, sizeof(GTM_PGXCNodeType)); + /* Node name length */ + pq_sendint(&buf, strlen(node_name), 4); + /* Node name (var-len) */ + pq_sendbytes(&buf, node_name, strlen(node_name)); + pq_endmessage(myport, &buf); + + if (myport->remote_type != GTM_NODE_GTM_PROXY) + pq_flush(myport); + + if (GetMyThreadInfo->thr_conn->standby) + { + int _rc; + GTM_Conn *oldconn = GetMyThreadInfo->thr_conn->standby; + int count = 0; + GTM_PGXCNodeInfo *standbynode; + + elog(LOG, "calling node_register_internal() for standby GTM %p.", + GetMyThreadInfo->thr_conn->standby); + +retry: + _rc = node_register_internal(GetMyThreadInfo->thr_conn->standby, + type, + ipaddress, + port, + node_name, + datafolder, + status); + + if (gtm_standby_check_communication_error(&count, oldconn)) + goto retry; + + /* Now check if there're other standby registered. */ + standbynode = find_standby_node_info(); + if (!standbynode) + GTMThreads->gt_standby_ready = false; + + elog(LOG, "node_register_internal() returns rc %d.", _rc); + } +} + + +/* + * Process MSG_NODE_UNREGISTER + */ +void +ProcessPGXCNodeUnregister(Port *myport, StringInfo message) +{ + GTM_PGXCNodeType type; + MemoryContext oldContext; + StringInfoData buf; + int len; + char node_name[NI_MAXHOST]; + + /* Read Node Type and number */ + memcpy(&type, pq_getmsgbytes(message, sizeof (GTM_PGXCNodeType)), + sizeof (GTM_PGXCNodeType)); + + /* Read Node name */ + len = pq_getmsgint(message, sizeof (int)); + if (len >= NI_MAXHOST) + ereport(ERROR, + (EINVAL, + errmsg("Invalid node name length"))); + memcpy(node_name, (char *)pq_getmsgbytes(message, len), len); + node_name[len] = '\0'; + + /* + * We must use the TopMostMemoryContext because the Node ID information is + * not bound to a thread and can outlive any of the thread specific + * contextes. + */ + oldContext = MemoryContextSwitchTo(TopMostMemoryContext); + + if (Recovery_PGXCNodeUnregister(type, node_name, false, myport->sock)) + { + ereport(ERROR, + (EINVAL, + errmsg("Failed to Unregister node"))); + } + + MemoryContextSwitchTo(oldContext); + + pq_getmsgend(message); + + /* + * Send a SUCCESS message back to the client + */ + pq_beginmessage(&buf, 'S'); + pq_sendint(&buf, NODE_UNREGISTER_RESULT, 4); + if (myport->remote_type == GTM_NODE_GTM_PROXY) + { + GTM_ProxyMsgHeader proxyhdr; + proxyhdr.ph_conid = myport->conn_id; + pq_sendbytes(&buf, (char *)&proxyhdr, sizeof (GTM_ProxyMsgHeader)); + } + pq_sendbytes(&buf, (char *)&type, sizeof(GTM_PGXCNodeType)); + /* Node name length */ + pq_sendint(&buf, strlen(node_name), 4); + /* Node name (var-len) */ + pq_sendbytes(&buf, node_name, strlen(node_name)); + + pq_endmessage(myport, &buf); + + if (myport->remote_type != GTM_NODE_GTM_PROXY) + pq_flush(myport); + + if (GetMyThreadInfo->thr_conn->standby) + { + int _rc; + GTM_Conn *oldconn = GetMyThreadInfo->thr_conn->standby; + int count = 0; + + elog(LOG, "calling node_unregister() for standby GTM %p.", + GetMyThreadInfo->thr_conn->standby); + +retry: + _rc = node_unregister(GetMyThreadInfo->thr_conn->standby, + type, + node_name); + + + if (gtm_standby_check_communication_error(&count, oldconn)) + goto retry; + + elog(LOG, "node_unregister() returns rc %d.", _rc); + } +} + +/* + * Process MSG_NODE_LIST + */ +void +ProcessPGXCNodeList(Port *myport, StringInfo message) +{ + MemoryContext oldContext; + StringInfoData buf; + int num_node = 13; + int i; + + GTM_PGXCNodeInfo *data[MAX_NODES]; + char *s_data[MAX_NODES]; + size_t s_datalen[MAX_NODES]; + + /* + * We must use the TopMostMemoryContext because the Node ID information is + * not bound to a thread and can outlive any of the thread specific + * contextes. + */ + oldContext = MemoryContextSwitchTo(TopMostMemoryContext); + + memset(data, 0, sizeof(GTM_PGXCNodeInfo *) * MAX_NODES); + memset(s_data, 0, sizeof(char *) * MAX_NODES); + + num_node = pgxcnode_get_all(data, MAX_NODES); + + for (i = 0; i < num_node; i++) + { + size_t s_len; + + s_len = gtm_get_pgxcnodeinfo_size(data[i]); + + /* + * Allocate memory blocks for serialized GTM_PGXCNodeInfo data. + */ + s_data[i] = (char *)malloc(s_len+1); + memset(s_data[i], 0, s_len+1); + + s_datalen[i] = gtm_serialize_pgxcnodeinfo(data[i], s_data[i], s_len+1); + + elog(LOG, "gtm_get_pgxcnodeinfo_size: s_len=%ld, s_datalen=%ld", s_len, s_datalen[i]); + } + + MemoryContextSwitchTo(oldContext); + + pq_getmsgend(message); + + /* + * Send a SUCCESS message back to the client + */ + pq_beginmessage(&buf, 'S'); + pq_sendint(&buf, NODE_LIST_RESULT, 4); + if (myport->remote_type == GTM_NODE_GTM_PROXY) + { + GTM_ProxyMsgHeader proxyhdr; + proxyhdr.ph_conid = myport->conn_id; + pq_sendbytes(&buf, (char *)&proxyhdr, sizeof (GTM_ProxyMsgHeader)); + } + pq_sendint(&buf, num_node, sizeof(int)); /* number of nodes */ + + /* + * Send pairs of GTM_PGXCNodeInfo size and serialized GTM_PGXCNodeInfo body. + */ + for (i = 0; i < num_node; i++) + { + pq_sendint(&buf, s_datalen[i], sizeof(int)); + pq_sendbytes(&buf, s_data[i], s_datalen[i]); + } + + pq_endmessage(myport, &buf); + + if (myport->remote_type != GTM_NODE_GTM_PROXY) + pq_flush(myport); + + /* + * Release memory blocks for the serialized data. + */ + for (i = 0; i < num_node; i++) + { + free(s_data[i]); + } + + elog(LOG, "ProcessPGXCNodeList() ok."); +} + +void +ProcessGTMBeginBackup(Port *myport, StringInfo message) +{ + int ii; + GTM_ThreadInfo *my_threadinfo; + StringInfoData buf; + + pq_getmsgend(message); + my_threadinfo = GetMyThreadInfo; + + for (ii = 0; ii < GTMThreads->gt_array_size; ii++) + { + if (GTMThreads->gt_threads[ii] && GTMThreads->gt_threads[ii] != my_threadinfo) + GTM_RWLockAcquire(>MThreads->gt_threads[ii]->thr_lock, GTM_LOCKMODE_WRITE); + } + pq_beginmessage(&buf, 'S'); + pq_sendint(&buf, BEGIN_BACKUP_RESULT, 4); + pq_endmessage(myport, &buf); + pq_flush(myport); +} + +void +ProcessGTMEndBackup(Port *myport, StringInfo message) +{ + int ii; + GTM_ThreadInfo *my_threadinfo; + StringInfoData buf; + + pq_getmsgend(message); + my_threadinfo = GetMyThreadInfo; + + for (ii = 0; ii < GTMThreads->gt_array_size; ii++) + { + if (GTMThreads->gt_threads[ii] && GTMThreads->gt_threads[ii] != my_threadinfo) + GTM_RWLockRelease(>MThreads->gt_threads[ii]->thr_lock); + } + pq_beginmessage(&buf, 'S'); + pq_sendint(&buf, END_BACKUP_RESULT, 4); + pq_endmessage(myport, &buf); + pq_flush(myport); +} + + +static void +finishStandbyConn(GTM_ThreadInfo *thrinfo) +{ + if ((thrinfo->thr_conn != NULL) && (thrinfo->thr_conn->standby != NULL)) + { + GTMPQfinish(thrinfo->thr_conn->standby); + thrinfo->thr_conn->standby = NULL; + } +} + diff --git a/src/include/gen_alloc.h b/src/include/gen_alloc.h index 1229ba524d..b676006bf5 100644 --- a/src/include/gen_alloc.h +++ b/src/include/gen_alloc.h @@ -26,13 +26,15 @@ typedef struct Gen_Alloc void * (* realloc) (void *, size_t); void (* free) (void *); void * (* current_memcontext) (void); + void * (* allocTop) (size_t); } Gen_Alloc; extern Gen_Alloc genAlloc_class; #define genAlloc(x) genAlloc_class.alloc(genAlloc_class.current_memcontext(), x) -#define genRealloc(x, y) genAlloc_class.realloc(x, y); -#define genFree(x) genAlloc_class.free(x); +#define genRealloc(x, y) genAlloc_class.realloc(x, y) +#define genFree(x) genAlloc_class.free(x) #define genAlloc0(x) genAlloc_class.alloc0(genAlloc_class.current_memcontext(), x) +#define genAllocTop(x) genAlloc_class.allocTop(x) #endif /* GEN_ALLOC_H */ diff --git a/src/include/gtm/gtm.h b/src/include/gtm/gtm.h index ab970fb925..4e49b22f10 100644 --- a/src/include/gtm/gtm.h +++ b/src/include/gtm/gtm.h @@ -45,6 +45,7 @@ typedef struct GTM_ThreadInfo */ GTM_ThreadID thr_id; uint32 thr_localid; + bool is_main_thread; void * (* thr_startroutine)(void *); MemoryContext thr_thread_context; @@ -72,6 +73,7 @@ typedef struct GTM_Threads { uint32 gt_thread_count; uint32 gt_array_size; + bool gt_standby_ready; GTM_ThreadInfo **gt_threads; GTM_RWLock gt_lock; } GTM_Threads; @@ -83,6 +85,9 @@ int GTM_ThreadRemove(GTM_ThreadInfo *thrinfo); int GTM_ThreadJoin(GTM_ThreadInfo *thrinfo); void GTM_ThreadExit(void); void ConnFree(Port *port); +void GTM_LockAllOtherThreads(void); +void GTM_UnlockAllOtherThreads(void); +void GTM_DoForAllOtherThreads(void (* process_routine)(GTM_ThreadInfo *)); GTM_ThreadInfo *GTM_ThreadCreate(GTM_ConnectionInfo *conninfo, void *(* startroutine)(void *)); diff --git a/src/include/gtm/gtm_c.h b/src/include/gtm/gtm_c.h index 5e108db50b..a9a5a6a95b 100644 --- a/src/include/gtm/gtm_c.h +++ b/src/include/gtm/gtm_c.h @@ -48,12 +48,13 @@ typedef uint32 GTM_PGXCNodePort; /* Possible type of nodes for registration */ typedef enum GTM_PGXCNodeType { - GTM_NODE_GTM_PROXY, - GTM_NODE_GTM_PROXY_POSTMASTER, /* Used by Proxy to communicate with GTM and not use Proxy headers */ - GTM_NODE_COORDINATOR, - GTM_NODE_DATANODE, - GTM_NODE_GTM, - GTM_NODE_DEFAULT /* In case nothing is associated to connection */ + GTM_NODE_GTM_PROXY = 1, + GTM_NODE_GTM_PROXY_POSTMASTER = 2, + /* Used by Proxy to communicate with GTM and not use Proxy headers */ + GTM_NODE_COORDINATOR = 3, + GTM_NODE_DATANODE = 4, + GTM_NODE_GTM = 5, + GTM_NODE_DEFAULT = 6 /* In case nothing is associated to connection */ } GTM_PGXCNodeType; /* diff --git a/src/include/gtm/gtm_client.h b/src/include/gtm/gtm_client.h index 0c278145ac..9d3c5b06b1 100644 --- a/src/include/gtm/gtm_client.h +++ b/src/include/gtm/gtm_client.h @@ -233,4 +233,10 @@ GTM_Sequence get_next(GTM_Conn *conn, GTM_SequenceKey key); int set_val(GTM_Conn *conn, GTM_SequenceKey key, GTM_Sequence nextval, bool is_called); int reset_sequence(GTM_Conn *conn, GTM_SequenceKey key); +/* + * GTM-Standby + */ +int set_begin_end_backup(GTM_Conn *conn, bool begin); + + #endif diff --git a/src/include/gtm/gtm_msg.h b/src/include/gtm/gtm_msg.h index 4ec7dacb28..c7c1259eba 100644 --- a/src/include/gtm/gtm_msg.h +++ b/src/include/gtm/gtm_msg.h @@ -22,6 +22,8 @@ typedef enum GTM_MessageType MSG_NODE_LIST, MSG_NODE_BEGIN_REPLICATION_INIT, MSG_NODE_END_REPLICATION_INIT, + MSG_BEGIN_BACKUP, /* Start backup by Standby */ + MSG_END_BACKUP, /* End backup preparation by Standby */ MSG_TXN_BEGIN, /* Start a new transaction */ MSG_TXN_BEGIN_GETGXID, /* Start a new transaction and get GXID */ MSG_TXN_BEGIN_GETGXID_MULTI, /* Start multiple new transactions and get GXIDs */ @@ -69,6 +71,8 @@ typedef enum GTM_ResultType NODE_LIST_RESULT, NODE_BEGIN_REPLICATION_INIT_RESULT, NODE_END_REPLICATION_INIT_RESULT, + BEGIN_BACKUP_RESULT, + END_BACKUP_RESULT, TXN_BEGIN_RESULT, TXN_BEGIN_GETGXID_RESULT, TXN_BEGIN_GETGXID_MULTI_RESULT, diff --git a/src/include/gtm/gtm_opt.h b/src/include/gtm/gtm_opt.h index 7d0de94fd6..704950e5ea 100644 --- a/src/include/gtm/gtm_opt.h +++ b/src/include/gtm/gtm_opt.h @@ -322,7 +322,33 @@ const char *const config_type_names[] =\ } +/* + * Option name defintion --- common to gtm.conf and gtm_proxy.conf + * + * This will be used both in *.conf and command line option override. + */ +#define GTM_OPTNAME_ACTIVE_HOST "active_host" +#define GTM_OPTNAME_ACTIVE_PORT "active_port" +#define GTM_OPTNAME_CONFIG_FILE "config_file" +#define GTM_OPTNAME_DATA_DIR "data_dir" +#define GTM_OPTNAME_ERR_WAIT_COUNT "err_wait_count" +#define GTM_OPTNAME_ERR_WAIT_INTERVAL "err_wait_interval" +#define GTM_OPTNAME_ERR_WAIT_OPT "err_wait_opt" +#define GTM_OPTNAME_ERROR_REPORTER "error_reporter" +#define GTM_OPTNAME_GTM_HOST "gtm_host" +#define GTM_OPTNAME_GTM_PORT "gtm_port" +#define GTM_OPTNAME_KEEPALIVES_IDLE "keepalives_idle" +#define GTM_OPTNAME_KEEPALIVES_INTERVAL "keepalives_interval" +#define GTM_OPTNAME_KEEPALIVES_COUNT "keepalives_count" +#define GTM_OPTNAME_LISTEN_ADDRESSES "listen_addresses" +#define GTM_OPTNAME_LOG_FILE "log_file" +#define GTM_OPTNAME_LOG_MIN_MESSAGES "log_min_messages" +#define GTM_OPTNAME_NODENAME "nodename" +#define GTM_OPTNAME_PORT "port" +#define GTM_OPTNAME_STARTUP "startup" +#define GTM_OPTNAME_STATUS_READER "status_reader" +#define GTM_OPTNAME_WORKER_THREADS "worker_threads" #endif /* GTM_OPT_H */ diff --git a/src/include/gtm/gtm_standby.h b/src/include/gtm/gtm_standby.h index 97cce3e33a..a33a8b2316 100644 --- a/src/include/gtm/gtm_standby.h +++ b/src/include/gtm/gtm_standby.h @@ -18,6 +18,7 @@ #include "c.h" #include "gtm/gtm_c.h" #include "gtm/libpq-fe.h" +#include "gtm/register.h" /* * Variables to interact with GTM active under GTM standby mode. @@ -42,6 +43,13 @@ void gtm_standby_disconnect_from_standby(GTM_Conn *conn); GTM_Conn *gtm_standby_reconnect_to_standby(GTM_Conn *old_conn, int retry_max); bool gtm_standby_check_communication_error(int *retry_count, GTM_Conn *oldconn); +GTM_PGXCNodeInfo *find_standby_node_info(void); + +int gtm_standby_begin_backup(void); +int gtm_standby_end_backup(void); +void gtm_standby_closeActiveConn(void); + + /* * Startup mode */ diff --git a/src/include/gtm/gtm_txn.h b/src/include/gtm/gtm_txn.h index 449feb8b3c..6cd5181a96 100644 --- a/src/include/gtm/gtm_txn.h +++ b/src/include/gtm/gtm_txn.h @@ -106,25 +106,25 @@ typedef enum GTM_TransactionStates typedef struct GTM_TransactionInfo { - GTM_TransactionHandle gti_handle; + GTM_TransactionHandle gti_handle; GTM_ThreadID gti_thread_id; - bool gti_in_use; + bool gti_in_use; GlobalTransactionId gti_gxid; - GTM_TransactionStates gti_state; - char *gti_coordname; + GTM_TransactionStates gti_state; + char *gti_coordname; GlobalTransactionId gti_xmin; GTM_IsolationLevel gti_isolevel; - bool gti_readonly; + bool gti_readonly; GTMProxy_ConnID gti_backend_id; - char *nodestring; /* List of nodes prepared */ - char *gti_gid; + char *nodestring; /* List of nodes prepared */ + char *gti_gid; GTM_SnapshotData gti_current_snapshot; - bool gti_snapshot_set; + bool gti_snapshot_set; - GTM_RWLock gti_lock; - bool gti_vacuum; + GTM_RWLock gti_lock; + bool gti_vacuum; } GTM_TransactionInfo; #define GTM_MAX_2PC_NODES 16 @@ -162,7 +162,7 @@ typedef struct GTM_Transactions int32 gt_lastslot; GTM_TransactionInfo gt_transactions_array[GTM_MAX_GLOBAL_TRANSACTIONS]; - gtm_List *gt_open_transactions; + gtm_List *gt_open_transactions; GTM_RWLock gt_TransArrayLock; } GTM_Transactions; diff --git a/src/include/gtm/register.h b/src/include/gtm/register.h index 5902902e7b..15674ec59a 100644 --- a/src/include/gtm/register.h +++ b/src/include/gtm/register.h @@ -84,4 +84,7 @@ void ProcessPGXCNodeUnregister(Port *myport, StringInfo message); void ProcessPGXCNodeBackendDisconnect(Port *myport, StringInfo message); void ProcessPGXCNodeList(Port *myport, StringInfo message); +void ProcessGTMBeginBackup(Port *myport, StringInfo message); +void ProcessGTMEndBackup(Port *myport, StringInfo message); + #endif /* GTM_NODE_H */ |
