diff options
-rw-r--r-- | doc-xc/src/sgml/ref/gtm.sgmlin | 21 | ||||
-rw-r--r-- | src/gtm/client/fe-protocol.c | 3 | ||||
-rw-r--r-- | src/gtm/client/gtm_client.c | 858 | ||||
-rw-r--r-- | src/gtm/client/pqexpbuffer.c | 2 | ||||
-rw-r--r-- | src/gtm/common/elog.c | 9 | ||||
-rw-r--r-- | src/gtm/common/gtm_utils.c | 181 | ||||
-rw-r--r-- | src/gtm/libpq/pqcomm.c | 6 | ||||
-rw-r--r-- | src/gtm/main/gtm.conf.sample | 1 | ||||
-rw-r--r-- | src/gtm/main/gtm_opt.c | 22 | ||||
-rw-r--r-- | src/gtm/main/gtm_seq.c | 543 | ||||
-rw-r--r-- | src/gtm/main/gtm_snap.c | 5 | ||||
-rw-r--r-- | src/gtm/main/gtm_standby.c | 12 | ||||
-rw-r--r-- | src/gtm/main/gtm_txn.c | 916 | ||||
-rw-r--r-- | src/gtm/main/main.c | 202 | ||||
-rw-r--r-- | src/gtm/proxy/gtm_proxy_opt.c | 14 | ||||
-rw-r--r-- | src/gtm/proxy/proxy_main.c | 54 | ||||
-rw-r--r-- | src/gtm/recovery/register_gtm.c | 206 | ||||
-rw-r--r-- | src/gtm/recovery/replication.c | 8 | ||||
-rw-r--r-- | src/include/gtm/gtm_client.h | 59 | ||||
-rw-r--r-- | src/include/gtm/gtm_msg.h | 73 | ||||
-rw-r--r-- | src/include/gtm/gtm_opt.h | 1 | ||||
-rw-r--r-- | src/include/gtm/gtm_seq.h | 14 | ||||
-rw-r--r-- | src/include/gtm/gtm_txn.h | 31 | ||||
-rw-r--r-- | src/include/gtm/gtm_utils.h | 5 | ||||
-rw-r--r-- | src/include/gtm/register.h | 4 |
25 files changed, 2404 insertions, 846 deletions
diff --git a/doc-xc/src/sgml/ref/gtm.sgmlin b/doc-xc/src/sgml/ref/gtm.sgmlin index c27703ef81..28e16b5975 100644 --- a/doc-xc/src/sgml/ref/gtm.sgmlin +++ b/doc-xc/src/sgml/ref/gtm.sgmlin @@ -259,6 +259,27 @@ PostgreSQL documentation </listitem> </varlistentry> + <varlistentry id="gtm-opt-synchronous-backup" xreflabel="gtm_opt_synchronous_backup"> + <term><varname>synchronous-backup</varname> (<type>boolean</type>)</term> + <indexterm> + <primary><varname>synchronous-backup</varname> configuration parameter</primary> + </indexterm> + <listitem> + <para> + Specifies if backup to GTM-Standby is taken synchronously. If this is turned on, + GTM will send and receive synchronize message to make sure that all the backups + reached to the standby. + </para> + <para> + If it is turned off, all the backup information will be sent without checking they + reached to GTM-Standby. + </para> + <para> + Default value is off. + </para> + </listitem> + </varlistentry> + </variablelist> diff --git a/src/gtm/client/fe-protocol.c b/src/gtm/client/fe-protocol.c index 1fff790c53..bbc234ea9d 100644 --- a/src/gtm/client/fe-protocol.c +++ b/src/gtm/client/fe-protocol.c @@ -351,6 +351,9 @@ gtmpqParseSuccess(GTM_Conn *conn, GTM_Result *result) switch (result->gr_type) { + case SYNC_STANDBY_RESULT: + break; + case NODE_BEGIN_REPLICATION_INIT_RESULT: break; diff --git a/src/gtm/client/gtm_client.c b/src/gtm/client/gtm_client.c index 2cc5b4dd9c..521df48112 100644 --- a/src/gtm/client/gtm_client.c +++ b/src/gtm/client/gtm_client.c @@ -14,7 +14,13 @@ */ /* Time in seconds to wait for a response from GTM */ /* We should consider making this a GUC */ +#ifndef CLIENT_GTM_TIMEOUT +#ifdef GTM_DEBUG +#define CLIENT_GTM_TIMEOUT 3600 +#else #define CLIENT_GTM_TIMEOUT 20 +#endif +#endif #include <time.h> @@ -31,10 +37,33 @@ #include "gtm/register.h" #include "gtm/assert.h" +extern bool Backup_synchronously; + void GTM_FreeResult(GTM_Result *result, GTM_PGXCNodeType remote_type); static GTM_Result *makeEmptyResultIfIsNull(GTM_Result *oldres); - +static int commit_prepared_transaction_internal(GTM_Conn *conn, + GlobalTransactionId gxid, GlobalTransactionId prepared_gxid, + bool is_backup); +static int prepare_transaction_internal(GTM_Conn *conn, GlobalTransactionId gxid, bool is_backup); +static int abort_transaction_internal(GTM_Conn *conn, GlobalTransactionId gxid, bool is_backup); +static int abort_transaction_multi_internal(GTM_Conn *conn, int txn_count, GlobalTransactionId *gxid, + int *txn_count_out, int *status_out, bool is_backup); +static int open_sequence_internal(GTM_Conn *conn, GTM_SequenceKey key, GTM_Sequence increment, + GTM_Sequence minval, GTM_Sequence maxval, + GTM_Sequence startval, bool cycle, bool is_backup); +static GTM_Sequence get_next_internal(GTM_Conn *conn, GTM_SequenceKey key, bool is_backup); +static int set_val_internal(GTM_Conn *conn, GTM_SequenceKey key, GTM_Sequence nextval, bool iscalled, bool is_backup); +static int reset_sequence_internal(GTM_Conn *conn, GTM_SequenceKey key, bool is_backup); +static int commit_transaction_internal(GTM_Conn *conn, GlobalTransactionId gxid, bool is_backup); +static int close_sequence_internal(GTM_Conn *conn, GTM_SequenceKey key, bool is_backup); +static int rename_sequence_internal(GTM_Conn *conn, GTM_SequenceKey key, GTM_SequenceKey newkey, bool is_backup); +static int alter_sequence_internal(GTM_Conn *conn, GTM_SequenceKey key, GTM_Sequence increment, + GTM_Sequence minval, GTM_Sequence maxval, + GTM_Sequence startval, GTM_Sequence lastval, bool cycle, bool is_restart, bool is_backup); +static int node_register_worker(GTM_Conn *conn, GTM_PGXCNodeType type, const char *host, GTM_PGXCNodePort port, + char *node_name, char *datafolder, GTM_PGXCNodeStatus status, bool is_backup); +static int node_unregister_worker(GTM_Conn *conn, GTM_PGXCNodeType type, const char * node_name, bool is_backup); /* * Make an empty result if old one is null. */ @@ -371,6 +400,63 @@ send_failed: /* * Transaction Management API */ + +int +bkup_begin_transaction(GTM_Conn *conn, GTM_TransactionHandle txn, GTM_IsolationLevel isolevel, + bool read_only, GTM_Timestamp timestamp) +{ + /* Start the message. */ + if (gtmpqPutMsgStart('C', true, conn) || + gtmpqPutInt(MSG_BKUP_TXN_BEGIN, sizeof (GTM_MessageType), conn) || + gtmpqPutInt(txn, sizeof(GTM_TransactionHandle), conn) || + gtmpqPutInt(isolevel, sizeof (GTM_IsolationLevel), conn) || + gtmpqPutc(read_only, conn) || + gtmpqPutnchar((char *)×tamp, sizeof(GTM_Timestamp), conn)) + goto send_failed; + + /* Finish the message. */ + if (gtmpqPutMsgEnd(conn)) + goto send_failed; + + /* Flush to ensure backend gets it. */ + if (gtmpqFlush(conn)) + goto send_failed; + + return 0; + +send_failed: + return -1; + +} + +int +bkup_begin_transaction_gxid(GTM_Conn *conn, GTM_TransactionHandle txn, GlobalTransactionId gxid, + GTM_IsolationLevel isolevel, bool read_only, GTM_Timestamp timestamp) +{ + /* Start the message. */ + if (gtmpqPutMsgStart('C', true, conn) || + gtmpqPutInt(MSG_BKUP_TXN_BEGIN_GETGXID, sizeof (GTM_MessageType), conn) || + gtmpqPutInt(txn, sizeof(GTM_TransactionHandle), conn) || + gtmpqPutInt(gxid, sizeof(GlobalTransactionId), conn) || + gtmpqPutInt(isolevel, sizeof (GTM_IsolationLevel), conn) || + gtmpqPutc(read_only, conn) || + gtmpqPutnchar((char *)×tamp, sizeof(GTM_Timestamp), conn)) + goto send_failed; + + /* Finish the message. */ + if (gtmpqPutMsgEnd(conn)) + goto send_failed; + + /* Flush to ensure backend gets it. */ + if (gtmpqFlush(conn)) + goto send_failed; + + return 0; + +send_failed: + return -1; +} + GlobalTransactionId begin_transaction(GTM_Conn *conn, GTM_IsolationLevel isolevel, GTM_Timestamp *timestamp) { @@ -418,6 +504,32 @@ send_failed: return InvalidGlobalTransactionId; } + +int +bkup_begin_transaction_autovacuum(GTM_Conn *conn, GTM_TransactionHandle txn, GlobalTransactionId gxid, + GTM_IsolationLevel isolevel) +{ + /* Start the message. */ + if (gtmpqPutMsgStart('C', true, conn) || + gtmpqPutInt(MSG_BKUP_TXN_BEGIN_GETGXID_AUTOVACUUM, sizeof (GTM_MessageType), conn) || + gtmpqPutInt(txn, sizeof(GTM_TransactionHandle), conn) || + gtmpqPutInt(gxid, sizeof(GlobalTransactionId), conn) || + gtmpqPutInt(isolevel, sizeof (GTM_IsolationLevel), conn)) + goto send_failed; + + /* Finish the message. */ + if (gtmpqPutMsgEnd(conn)) + goto send_failed; + + /* Flush to ensure backend gets it. */ + if (gtmpqFlush(conn)) + goto send_failed; + + return 0; + +send_failed: + return -1; +} /* * Transaction Management API * Begin a transaction for an autovacuum worker process @@ -465,14 +577,28 @@ send_failed: } int +bkup_commit_transaction(GTM_Conn *conn, GlobalTransactionId gxid) +{ + return commit_transaction_internal(conn, gxid, true); +} + + +int commit_transaction(GTM_Conn *conn, GlobalTransactionId gxid) { + return commit_transaction_internal(conn, gxid, false); +} + + +static int +commit_transaction_internal(GTM_Conn *conn, GlobalTransactionId gxid, bool is_backup) +{ GTM_Result *res = NULL; time_t finish_time; /* Start the message. */ if (gtmpqPutMsgStart('C', true, conn) || - gtmpqPutInt(MSG_TXN_COMMIT, sizeof (GTM_MessageType), conn) || + gtmpqPutInt(is_backup ? MSG_BKUP_TXN_COMMIT : MSG_TXN_COMMIT, sizeof (GTM_MessageType), conn) || gtmpqPutc(true, conn) || gtmpqPutnchar((char *)&gxid, sizeof (GlobalTransactionId), conn)) goto send_failed; @@ -485,21 +611,25 @@ commit_transaction(GTM_Conn *conn, GlobalTransactionId gxid) if (gtmpqFlush(conn)) goto send_failed; - finish_time = time(NULL) + CLIENT_GTM_TIMEOUT; - if (gtmpqWaitTimed(true, false, conn, finish_time) || - gtmpqReadData(conn) < 0) - goto receive_failed; + if (!is_backup) + { + finish_time = time(NULL) + CLIENT_GTM_TIMEOUT; + if (gtmpqWaitTimed(true, false, conn, finish_time) || + gtmpqReadData(conn) < 0) + goto receive_failed; - if ((res = GTMPQgetResult(conn)) == NULL) - goto receive_failed; + if ((res = GTMPQgetResult(conn)) == NULL) + goto receive_failed; - if (res->gr_status == GTM_RESULT_OK) - { - Assert(res->gr_type == TXN_COMMIT_RESULT); - Assert(res->gr_resdata.grd_gxid == gxid); - } + if (res->gr_status == GTM_RESULT_OK) + { + Assert(res->gr_type == TXN_COMMIT_RESULT); + Assert(res->gr_resdata.grd_gxid == gxid); + } - return res->gr_status; + return res->gr_status; + } + return GTM_RESULT_OK; receive_failed: send_failed: @@ -511,12 +641,24 @@ send_failed: int commit_prepared_transaction(GTM_Conn *conn, GlobalTransactionId gxid, GlobalTransactionId prepared_gxid) { + return commit_prepared_transaction_internal(conn, gxid, prepared_gxid, false); +} + +int +bkup_commit_prepared_transaction(GTM_Conn *conn, GlobalTransactionId gxid, GlobalTransactionId prepared_gxid) +{ + return commit_prepared_transaction_internal(conn, gxid, prepared_gxid, true); +} + +static int +commit_prepared_transaction_internal(GTM_Conn *conn, GlobalTransactionId gxid, GlobalTransactionId prepared_gxid, bool is_backup) +{ GTM_Result *res = NULL; time_t finish_time; /* Start the message */ if (gtmpqPutMsgStart('C', true, conn) || - gtmpqPutInt(MSG_TXN_COMMIT_PREPARED, sizeof (GTM_MessageType), conn) || + gtmpqPutInt(is_backup ? MSG_BKUP_TXN_COMMIT_PREPARED : MSG_TXN_COMMIT_PREPARED, sizeof (GTM_MessageType), conn) || gtmpqPutc(true, conn) || gtmpqPutnchar((char *)&gxid, sizeof (GlobalTransactionId), conn) || gtmpqPutc(true, conn) || @@ -531,21 +673,25 @@ commit_prepared_transaction(GTM_Conn *conn, GlobalTransactionId gxid, GlobalTran if (gtmpqFlush(conn)) goto send_failed; - finish_time = time(NULL) + CLIENT_GTM_TIMEOUT; - if (gtmpqWaitTimed(true, false, conn, finish_time) || - gtmpqReadData(conn) < 0) - goto receive_failed; + if (!is_backup) + { + finish_time = time(NULL) + CLIENT_GTM_TIMEOUT; + if (gtmpqWaitTimed(true, false, conn, finish_time) || + gtmpqReadData(conn) < 0) + goto receive_failed; - if ((res = GTMPQgetResult(conn)) == NULL) - goto receive_failed; + if ((res = GTMPQgetResult(conn)) == NULL) + goto receive_failed; - if (res->gr_status == GTM_RESULT_OK) - { - Assert(res->gr_type == TXN_COMMIT_PREPARED_RESULT); - Assert(res->gr_resdata.grd_gxid == gxid); - } + if (res->gr_status == GTM_RESULT_OK) + { + Assert(res->gr_type == TXN_COMMIT_PREPARED_RESULT); + Assert(res->gr_resdata.grd_gxid == gxid); + } - return res->gr_status; + return res->gr_status; + } + return GTM_RESULT_OK; send_failed: receive_failed: @@ -557,12 +703,24 @@ receive_failed: int abort_transaction(GTM_Conn *conn, GlobalTransactionId gxid) { + return abort_transaction_internal(conn, gxid, false); +} + +int +bkup_abort_transaction(GTM_Conn *conn, GlobalTransactionId gxid) +{ + return abort_transaction_internal(conn, gxid, true); +} + +static int +abort_transaction_internal(GTM_Conn *conn, GlobalTransactionId gxid, bool is_backup) +{ GTM_Result *res = NULL; time_t finish_time; /* Start the message. */ if (gtmpqPutMsgStart('C', true, conn) || - gtmpqPutInt(MSG_TXN_ROLLBACK, sizeof (GTM_MessageType), conn) || + gtmpqPutInt(is_backup ? MSG_BKUP_TXN_ROLLBACK : MSG_TXN_ROLLBACK, sizeof (GTM_MessageType), conn) || gtmpqPutc(true, conn) || gtmpqPutnchar((char *)&gxid, sizeof (GlobalTransactionId), conn)) goto send_failed; @@ -575,21 +733,25 @@ abort_transaction(GTM_Conn *conn, GlobalTransactionId gxid) if (gtmpqFlush(conn)) goto send_failed; - finish_time = time(NULL) + CLIENT_GTM_TIMEOUT; - if (gtmpqWaitTimed(true, false, conn, finish_time) || - gtmpqReadData(conn) < 0) - goto receive_failed; + if (!is_backup) + { + finish_time = time(NULL) + CLIENT_GTM_TIMEOUT; + if (gtmpqWaitTimed(true, false, conn, finish_time) || + gtmpqReadData(conn) < 0) + goto receive_failed; - if ((res = GTMPQgetResult(conn)) == NULL) - goto receive_failed; + if ((res = GTMPQgetResult(conn)) == NULL) + goto receive_failed; - if (res->gr_status == GTM_RESULT_OK) - { - Assert(res->gr_type == TXN_ROLLBACK_RESULT); - Assert(res->gr_resdata.grd_gxid == gxid); - } + if (res->gr_status == GTM_RESULT_OK) + { + Assert(res->gr_type == TXN_ROLLBACK_RESULT); + Assert(res->gr_resdata.grd_gxid == gxid); + } - return res->gr_status; + return res->gr_status; + } + return GTM_RESULT_OK; receive_failed: send_failed: @@ -600,6 +762,31 @@ send_failed: } int +backup_start_prepared_transaction(GTM_Conn *conn, GTM_TransactionHandle txn, char *gid, + char *nodestring) +{ + Assert(nodestring && gid && conn); + + if (gtmpqPutMsgStart('C', true, conn) || + gtmpqPutInt(MSG_BKUP_TXN_START_PREPARED, sizeof(GTM_MessageType), conn) || + gtmpqPutc(false, conn) || + gtmpqPutInt(txn, sizeof(GTM_TransactionHandle), conn) || + gtmpqPutInt(strlen(gid), sizeof(GTM_StrLen), conn) || + gtmpqPutnchar(gid, strlen(gid), conn) || + gtmpqPutInt(strlen(nodestring), sizeof(GTM_StrLen), conn) || + gtmpqPutnchar(nodestring, strlen(nodestring), conn)) + goto send_failed; + + if (gtmpqPutMsgEnd(conn)) + goto send_failed; + + return GTM_RESULT_OK; + +send_failed: + return -1; +} + +int start_prepared_transaction(GTM_Conn *conn, GlobalTransactionId gxid, char *gid, char *nodestring) { @@ -652,16 +839,27 @@ send_failed: return -1; } - int prepare_transaction(GTM_Conn *conn, GlobalTransactionId gxid) { + return prepare_transaction_internal(conn, gxid, false); +} + +int +bkup_prepare_transaction(GTM_Conn *conn, GlobalTransactionId gxid) +{ + return prepare_transaction_internal(conn, gxid, true); +} + +static int +prepare_transaction_internal(GTM_Conn *conn, GlobalTransactionId gxid, bool is_backup) +{ GTM_Result *res = NULL; time_t finish_time; /* Start the message. */ if (gtmpqPutMsgStart('C', true, conn) || - gtmpqPutInt(MSG_TXN_PREPARE, sizeof (GTM_MessageType), conn) || + gtmpqPutInt(is_backup ? MSG_BKUP_TXN_PREPARE : MSG_TXN_PREPARE, sizeof (GTM_MessageType), conn) || gtmpqPutc(true, conn) || gtmpqPutnchar((char *)&gxid, sizeof (GlobalTransactionId), conn)) goto send_failed; @@ -674,21 +872,25 @@ prepare_transaction(GTM_Conn *conn, GlobalTransactionId gxid) if (gtmpqFlush(conn)) goto send_failed; - finish_time = time(NULL) + CLIENT_GTM_TIMEOUT; - if (gtmpqWaitTimed(true, false, conn, finish_time) || - gtmpqReadData(conn) < 0) - goto receive_failed; + if (!is_backup) + { + finish_time = time(NULL) + CLIENT_GTM_TIMEOUT; + if (gtmpqWaitTimed(true, false, conn, finish_time) || + gtmpqReadData(conn) < 0) + goto receive_failed; - if ((res = GTMPQgetResult(conn)) == NULL) - goto receive_failed; + if ((res = GTMPQgetResult(conn)) == NULL) + goto receive_failed; - if (res->gr_status == GTM_RESULT_OK) - { - Assert(res->gr_type == TXN_PREPARE_RESULT); - Assert(res->gr_resdata.grd_gxid == gxid); - } + if (res->gr_status == GTM_RESULT_OK) + { + Assert(res->gr_type == TXN_PREPARE_RESULT); + Assert(res->gr_resdata.grd_gxid == gxid); + } - return res->gr_status; + return res->gr_status; + } + return GTM_RESULT_OK; receive_failed: send_failed: @@ -809,12 +1011,28 @@ open_sequence(GTM_Conn *conn, GTM_SequenceKey key, GTM_Sequence increment, GTM_Sequence minval, GTM_Sequence maxval, GTM_Sequence startval, bool cycle) { + return open_sequence_internal(conn, key, increment, minval, maxval, startval, cycle, false); +} + +int +bkup_open_sequence(GTM_Conn *conn, GTM_SequenceKey key, GTM_Sequence increment, + GTM_Sequence minval, GTM_Sequence maxval, + GTM_Sequence startval, bool cycle) +{ + return open_sequence_internal(conn, key, increment, minval, maxval, startval, cycle, true); +} + +static int +open_sequence_internal(GTM_Conn *conn, GTM_SequenceKey key, GTM_Sequence increment, + GTM_Sequence minval, GTM_Sequence maxval, + GTM_Sequence startval, bool cycle, bool is_backup) +{ GTM_Result *res = NULL; time_t finish_time; /* Start the message. */ if (gtmpqPutMsgStart('C', true, conn) || - gtmpqPutInt(MSG_SEQUENCE_INIT, sizeof (GTM_MessageType), conn) || + gtmpqPutInt(is_backup ? MSG_BKUP_SEQUENCE_INIT : MSG_SEQUENCE_INIT, sizeof (GTM_MessageType), conn) || gtmpqPutInt(key->gsk_keylen, 4, conn) || gtmpqPutnchar(key->gsk_key, key->gsk_keylen, conn) || gtmpqPutnchar((char *)&increment, sizeof (GTM_Sequence), conn) || @@ -824,23 +1042,27 @@ open_sequence(GTM_Conn *conn, GTM_SequenceKey key, GTM_Sequence increment, gtmpqPutc(cycle, conn)) goto send_failed; - /* Finish the message. */ - if (gtmpqPutMsgEnd(conn)) - goto send_failed; + if (!is_backup) + { + /* Finish the message. */ + if (gtmpqPutMsgEnd(conn)) + goto send_failed; - /* Flush to ensure backend gets it. */ - if (gtmpqFlush(conn)) - goto send_failed; + /* Flush to ensure backend gets it. */ + if (gtmpqFlush(conn)) + goto send_failed; - finish_time = time(NULL) + CLIENT_GTM_TIMEOUT; - if (gtmpqWaitTimed(true, false, conn, finish_time) || - gtmpqReadData(conn) < 0) - goto receive_failed; + finish_time = time(NULL) + CLIENT_GTM_TIMEOUT; + if (gtmpqWaitTimed(true, false, conn, finish_time) || + gtmpqReadData(conn) < 0) + goto receive_failed; - if ((res = GTMPQgetResult(conn)) == NULL) - goto receive_failed; + if ((res = GTMPQgetResult(conn)) == NULL) + goto receive_failed; - return res->gr_status; + return res->gr_status; + } + return GTM_RESULT_OK; receive_failed: send_failed: @@ -854,12 +1076,28 @@ alter_sequence(GTM_Conn *conn, GTM_SequenceKey key, GTM_Sequence increment, GTM_Sequence minval, GTM_Sequence maxval, GTM_Sequence startval, GTM_Sequence lastval, bool cycle, bool is_restart) { + return alter_sequence_internal(conn, key, increment, minval, maxval, startval, lastval, cycle, is_restart, false); +} + +int +bkup_alter_sequence(GTM_Conn *conn, GTM_SequenceKey key, GTM_Sequence increment, + GTM_Sequence minval, GTM_Sequence maxval, + GTM_Sequence startval, GTM_Sequence lastval, bool cycle, bool is_restart) +{ + return alter_sequence_internal(conn, key, increment, minval, maxval, startval, lastval, cycle, is_restart, true); +} + +static int +alter_sequence_internal(GTM_Conn *conn, GTM_SequenceKey key, GTM_Sequence increment, + GTM_Sequence minval, GTM_Sequence maxval, + GTM_Sequence startval, GTM_Sequence lastval, bool cycle, bool is_restart, bool is_backup) +{ GTM_Result *res = NULL; time_t finish_time; /* Start the message. */ if (gtmpqPutMsgStart('C', true, conn) || - gtmpqPutInt(MSG_SEQUENCE_ALTER, sizeof (GTM_MessageType), conn) || + gtmpqPutInt(is_backup ? MSG_BKUP_SEQUENCE_ALTER : MSG_SEQUENCE_ALTER, sizeof (GTM_MessageType), conn) || gtmpqPutInt(key->gsk_keylen, 4, conn) || gtmpqPutnchar(key->gsk_key, key->gsk_keylen, conn) || gtmpqPutnchar((char *)&increment, sizeof (GTM_Sequence), conn) || @@ -879,15 +1117,19 @@ alter_sequence(GTM_Conn *conn, GTM_SequenceKey key, GTM_Sequence increment, if (gtmpqFlush(conn)) goto send_failed; - finish_time = time(NULL) + CLIENT_GTM_TIMEOUT; - if (gtmpqWaitTimed(true, false, conn, finish_time) || - gtmpqReadData(conn) < 0) - goto receive_failed; + if (!is_backup) + { + finish_time = time(NULL) + CLIENT_GTM_TIMEOUT; + if (gtmpqWaitTimed(true, false, conn, finish_time) || + gtmpqReadData(conn) < 0) + goto receive_failed; - if ((res = GTMPQgetResult(conn)) == NULL) - goto receive_failed; + if ((res = GTMPQgetResult(conn)) == NULL) + goto receive_failed; - return res->gr_status; + return res->gr_status; + } + return GTM_RESULT_OK; receive_failed: send_failed: @@ -899,12 +1141,24 @@ send_failed: int close_sequence(GTM_Conn *conn, GTM_SequenceKey key) { + return close_sequence_internal(conn, key, false); +} + +int +bkup_close_sequence(GTM_Conn *conn, GTM_SequenceKey key) +{ + return close_sequence_internal(conn, key, true); +} + +static int +close_sequence_internal(GTM_Conn *conn, GTM_SequenceKey key, bool is_backup) +{ GTM_Result *res = NULL; time_t finish_time; /* Start the message. */ if (gtmpqPutMsgStart('C', true, conn) || - gtmpqPutInt(MSG_SEQUENCE_CLOSE, sizeof (GTM_MessageType), conn) || + gtmpqPutInt(is_backup ? MSG_BKUP_SEQUENCE_CLOSE : MSG_SEQUENCE_CLOSE, sizeof (GTM_MessageType), conn) || gtmpqPutInt(key->gsk_keylen, 4, conn) || gtmpqPutnchar(key->gsk_key, key->gsk_keylen, conn) || gtmpqPutnchar((char *)&key->gsk_type, sizeof(GTM_SequenceKeyType), conn)) @@ -918,15 +1172,19 @@ close_sequence(GTM_Conn *conn, GTM_SequenceKey key) if (gtmpqFlush(conn)) goto send_failed; - finish_time = time(NULL) + CLIENT_GTM_TIMEOUT; - if (gtmpqWaitTimed(true, false, conn, finish_time) || - gtmpqReadData(conn) < 0) - goto receive_failed; + if (!is_backup) + { + finish_time = time(NULL) + CLIENT_GTM_TIMEOUT; + if (gtmpqWaitTimed(true, false, conn, finish_time) || + gtmpqReadData(conn) < 0) + goto receive_failed; - if ((res = GTMPQgetResult(conn)) == NULL) - goto receive_failed; + if ((res = GTMPQgetResult(conn)) == NULL) + goto receive_failed; - return res->gr_status; + return res->gr_status; + } + return GTM_RESULT_OK; receive_failed: send_failed: @@ -938,12 +1196,24 @@ send_failed: int rename_sequence(GTM_Conn *conn, GTM_SequenceKey key, GTM_SequenceKey newkey) { + return rename_sequence_internal(conn, key, newkey, false); +} + +int +bkup_rename_sequence(GTM_Conn *conn, GTM_SequenceKey key, GTM_SequenceKey newkey) +{ + return rename_sequence_internal(conn, key, newkey, true); +} + +static int +rename_sequence_internal(GTM_Conn *conn, GTM_SequenceKey key, GTM_SequenceKey newkey, bool is_backup) +{ GTM_Result *res = NULL; time_t finish_time; /* Start the message. */ if (gtmpqPutMsgStart('C', true, conn) || - gtmpqPutInt(MSG_SEQUENCE_RENAME, sizeof (GTM_MessageType), conn) || + gtmpqPutInt(is_backup ? MSG_BKUP_SEQUENCE_RENAME : MSG_SEQUENCE_RENAME, sizeof (GTM_MessageType), conn) || gtmpqPutInt(key->gsk_keylen, 4, conn) || gtmpqPutnchar(key->gsk_key, key->gsk_keylen, conn)|| gtmpqPutInt(newkey->gsk_keylen, 4, conn) || @@ -958,15 +1228,19 @@ rename_sequence(GTM_Conn *conn, GTM_SequenceKey key, GTM_SequenceKey newkey) if (gtmpqFlush(conn)) goto send_failed; - finish_time = time(NULL) + CLIENT_GTM_TIMEOUT; - if (gtmpqWaitTimed(true, false, conn, finish_time) || - gtmpqReadData(conn) < 0) - goto receive_failed; + if (!is_backup) + { + finish_time = time(NULL) + CLIENT_GTM_TIMEOUT; + if (gtmpqWaitTimed(true, false, conn, finish_time) || + gtmpqReadData(conn) < 0) + goto receive_failed; - if ((res = GTMPQgetResult(conn)) == NULL) - goto receive_failed; + if ((res = GTMPQgetResult(conn)) == NULL) + goto receive_failed; - return res->gr_status; + return res->gr_status; + } + return GTM_RESULT_OK; receive_failed: send_failed: @@ -1019,12 +1293,24 @@ send_failed: int set_val(GTM_Conn *conn, GTM_SequenceKey key, GTM_Sequence nextval, bool iscalled) { + return set_val_internal(conn, key, nextval, iscalled, false); +} + +int +bkup_set_val(GTM_Conn *conn, GTM_SequenceKey key, GTM_Sequence nextval, bool iscalled) +{ + return set_val_internal(conn, key, nextval, iscalled, true); +} + +static int +set_val_internal(GTM_Conn *conn, GTM_SequenceKey key, GTM_Sequence nextval, bool iscalled, bool is_backup) +{ GTM_Result *res = NULL; time_t finish_time; /* Start the message. */ if (gtmpqPutMsgStart('C', true, conn) || - gtmpqPutInt(MSG_SEQUENCE_SET_VAL, sizeof (GTM_MessageType), conn) || + gtmpqPutInt(is_backup ? MSG_BKUP_SEQUENCE_SET_VAL : MSG_SEQUENCE_SET_VAL, sizeof (GTM_MessageType), conn) || gtmpqPutInt(key->gsk_keylen, 4, conn) || gtmpqPutnchar(key->gsk_key, key->gsk_keylen, conn) || gtmpqPutnchar((char *)&nextval, sizeof (GTM_Sequence), conn) || @@ -1039,15 +1325,19 @@ set_val(GTM_Conn *conn, GTM_SequenceKey key, GTM_Sequence nextval, bool iscalled if (gtmpqFlush(conn)) goto send_failed; - finish_time = time(NULL) + CLIENT_GTM_TIMEOUT; - if (gtmpqWaitTimed(true, false, conn, finish_time) || - gtmpqReadData(conn) < 0) - goto receive_failed; + if (!is_backup) + { + finish_time = time(NULL) + CLIENT_GTM_TIMEOUT; + if (gtmpqWaitTimed(true, false, conn, finish_time) || + gtmpqReadData(conn) < 0) + goto receive_failed; - if ((res = GTMPQgetResult(conn)) == NULL) - goto receive_failed; + if ((res = GTMPQgetResult(conn)) == NULL) + goto receive_failed; - return res->gr_status; + return res->gr_status; + } + return GTM_RESULT_OK; receive_failed: send_failed: @@ -1059,12 +1349,24 @@ send_failed: GTM_Sequence get_next(GTM_Conn *conn, GTM_SequenceKey key) { + return get_next_internal(conn, key, false); +} + +GTM_Sequence +bkup_get_next(GTM_Conn *conn, GTM_SequenceKey key) +{ + return get_next_internal(conn, key, true); +} + +static GTM_Sequence +get_next_internal(GTM_Conn *conn, GTM_SequenceKey key, bool is_backup) +{ GTM_Result *res = NULL; time_t finish_time; /* Start the message. */ if (gtmpqPutMsgStart('C', true, conn) || - gtmpqPutInt(MSG_SEQUENCE_GET_NEXT, sizeof (GTM_MessageType), conn) || + gtmpqPutInt(is_backup ? MSG_BKUP_SEQUENCE_GET_NEXT : MSG_SEQUENCE_GET_NEXT, sizeof (GTM_MessageType), conn) || gtmpqPutInt(key->gsk_keylen, 4, conn) || gtmpqPutnchar(key->gsk_key, key->gsk_keylen, conn)) goto send_failed; @@ -1077,18 +1379,22 @@ get_next(GTM_Conn *conn, GTM_SequenceKey key) if (gtmpqFlush(conn)) goto send_failed; - finish_time = time(NULL) + CLIENT_GTM_TIMEOUT; - if (gtmpqWaitTimed(true, false, conn, finish_time) || - gtmpqReadData(conn) < 0) - goto receive_failed; - - if ((res = GTMPQgetResult(conn)) == NULL) - goto receive_failed; - - if (res->gr_status == GTM_RESULT_OK) - return res->gr_resdata.grd_seq.seqval; - else - return InvalidSequenceValue; + if (!is_backup) + { + finish_time = time(NULL) + CLIENT_GTM_TIMEOUT; + if (gtmpqWaitTimed(true, false, conn, finish_time) || + gtmpqReadData(conn) < 0) + goto receive_failed; + + if ((res = GTMPQgetResult(conn)) == NULL) + goto receive_failed; + + if (res->gr_status == GTM_RESULT_OK) + return res->gr_resdata.grd_seq.seqval; + else + return InvalidSequenceValue; + } + return GTM_RESULT_OK; receive_failed: send_failed: @@ -1100,12 +1406,24 @@ send_failed: int reset_sequence(GTM_Conn *conn, GTM_SequenceKey key) { + return reset_sequence_internal(conn, key, false); +} + +int +bkup_reset_sequence(GTM_Conn *conn, GTM_SequenceKey key) +{ + return reset_sequence_internal(conn, key, true); +} + +static int +reset_sequence_internal(GTM_Conn *conn, GTM_SequenceKey key, bool is_backup) +{ GTM_Result *res = NULL; time_t finish_time; /* Start the message. */ if (gtmpqPutMsgStart('C', true, conn) || - gtmpqPutInt(MSG_SEQUENCE_RESET, sizeof (GTM_MessageType), conn) || + gtmpqPutInt(is_backup ? MSG_BKUP_SEQUENCE_RESET : MSG_SEQUENCE_RESET, sizeof (GTM_MessageType), conn) || gtmpqPutInt(key->gsk_keylen, 4, conn) || gtmpqPutnchar(key->gsk_key, key->gsk_keylen, conn)) goto send_failed; @@ -1118,15 +1436,19 @@ reset_sequence(GTM_Conn *conn, GTM_SequenceKey key) if (gtmpqFlush(conn)) goto send_failed; - finish_time = time(NULL) + CLIENT_GTM_TIMEOUT; - if (gtmpqWaitTimed(true, false, conn, finish_time) || - gtmpqReadData(conn) < 0) - goto receive_failed; + if (!is_backup) + { + finish_time = time(NULL) + CLIENT_GTM_TIMEOUT; + if (gtmpqWaitTimed(true, false, conn, finish_time) || + gtmpqReadData(conn) < 0) + goto receive_failed; - if ((res = GTMPQgetResult(conn)) == NULL) - goto receive_failed; + if ((res = GTMPQgetResult(conn)) == NULL) + goto receive_failed; - return res->gr_status; + return res->gr_status; + } + return GTM_RESULT_OK; receive_failed: send_failed: @@ -1177,6 +1499,8 @@ node_get_local_addr(GTM_Conn *conn, char *buf, size_t buflen, int *rc) * This number is modified at proxy level automatically. * * node_register() returns 0 on success, -1 on failure. + * + * is_backup indicates the message should be *_BKUP_* message */ int node_register(GTM_Conn *conn, GTM_PGXCNodeType type, @@ -1193,7 +1517,7 @@ int node_register(GTM_Conn *conn, return -1; } - return node_register_internal(conn, type, host, port, node_name, datafolder, NODE_CONNECTED); + return node_register_worker(conn, type, host, port, node_name, datafolder, NODE_CONNECTED, false); } int node_register_internal(GTM_Conn *conn, @@ -1204,6 +1528,47 @@ int node_register_internal(GTM_Conn *conn, char *datafolder, GTM_PGXCNodeStatus status) { + return node_register_worker(conn, type, host, port, node_name, datafolder, status, false); +} + +int bkup_node_register(GTM_Conn *conn, + GTM_PGXCNodeType type, + GTM_PGXCNodePort port, + char *node_name, + char *datafolder) +{ + char host[1024]; + int rc; + + node_get_local_addr(conn, host, sizeof(host), &rc); + if (rc != 0) + { + return -1; + } + + return node_register_worker(conn, type, host, port, node_name, datafolder, NODE_CONNECTED, true); +} + +int bkup_node_register_internal(GTM_Conn *conn, + GTM_PGXCNodeType type, + const char *host, + GTM_PGXCNodePort port, + char *node_name, + char *datafolder, + GTM_PGXCNodeStatus status) +{ + return node_register_worker(conn, type, host, port, node_name, datafolder, status, true); +} + +static int node_register_worker(GTM_Conn *conn, + GTM_PGXCNodeType type, + const char *host, + GTM_PGXCNodePort port, + char *node_name, + char *datafolder, + GTM_PGXCNodeStatus status, + bool is_backup) +{ GTM_Result *res = NULL; time_t finish_time; char proxy_name[] = ""; @@ -1218,7 +1583,7 @@ int node_register_internal(GTM_Conn *conn, */ if (gtmpqPutMsgStart('C', true, conn) || /* Message Type */ - gtmpqPutInt(MSG_NODE_REGISTER, sizeof (GTM_MessageType), conn) || + gtmpqPutInt(is_backup? MSG_BKUP_NODE_REGISTER : MSG_NODE_REGISTER, sizeof (GTM_MessageType), conn) || /* Node Type to Register */ gtmpqPutnchar((char *)&type, sizeof(GTM_PGXCNodeType), conn) || /* Node name length */ @@ -1258,26 +1623,30 @@ int node_register_internal(GTM_Conn *conn, goto send_failed; } - finish_time = time(NULL) + CLIENT_GTM_TIMEOUT; - if (gtmpqWaitTimed(true, false, conn, finish_time) || - gtmpqReadData(conn) < 0) + if (!is_backup) { - goto receive_failed; - } + finish_time = time(NULL) + CLIENT_GTM_TIMEOUT; + if (gtmpqWaitTimed(true, false, conn, finish_time) || + gtmpqReadData(conn) < 0) + { + goto receive_failed; + } - if ((res = GTMPQgetResult(conn)) == NULL) - { - goto receive_failed; - } + if ((res = GTMPQgetResult(conn)) == NULL) + { + goto receive_failed; + } - /* Check on node type and node name */ - if (res->gr_status == GTM_RESULT_OK) - { - Assert(res->gr_resdata.grd_node.type == type); - Assert((strcmp(res->gr_resdata.grd_node.node_name,node_name) == 0)); - } + /* Check on node type and node name */ + if (res->gr_status == GTM_RESULT_OK) + { + Assert(res->gr_resdata.grd_node.type == type); + Assert((strcmp(res->gr_resdata.grd_node.node_name,node_name) == 0)); + } - return res->gr_status; + return res->gr_status; + } + return GTM_RESULT_OK; receive_failed: send_failed: @@ -1288,11 +1657,21 @@ send_failed: int node_unregister(GTM_Conn *conn, GTM_PGXCNodeType type, const char * node_name) { + return node_unregister_worker(conn, type, node_name, false); +} + +int bkup_node_unregister(GTM_Conn *conn, GTM_PGXCNodeType type, const char * node_name) +{ + return node_unregister_worker(conn, type, node_name, true); +} + +static int node_unregister_worker(GTM_Conn *conn, GTM_PGXCNodeType type, const char * node_name, bool is_backup) +{ GTM_Result *res = NULL; time_t finish_time; if (gtmpqPutMsgStart('C', true, conn) || - gtmpqPutInt(MSG_NODE_UNREGISTER, sizeof (GTM_MessageType), conn) || + gtmpqPutInt(is_backup ? MSG_BKUP_NODE_UNREGISTER : MSG_NODE_UNREGISTER, sizeof (GTM_MessageType), conn) || gtmpqPutnchar((char *)&type, sizeof(GTM_PGXCNodeType), conn) || /* Node name length */ gtmpqPutInt(strlen(node_name), sizeof (GTM_StrLen), conn) || @@ -1308,22 +1687,26 @@ int node_unregister(GTM_Conn *conn, GTM_PGXCNodeType type, const char * node_nam if (gtmpqFlush(conn)) goto send_failed; - finish_time = time(NULL) + CLIENT_GTM_TIMEOUT; - if (gtmpqWaitTimed(true, false, conn, finish_time) || - gtmpqReadData(conn) < 0) - goto receive_failed; + if (!is_backup) + { + finish_time = time(NULL) + CLIENT_GTM_TIMEOUT; + if (gtmpqWaitTimed(true, false, conn, finish_time) || + gtmpqReadData(conn) < 0) + goto receive_failed; - if ((res = GTMPQgetResult(conn)) == NULL) - goto receive_failed; + if ((res = GTMPQgetResult(conn)) == NULL) + goto receive_failed; - /* Check on node type and node name */ - if (res->gr_status == GTM_RESULT_OK) - { - Assert(res->gr_resdata.grd_node.type == type); - Assert( (strcmp(res->gr_resdata.grd_node.node_name, node_name) == 0) ); - } + /* Check on node type and node name */ + if (res->gr_status == GTM_RESULT_OK) + { + Assert(res->gr_resdata.grd_node.type == type); + Assert( (strcmp(res->gr_resdata.grd_node.node_name, node_name) == 0) ); + } - return res->gr_status; + return res->gr_status; + } + return GTM_RESULT_OK; receive_failed: send_failed: @@ -1332,6 +1715,7 @@ send_failed: return -1; } + void GTM_FreeResult(GTM_Result *result, GTM_PGXCNodeType remote_type) { @@ -1432,6 +1816,85 @@ send_failed: return -1; } + +int +bkup_begin_transaction_multi(GTM_Conn *conn, int txn_count, + GTM_TransactionHandle *txn, GlobalTransactionId start_gxid, GTM_IsolationLevel *isolevel, + bool *read_only, GTMProxy_ConnID *txn_connid) +{ + int ii; + GlobalTransactionId gxid = start_gxid; + + /* Start the message. */ + if (gtmpqPutMsgStart('C', true, conn)) /* FIXME: no proxy header */ + goto send_failed; + + if (gtmpqPutInt(MSG_BKUP_TXN_BEGIN_GETGXID_MULTI, sizeof (GTM_MessageType), conn) || + gtmpqPutInt(txn_count, sizeof(int), conn)) + goto send_failed; + + for (ii = 0; ii < txn_count; ii++, gxid++) + { + if (gxid == InvalidGlobalTransactionId) + gxid = FirstNormalGlobalTransactionId; + if (gtmpqPutInt(txn[ii], sizeof(GTM_TransactionHandle), conn) || + gtmpqPutInt(gxid, sizeof(GlobalTransactionId), conn) || + gtmpqPutInt(isolevel[ii], sizeof(int), conn) || + gtmpqPutc(read_only[ii], conn) || + gtmpqPutInt(txn_connid[ii], sizeof(int), conn)) + goto send_failed; + } + + /* Finish the message. */ + if (gtmpqPutMsgEnd(conn)) + goto send_failed; + + /* Flush to ensure backend gets it. */ + if (gtmpqFlush(conn)) + goto send_failed; + + return 0; + +send_failed: + return -1; + +} + +int +bkup_commit_transaction_multi(GTM_Conn *conn, int txn_count, GTM_TransactionHandle *txn) +{ + int ii; + + if (gtmpqPutMsgStart('C', true, conn)) /* FIXME: no proxy header */ + goto send_failed; + + if (gtmpqPutInt(MSG_BKUP_TXN_COMMIT_MULTI, sizeof (GTM_MessageType), conn) || + gtmpqPutInt(txn_count, sizeof(int), conn)) + goto send_failed; + + for (ii = 0; ii < txn_count; ii++) + { + if (gtmpqPutc(false, conn) || + gtmpqPutnchar((char *)&txn[ii], + sizeof (GTM_TransactionHandle), conn)) + goto send_failed; + } + + /* Finish the message. */ + if (gtmpqPutMsgEnd(conn)) + goto send_failed; + + /* Flush to ensure backend gets it. */ + if (gtmpqFlush(conn)) + goto send_failed; + + return GTM_RESULT_OK; + +send_failed: + return -1; +} + + int commit_transaction_multi(GTM_Conn *conn, int txn_count, GlobalTransactionId *gxid, int *txn_count_out, int *status_out) @@ -1489,7 +1952,23 @@ send_failed: int abort_transaction_multi(GTM_Conn *conn, int txn_count, GlobalTransactionId *gxid, - int *txn_count_out, int *status_out) + int *txn_count_out, int *status_out) +{ + return abort_transaction_multi_internal(conn, txn_count, gxid, txn_count_out, status_out, false); +} + +int +bkup_abort_transaction_multi(GTM_Conn *conn, int txn_count, GlobalTransactionId *gxid) +{ + int txn_count_out; + int status_out[GTM_MAX_GLOBAL_TRANSACTIONS]; + + return abort_transaction_multi_internal(conn, txn_count, gxid, &txn_count_out, status_out, true); +} + +static int +abort_transaction_multi_internal(GTM_Conn *conn, int txn_count, GlobalTransactionId *gxid, + int *txn_count_out, int *status_out, bool is_backup) { GTM_Result *res = NULL; time_t finish_time; @@ -1499,7 +1978,7 @@ abort_transaction_multi(GTM_Conn *conn, int txn_count, GlobalTransactionId *gxid if (gtmpqPutMsgStart('C', true, conn)) /* FIXME: no proxy header */ goto send_failed; - if (gtmpqPutInt(MSG_TXN_ROLLBACK_MULTI, sizeof (GTM_MessageType), conn) || + if (gtmpqPutInt(is_backup ? MSG_BKUP_TXN_ROLLBACK_MULTI : MSG_TXN_ROLLBACK_MULTI, sizeof (GTM_MessageType), conn) || gtmpqPutInt(txn_count, sizeof(int), conn)) goto send_failed; @@ -1519,21 +1998,25 @@ abort_transaction_multi(GTM_Conn *conn, int txn_count, GlobalTransactionId *gxid if (gtmpqFlush(conn)) goto send_failed; - finish_time = time(NULL) + CLIENT_GTM_TIMEOUT; - if (gtmpqWaitTimed(true, false, conn, finish_time) || - gtmpqReadData(conn) < 0) - goto receive_failed; + if (!is_backup) + { + finish_time = time(NULL) + CLIENT_GTM_TIMEOUT; + if (gtmpqWaitTimed(true, false, conn, finish_time) || + gtmpqReadData(conn) < 0) + goto receive_failed; - if ((res = GTMPQgetResult(conn)) == NULL) - goto receive_failed; + if ((res = GTMPQgetResult(conn)) == NULL) + goto receive_failed; - if (res->gr_status == GTM_RESULT_OK) - { - memcpy(txn_count_out, &res->gr_resdata.grd_txn_get_multi.txn_count, sizeof(int)); - memcpy(status_out, &res->gr_resdata.grd_txn_rc_multi.status, sizeof(int) * (*txn_count_out)); - } + if (res->gr_status == GTM_RESULT_OK) + { + memcpy(txn_count_out, &res->gr_resdata.grd_txn_get_multi.txn_count, sizeof(int)); + memcpy(status_out, &res->gr_resdata.grd_txn_rc_multi.status, sizeof(int) * (*txn_count_out)); + } - return res->gr_status; + return res->gr_status; + } + return GTM_RESULT_OK; receive_failed: send_failed: @@ -1638,4 +2121,43 @@ send_failed: conn->result->gr_status = GTM_RESULT_COMM_ERROR; return -1; } - + +/* + * Sync with standby + */ +int +gtm_sync_standby(GTM_Conn *conn) +{ + GTM_Result *res = NULL; + time_t finish_time; + + elog(DEBUG3, "Synchronizing with standby"); + + if (gtmpqPutMsgStart('C', true, conn)) + goto send_failed; + + if (gtmpqPutInt(MSG_SYNC_STANDBY, sizeof(GTM_MessageType), conn)) + goto send_failed; + + if (gtmpqPutMsgEnd(conn)) + goto send_failed; + + if (gtmpqFlush(conn)) + goto send_failed; + + finish_time = time(NULL) + CLIENT_GTM_TIMEOUT; + if (gtmpqWaitTimed(true, false, conn, finish_time) || + gtmpqReadData(conn) < 0) + goto receive_failed; + + if ((res = GTMPQgetResult(conn)) == NULL) + goto receive_failed; + + return res->gr_status; + +receive_failed: +send_failed: + conn->result = makeEmptyResultIfIsNull(conn->result); + conn->result->gr_status = GTM_RESULT_COMM_ERROR; + return -1; +} diff --git a/src/gtm/client/pqexpbuffer.c b/src/gtm/client/pqexpbuffer.c index 2b852d6580..856b765509 100644 --- a/src/gtm/client/pqexpbuffer.c +++ b/src/gtm/client/pqexpbuffer.c @@ -139,7 +139,7 @@ resetGTMPQExpBuffer(PQExpBuffer str) { if (str) { - if (str->data != oom_buffer) + if (str->data && str->data != oom_buffer) { str->len = 0; str->data[0] = '\0'; diff --git a/src/gtm/common/elog.c b/src/gtm/common/elog.c index 9098b0ade1..db47481d1a 100644 --- a/src/gtm/common/elog.c +++ b/src/gtm/common/elog.c @@ -853,6 +853,15 @@ send_message_to_server_log(ErrorData *edata) /* * Write error report to client + * + * At present, this function is not used within GTM. Because this flushes + * message back to the client, GTM should consider to flush backup to the + * standby. However, we cannot simply refer to isGTM because this module + * can be included in coordinator backends. If this can really be called + * from any GTM module, we need a solution to determine that the Port is + * in GTM or not, without direct reference to isGTM. + * + * K.Suzuki, Jan, 2012 */ static void send_message_to_frontend(Port *myport, ErrorData *edata) diff --git a/src/gtm/common/gtm_utils.c b/src/gtm/common/gtm_utils.c index bce9bcb8a2..b1293c2770 100644 --- a/src/gtm/common/gtm_utils.c +++ b/src/gtm/common/gtm_utils.c @@ -16,6 +16,186 @@ #include "gtm/gtm_utils.h" #include "gtm/elog.h" #include "gtm/gtm.h" +#include "gtm/gtm_msg.h" + +struct enum_name +{ + int type; + char *name; +}; + +/* + * Advise: + * Following table can be formatted using gtm_msg.h definitions. + */ +static struct enum_name message_name_tab[] = +{ + {MSG_TYPE_INVALID, "MSG_TYPE_INVALID"}, + {MSG_SYNC_STANDBY, "MSG_SYNC_STANDBY"}, + {MSG_NODE_REGISTER, "MSG_NODE_REGISTER"}, + {MSG_BKUP_NODE_REGISTER, "MSG_BKUP_NODE_REGISTER"}, + {MSG_NODE_UNREGISTER, "MSG_NODE_UNREGISTER"}, + {MSG_BKUP_NODE_UNREGISTER, "MSG_BKUP_NODE_UNREGISTER"}, + {MSG_NODE_LIST, "MSG_NODE_LIST"}, + {MSG_NODE_BEGIN_REPLICATION_INIT, "MSG_NODE_BEGIN_REPLICATION_INIT"}, + {MSG_NODE_END_REPLICATION_INIT, "MSG_NODE_END_REPLICATION_INIT"}, + {MSG_BEGIN_BACKUP, "MSG_BEGIN_BACKUP"}, + {MSG_END_BACKUP, "MSG_END_BACKUP"}, + {MSG_TXN_BEGIN, "MSG_TXN_BEGIN"}, + {MSG_BKUP_TXN_BEGIN, "MSG_BKUP_TXN_BEGIN"}, + {MSG_TXN_BEGIN_GETGXID, "MSG_TXN_BEGIN_GETGXID"}, + {MSG_BKUP_TXN_BEGIN_GETGXID, "MSG_BKUP_TXN_BEGIN_GETGXID"}, + {MSG_TXN_BEGIN_GETGXID_MULTI, "MSG_TXN_BEGIN_GETGXID_MULTI"}, + {MSG_BKUP_TXN_BEGIN_GETGXID_MULTI, "MSG_BKUP_TXN_BEGIN_GETGXID_MULTI"}, + {MSG_TXN_START_PREPARED, "MSG_TXN_START_PREPARED"}, + {MSG_BKUP_TXN_START_PREPARED, "MSG_BKUP_TXN_START_PREPARED"}, + {MSG_TXN_COMMIT, "MSG_TXN_COMMIT"}, + {MSG_BKUP_TXN_COMMIT, "MSG_BKUP_TXN_COMMIT"}, + {MSG_TXN_COMMIT_MULTI, "MSG_TXN_COMMIT_MULTI"}, + {MSG_BKUP_TXN_COMMIT_MULTI, "MSG_BKUP_TXN_COMMIT_MULTI"}, + {MSG_TXN_COMMIT_PREPARED, "MSG_TXN_COMMIT_PREPARED"}, + {MSG_BKUP_TXN_COMMIT_PREPARED, "MSG_BKUP_TXN_COMMIT_PREPARED"}, + {MSG_TXN_PREPARE, "MSG_TXN_PREPARE"}, + {MSG_BKUP_TXN_PREPARE, "MSG_BKUP_TXN_PREPARE"}, + {MSG_TXN_ROLLBACK, "MSG_TXN_ROLLBACK"}, + {MSG_BKUP_TXN_ROLLBACK, "MSG_BKUP_TXN_ROLLBACK"}, + {MSG_TXN_ROLLBACK_MULTI, "MSG_TXN_ROLLBACK_MULTI"}, + {MSG_BKUP_TXN_ROLLBACK_MULTI, "MSG_BKUP_TXN_ROLLBACK_MULTI"}, + {MSG_TXN_GET_GID_DATA, "MSG_TXN_GET_GID_DATA"}, + {MSG_TXN_GET_GXID, "MSG_TXN_GET_GXID"}, + {MSG_BKUP_TXN_GET_GXID, "MSG_BKUP_TXN_GET_GXID"}, + {MSG_TXN_GET_NEXT_GXID, "MSG_TXN_GET_NEXT_GXID"}, + {MSG_TXN_GXID_LIST, "MSG_TXN_GXID_LIST"}, + {MSG_SNAPSHOT_GET, "MSG_SNAPSHOT_GET"}, + {MSG_SNAPSHOT_GET_MULTI, "MSG_SNAPSHOT_GET_MULTI"}, + {MSG_SNAPSHOT_GXID_GET, "MSG_SNAPSHOT_GXID_GET"}, + {MSG_SEQUENCE_INIT, "MSG_SEQUENCE_INIT"}, + {MSG_BKUP_SEQUENCE_INIT, "MSG_BKUP_SEQUENCE_INIT"}, + {MSG_SEQUENCE_GET_CURRENT, "MSG_SEQUENCE_GET_CURRENT"}, + {MSG_SEQUENCE_GET_NEXT, "MSG_SEQUENCE_GET_NEXT"}, + {MSG_BKUP_SEQUENCE_GET_NEXT, "MSG_BKUP_SEQUENCE_GET_NEXT"}, + {MSG_SEQUENCE_GET_LAST, "MSG_SEQUENCE_GET_LAST"}, + {MSG_SEQUENCE_SET_VAL, "MSG_SEQUENCE_SET_VAL"}, + {MSG_BKUP_SEQUENCE_SET_VAL, "MSG_BKUP_SEQUENCE_SET_VAL"}, + {MSG_SEQUENCE_RESET, "MSG_SEQUENCE_RESET"}, + {MSG_BKUP_SEQUENCE_RESET, "MSG_BKUP_SEQUENCE_RESET"}, + {MSG_SEQUENCE_CLOSE, "MSG_SEQUENCE_CLOSE"}, + {MSG_BKUP_SEQUENCE_CLOSE, "MSG_BKUP_SEQUENCE_CLOSE"}, + {MSG_SEQUENCE_RENAME, "MSG_SEQUENCE_RENAME"}, + {MSG_BKUP_SEQUENCE_RENAME, "MSG_BKUP_SEQUENCE_RENAME"}, + {MSG_BKUP_SEQUENCE_RENAME, "MSG_BKUP_SEQUENCE_RENAME"}, + {MSG_SEQUENCE_ALTER, "MSG_SEQUENCE_ALTER"}, + {MSG_BKUP_SEQUENCE_ALTER, "MSG_BKUP_SEQUENCE_ALTER"}, + {MSG_SEQUENCE_LIST, "MSG_SEQUENCE_LIST"}, + {MSG_TXN_GET_STATUS, "MSG_TXN_GET_STATUS"}, + {MSG_TXN_GET_ALL_PREPARED, "MSG_TXN_GET_ALL_PREPARED"}, + {MSG_TXN_BEGIN_GETGXID_AUTOVACUUM, "MSG_TXN_BEGIN_GETGXID_AUTOVACUUM"}, + {MSG_BKUP_TXN_BEGIN_GETGXID_AUTOVACUUM, "MSG_BKUP_TXN_BEGIN_GETGXID_AUTOVACUUM"}, + {MSG_DATA_FLUSH, "MSG_DATA_FLUSH"}, + {MSG_BACKEND_DISCONNECT, "MSG_BACKEND_DISCONNECT"}, + {MSG_TYPE_COUNT, "MSG_TYPE_COUNT"}, + {-1, NULL} +}; + +static struct enum_name result_name_tab[] = +{ + {SYNC_STANDBY_RESULT, "SYNC_STANDBY_RESULT"}, + {NODE_REGISTER_RESULT, "NODE_REGISTER_RESULT"}, + {NODE_UNREGISTER_RESULT, "NODE_UNREGISTER_RESULT"}, + {NODE_LIST_RESULT, "NODE_LIST_RESULT"}, + {NODE_BEGIN_REPLICATION_INIT_RESULT, "NODE_BEGIN_REPLICATION_INIT_RESULT"}, + {NODE_END_REPLICATION_INIT_RESULT, "NODE_END_REPLICATION_INIT_RESULT"}, + {BEGIN_BACKUP_RESULT, "BEGIN_BACKUP_RESULT"}, + {END_BACKUP_RESULT, "END_BACKUP_RESULT"}, + {TXN_BEGIN_RESULT, "TXN_BEGIN_RESULT"}, + {TXN_BEGIN_GETGXID_RESULT, "TXN_BEGIN_GETGXID_RESULT"}, + {TXN_BEGIN_GETGXID_MULTI_RESULT, "TXN_BEGIN_GETGXID_MULTI_RESULT"}, + {TXN_PREPARE_RESULT, "TXN_PREPARE_RESULT"}, + {TXN_START_PREPARED_RESULT, "TXN_START_PREPARED_RESULT"}, + {TXN_COMMIT_PREPARED_RESULT, "TXN_COMMIT_PREPARED_RESULT"}, + {TXN_COMMIT_RESULT, "TXN_COMMIT_RESULT"}, + {TXN_COMMIT_MULTI_RESULT, "TXN_COMMIT_MULTI_RESULT"}, + {TXN_ROLLBACK_RESULT, "TXN_ROLLBACK_RESULT"}, + {TXN_ROLLBACK_MULTI_RESULT, "TXN_ROLLBACK_MULTI_RESULT"}, + {TXN_GET_GID_DATA_RESULT, "TXN_GET_GID_DATA_RESULT"}, + {TXN_GET_GXID_RESULT, "TXN_GET_GXID_RESULT"}, + {TXN_GET_NEXT_GXID_RESULT, "TXN_GET_NEXT_GXID_RESULT"}, + {TXN_GXID_LIST_RESULT, "TXN_GXID_LIST_RESULT"}, + {SNAPSHOT_GET_RESULT, "SNAPSHOT_GET_RESULT"}, + {SNAPSHOT_GET_MULTI_RESULT, "SNAPSHOT_GET_MULTI_RESULT"}, + {SNAPSHOT_GXID_GET_RESULT, "SNAPSHOT_GXID_GET_RESULT"}, + {SEQUENCE_INIT_RESULT, "SEQUENCE_INIT_RESULT"}, + {SEQUENCE_GET_CURRENT_RESULT, "SEQUENCE_GET_CURRENT_RESULT"}, + {SEQUENCE_GET_NEXT_RESULT, "SEQUENCE_GET_NEXT_RESULT"}, + {SEQUENCE_GET_LAST_RESULT, "SEQUENCE_GET_LAST_RESULT"}, + {SEQUENCE_SET_VAL_RESULT, "SEQUENCE_SET_VAL_RESULT"}, + {SEQUENCE_RESET_RESULT, "SEQUENCE_RESET_RESULT"}, + {SEQUENCE_CLOSE_RESULT, "SEQUENCE_CLOSE_RESULT"}, + {SEQUENCE_RENAME_RESULT, "SEQUENCE_RENAME_RESULT"}, + {SEQUENCE_ALTER_RESULT, "SEQUENCE_ALTER_RESULT"}, + {SEQUENCE_LIST_RESULT, "SEQUENCE_LIST_RESULT"}, + {TXN_GET_STATUS_RESULT, "TXN_GET_STATUS_RESULT"}, + {TXN_GET_ALL_PREPARED_RESULT, "TXN_GET_ALL_PREPARED_RESULT"}, + {TXN_BEGIN_GETGXID_AUTOVACUUM_RESULT, "TXN_BEGIN_GETGXID_AUTOVACUUM_RESULT"}, + {RESULT_TYPE_COUNT, "RESULT_TYPE_COUNT"}, + {-1, NULL} +}; + +static char **message_name = NULL; +static int message_max; +static char **result_name = NULL; +static int result_max; + +void gtm_util_init_nametabs(void) +{ + int ii; + + if (message_name) + free(message_name); + if (result_name) + free(result_name); + for (ii = 0, message_max = 0; message_name_tab[ii].type >= 0; ii++) + { + if (message_max < message_name_tab[ii].type) + message_max = message_name_tab[ii].type; + } + message_name = (char **)malloc(sizeof(char *) * (message_max + 1)); + memset(message_name, sizeof(char *) * (message_max + 1), 0); + for (ii = 0; message_name_tab[ii].type >= 0; ii++) + { + message_name[message_name_tab[ii].type] = message_name_tab[ii].name; + } + + for (ii = 0, result_max = 0; result_name_tab[ii].type >= 0; ii++) + { + if (result_max < result_name_tab[ii].type) + result_max = result_name_tab[ii].type; + } + result_name = (char **)malloc(sizeof(char *) * (result_max + 1)); + memset(result_name, sizeof(char *) * (result_max + 1), 0); + for (ii = 0; result_name_tab[ii].type >= 0; ii++) + { + result_name[result_name_tab[ii].type] = result_name_tab[ii].name; + } +} + +char *gtm_util_message_name(GTM_MessageType type) +{ + if (message_name == NULL) + gtm_util_init_nametabs(); + if (type > message_max) + return "UNKNOWN_MESSAGE"; + return message_name[type]; +} + +char *gtm_util_result_name(GTM_ResultType type) +{ + if (result_name == NULL) + gtm_util_init_nametabs(); + if (type > result_max) + return "UNKNOWN_RESULT"; + return result_name[type]; +} /* * gtm_report_failure() is an utility function to report fatal failure @@ -37,3 +217,4 @@ gtm_report_failure(GTM_Conn *failed_conn) return; } #endif + diff --git a/src/gtm/libpq/pqcomm.c b/src/gtm/libpq/pqcomm.c index d2a6e9b6b5..048434ded4 100644 --- a/src/gtm/libpq/pqcomm.c +++ b/src/gtm/libpq/pqcomm.c @@ -97,9 +97,9 @@ /* Where the Unix socket file is */ static char sock_path[MAXGTMPATH]; -static int tcp_keepalives_idle; -static int tcp_keepalives_interval; -static int tcp_keepalives_count; +extern int tcp_keepalives_idle; +extern int tcp_keepalives_interval; +extern int tcp_keepalives_count; /* diff --git a/src/gtm/main/gtm.conf.sample b/src/gtm/main/gtm.conf.sample index d0e3fdb629..47a0ce443f 100644 --- a/src/gtm/main/gtm.conf.sample +++ b/src/gtm/main/gtm.conf.sample @@ -56,3 +56,4 @@ # Valid value: DEBUG, DEBUG5, DEBUG4, DEBUG3, # DEBUG2, DEBUG1, INFO, NOTICE, WARNING, # ERROR, LOG, FATAL, PANIC +#synchronous_backup = off # If backup to standby is synchronous diff --git a/src/gtm/main/gtm_opt.c b/src/gtm/main/gtm_opt.c index c84dbe661f..9548081ba9 100644 --- a/src/gtm/main/gtm_opt.c +++ b/src/gtm/main/gtm_opt.c @@ -41,6 +41,7 @@ const char *config_filename = CONFIG_FILENAME; extern char *NodeName; extern char *ListenAddresses; +extern bool Backup_synchronously; extern int GTMPortNumber; extern char *active_addr; extern int active_port; @@ -48,9 +49,9 @@ extern int GTM_StandbyMode; extern char *error_reporter; extern char *status_reader; extern int log_min_messages; -extern int keepalives_idle; -extern int keepalives_count; -extern int keepalives_interval; +extern int tcp_keepalives_idle; +extern int tcp_keepalives_count; +extern int tcp_keepalives_interval; extern char *GTMDataDir; @@ -122,6 +123,15 @@ Config_Type_Names(); struct config_bool ConfigureNamesBool[] = { + { + {GTM_OPTNAME_SYNCHRONOUS_BACKUP, GTMC_STARTUP, + gettext_noop("Specifies if backup to GTM-Standby is taken in synchronous manner."), + gettext_noop("Default value is off."), + 0 + }, + &Backup_synchronously, + false, false, NULL + }, /* End-of-list marker */ { {NULL, 0, NULL, NULL, 0}, NULL, false, false, NULL @@ -157,7 +167,7 @@ struct config_int ConfigureNamesInt[] = gettext_noop("This option is effective only when it runs as GTM-Standby."), GTMOPT_UNIT_TIME }, - &keepalives_idle, + &tcp_keepalives_idle, 0, 0, INT_MAX, 0, NULL }, @@ -167,7 +177,7 @@ struct config_int ConfigureNamesInt[] = gettext_noop("This option is effective only when it runs as GTM-Standby."), GTMOPT_UNIT_TIME }, - &keepalives_interval, + &tcp_keepalives_interval, 0, 0, INT_MAX, 0, NULL }, @@ -177,7 +187,7 @@ struct config_int ConfigureNamesInt[] = gettext_noop("This option is effective only when it runs as GTM-Standby."), 0 }, - &keepalives_count, + &tcp_keepalives_count, 0, 0, INT_MAX, 0, NULL }, diff --git a/src/gtm/main/gtm_seq.c b/src/gtm/main/gtm_seq.c index 05beef11e7..5f154c6949 100644 --- a/src/gtm/main/gtm_seq.c +++ b/src/gtm/main/gtm_seq.c @@ -27,6 +27,8 @@ #include "gtm/libpq-int.h" #include "gtm/pqformat.h" +extern bool Backup_synchronously; + typedef struct GTM_SeqInfoHashBucket { gtm_List *shb_list; @@ -831,10 +833,12 @@ GTM_InitSeqManager(void) } /* - * Process MSG_SEQUENCE_INIT message + * Process MSG_SEQUENCE_INIT/MSG_BKUP_SEQUENCE_INIT message + * + * is_backup indicates the message is MSG_BKUP_SEQUENCE_INIT */ void -ProcessSequenceInitCommand(Port *myport, StringInfo message) +ProcessSequenceInitCommand(Port *myport, StringInfo message, bool is_backup) { GTM_SequenceKeyData seqkey; GTM_Sequence increment, minval, maxval, startval; @@ -882,55 +886,69 @@ ProcessSequenceInitCommand(Port *myport, StringInfo message) pq_getmsgend(message); - /* - * Send a SUCCESS message back to the client - */ - pq_beginmessage(&buf, 'S'); - pq_sendint(&buf, SEQUENCE_INIT_RESULT, 4); - if (myport->remote_type == GTM_NODE_GTM_PROXY) - { - GTM_ProxyMsgHeader proxyhdr; - proxyhdr.ph_conid = myport->conn_id; - pq_sendbytes(&buf, (char *)&proxyhdr, sizeof (GTM_ProxyMsgHeader)); - } - pq_sendint(&buf, seqkey.gsk_keylen, 4); - pq_sendbytes(&buf, seqkey.gsk_key, seqkey.gsk_keylen); - pq_endmessage(myport, &buf); - - if (myport->remote_type != GTM_NODE_GTM_PROXY) - pq_flush(myport); - - if (GetMyThreadInfo->thr_conn->standby) + if (!is_backup) { - int rc; - GTM_Conn *oldconn = GetMyThreadInfo->thr_conn->standby; - int count = 0; - - elog(LOG, "calling open_sequence() for standby GTM %p.", GetMyThreadInfo->thr_conn->standby); - - retry: - rc = open_sequence(GetMyThreadInfo->thr_conn->standby, - &seqkey, - increment, - minval, - maxval, - startval, - cycle); + /* Backup first */ + if (GetMyThreadInfo->thr_conn->standby) + { + int rc; + GTM_Conn *oldconn = GetMyThreadInfo->thr_conn->standby; + int count = 0; + + elog(LOG, "calling open_sequence() for standby GTM %p.", GetMyThreadInfo->thr_conn->standby); + + retry: + rc = bkup_open_sequence(GetMyThreadInfo->thr_conn->standby, + &seqkey, + increment, + minval, + maxval, + startval, + cycle); - if (gtm_standby_check_communication_error(&count, oldconn)) - goto retry; + if (gtm_standby_check_communication_error(&count, oldconn)) + goto retry; - elog(LOG, "open_sequence() returns rc %d.", rc); - } + /* Sync */ + if (Backup_synchronously && (myport->remote_type != GTM_NODE_GTM_PROXY)) + gtm_sync_standby(GetMyThreadInfo->thr_conn->standby); + elog(LOG, "open_sequence() returns rc %d.", rc); + } + /* + * Send a SUCCESS message back to the client + */ + pq_beginmessage(&buf, 'S'); + pq_sendint(&buf, SEQUENCE_INIT_RESULT, 4); + if (myport->remote_type == GTM_NODE_GTM_PROXY) + { + GTM_ProxyMsgHeader proxyhdr; + proxyhdr.ph_conid = myport->conn_id; + pq_sendbytes(&buf, (char *)&proxyhdr, sizeof (GTM_ProxyMsgHeader)); + } + pq_sendint(&buf, seqkey.gsk_keylen, 4); + pq_sendbytes(&buf, seqkey.gsk_key, seqkey.gsk_keylen); + pq_endmessage(myport, &buf); + + if (myport->remote_type != GTM_NODE_GTM_PROXY) + { + /* Flush standby first */ + if (GetMyThreadInfo->thr_conn->standby) + gtmpqFlush(GetMyThreadInfo->thr_conn->standby); + pq_flush(myport); + } + } /* FIXME: need to check errors */ } + /* - * Process MSG_SEQUENCE_ALTER message + * Process MSG_SEQUENCE_ALTER/MSG_BKUP_SEQUENCE_ALTER message + * + * is_backup indicates the message is MSG_BKUP_SEQUENCE_ALTER */ void -ProcessSequenceAlterCommand(Port *myport, StringInfo message) +ProcessSequenceAlterCommand(Port *myport, StringInfo message, bool is_backup) { GTM_SequenceKeyData seqkey; GTM_Sequence increment, minval, maxval, startval, lastval; @@ -980,47 +998,58 @@ ProcessSequenceAlterCommand(Port *myport, StringInfo message) pq_getmsgend(message); - pq_beginmessage(&buf, 'S'); - pq_sendint(&buf, SEQUENCE_ALTER_RESULT, 4); - if (myport->remote_type == GTM_NODE_GTM_PROXY) - { - GTM_ProxyMsgHeader proxyhdr; - proxyhdr.ph_conid = myport->conn_id; - pq_sendbytes(&buf, (char *)&proxyhdr, sizeof (GTM_ProxyMsgHeader)); - } - pq_sendint(&buf, seqkey.gsk_keylen, 4); - pq_sendbytes(&buf, seqkey.gsk_key, seqkey.gsk_keylen); - pq_endmessage(myport, &buf); - - if (myport->remote_type != GTM_NODE_GTM_PROXY) - pq_flush(myport); - - if ( GetMyThreadInfo->thr_conn->standby ) + if (!is_backup) { - int rc; - GTM_Conn *oldconn = GetMyThreadInfo->thr_conn->standby; - int count = 0; + /* Backup first */ + if ( GetMyThreadInfo->thr_conn->standby ) + { + int rc; + GTM_Conn *oldconn = GetMyThreadInfo->thr_conn->standby; + int count = 0; - elog(LOG, "calling alter_sequence() for standby GTM %p.", GetMyThreadInfo->thr_conn->standby); + elog(LOG, "calling alter_sequence() for standby GTM %p.", GetMyThreadInfo->thr_conn->standby); - retry: - rc = alter_sequence(GetMyThreadInfo->thr_conn->standby, - &seqkey, - increment, - minval, - maxval, - startval, - lastval, - cycle, - is_restart); + retry: + rc = bkup_alter_sequence(GetMyThreadInfo->thr_conn->standby, + &seqkey, + increment, + minval, + maxval, + startval, + lastval, + cycle, + is_restart); - if (gtm_standby_check_communication_error(&count, oldconn)) - goto retry; + if (gtm_standby_check_communication_error(&count, oldconn)) + goto retry; + + /* Sync */ + if (Backup_synchronously && (myport->remote_type != GTM_NODE_GTM_PROXY)) + gtm_sync_standby(GetMyThreadInfo->thr_conn->standby); + + elog(LOG, "alter_sequence() returns rc %d.", rc); + } + pq_beginmessage(&buf, 'S'); + pq_sendint(&buf, SEQUENCE_ALTER_RESULT, 4); + if (myport->remote_type == GTM_NODE_GTM_PROXY) + { + GTM_ProxyMsgHeader proxyhdr; + proxyhdr.ph_conid = myport->conn_id; + pq_sendbytes(&buf, (char *)&proxyhdr, sizeof (GTM_ProxyMsgHeader)); + } + pq_sendint(&buf, seqkey.gsk_keylen, 4); + pq_sendbytes(&buf, seqkey.gsk_key, seqkey.gsk_keylen); + pq_endmessage(myport, &buf); - elog(LOG, "alter_sequence() returns rc %d.", rc); - } + if (myport->remote_type != GTM_NODE_GTM_PROXY) + { + if (GetMyThreadInfo->thr_conn->standby) + gtmpqFlush(GetMyThreadInfo->thr_conn->standby); + pq_flush(myport); + } - /* FIXME: need to check errors */ + /* FIXME: need to check errors */ + } } @@ -1113,6 +1142,7 @@ ProcessSequenceListCommand(Port *myport, StringInfo message) elog(LOG, "ProcessSequenceListCommand() done."); if (myport->remote_type != GTM_NODE_GTM_PROXY) + /* Don't flush to the backup because this does not change the internal status */ pq_flush(myport); } @@ -1152,8 +1182,14 @@ ProcessSequenceGetCurrentCommand(Port *myport, StringInfo message) pq_endmessage(myport, &buf); if (myport->remote_type != GTM_NODE_GTM_PROXY) + /* Don't flush to the standby because this does not change the status */ pq_flush(myport); + /* + * I don't think backup is needed here. It does not change internal state. + * 27th Dec., 2011, K.Suzuki + */ +#if 0 if (GetMyThreadInfo->thr_conn->standby) { GTM_Sequence loc_seq; @@ -1170,15 +1206,18 @@ retry: elog(LOG, "get_current() returns GTM_Sequence %ld.", loc_seq); } +#endif /* FIXME: need to check errors */ } /* - * Process MSG_SEQUENCE_GET_NEXT message + * Process MSG_SEQUENCE_GET_NEXT/MSG_BKUP_SEQUENCE_GET_NEXT message + * + * is_backup indicates the message is MSG_BKUP_SEQUENCE_GET_NEXT */ void -ProcessSequenceGetNextCommand(Port *myport, StringInfo message) +ProcessSequenceGetNextCommand(Port *myport, StringInfo message, bool is_backup) { GTM_SequenceKeyData seqkey; StringInfoData buf; @@ -1195,46 +1234,62 @@ ProcessSequenceGetNextCommand(Port *myport, StringInfo message) elog(LOG, "Getting next value %ld for sequence %s", seqval, seqkey.gsk_key); - pq_beginmessage(&buf, 'S'); - pq_sendint(&buf, SEQUENCE_GET_NEXT_RESULT, 4); - if (myport->remote_type == GTM_NODE_GTM_PROXY) + if (!is_backup) { - GTM_ProxyMsgHeader proxyhdr; - proxyhdr.ph_conid = myport->conn_id; - pq_sendbytes(&buf, (char *)&proxyhdr, sizeof (GTM_ProxyMsgHeader)); - } - pq_sendint(&buf, seqkey.gsk_keylen, 4); - pq_sendbytes(&buf, seqkey.gsk_key, seqkey.gsk_keylen); - pq_sendbytes(&buf, (char *)&seqval, sizeof (GTM_Sequence)); - pq_endmessage(myport, &buf); + /* Backup first */ + if (GetMyThreadInfo->thr_conn->standby) + { + GTM_Sequence loc_seq; + GTM_Conn *oldconn = GetMyThreadInfo->thr_conn->standby; + int count = 0; - if (myport->remote_type != GTM_NODE_GTM_PROXY) - pq_flush(myport); + elog(LOG, "calling get_next() for standby GTM %p.", GetMyThreadInfo->thr_conn->standby); - if (GetMyThreadInfo->thr_conn->standby) - { - GTM_Sequence loc_seq; - GTM_Conn *oldconn = GetMyThreadInfo->thr_conn->standby; - int count = 0; + retry: + loc_seq = bkup_get_next(GetMyThreadInfo->thr_conn->standby, &seqkey); + + if (gtm_standby_check_communication_error(&count, oldconn)) + goto retry; - elog(LOG, "calling get_next() for standby GTM %p.", GetMyThreadInfo->thr_conn->standby); + /* Sync */ + if (Backup_synchronously &&(myport->remote_type != GTM_NODE_GTM_PROXY)) + gtm_sync_standby(GetMyThreadInfo->thr_conn->standby); - retry: - loc_seq = get_next(GetMyThreadInfo->thr_conn->standby, &seqkey); - - if (gtm_standby_check_communication_error(&count, oldconn)) - goto retry; + elog(LOG, "get_next() returns GTM_Sequence %ld.", loc_seq); + } + /* Respond to the client */ + pq_beginmessage(&buf, 'S'); + pq_sendint(&buf, SEQUENCE_GET_NEXT_RESULT, 4); + if (myport->remote_type == GTM_NODE_GTM_PROXY) + { + GTM_ProxyMsgHeader proxyhdr; + proxyhdr.ph_conid = myport->conn_id; + pq_sendbytes(&buf, (char *)&proxyhdr, sizeof (GTM_ProxyMsgHeader)); + } + pq_sendint(&buf, seqkey.gsk_keylen, 4); + pq_sendbytes(&buf, seqkey.gsk_key, seqkey.gsk_keylen); + pq_sendbytes(&buf, (char *)&seqval, sizeof (GTM_Sequence)); + pq_endmessage(myport, &buf); - elog(LOG, "get_next() returns GTM_Sequence %ld.", loc_seq); + if (myport->remote_type != GTM_NODE_GTM_PROXY) + { + /* Flush to the standby first */ + if (GetMyThreadInfo->thr_conn->standby) + gtmpqFlush(GetMyThreadInfo->thr_conn->standby); + pq_flush(myport); + } + + /* FIXME: need to check errors */ } - /* FIXME: need to check errors */ } /* - * Process MSG_SEQUENCE_SET_VAL message + * Process MSG_SEQUENCE_SET_VAL/MSG_BKUP_SEQUENCE_SET_VAL message + * + * is_backup indicates the message is MSG_BKUP_SEQUENCE_SET_VAL */ void -ProcessSequenceSetValCommand(Port *myport, StringInfo message) +ProcessSequenceSetValCommand(Port *myport, StringInfo message, bool is_backup) { GTM_SequenceKeyData seqkey; GTM_Sequence nextval; @@ -1273,48 +1328,64 @@ ProcessSequenceSetValCommand(Port *myport, StringInfo message) pq_getmsgend(message); - pq_beginmessage(&buf, 'S'); - pq_sendint(&buf, SEQUENCE_SET_VAL_RESULT, 4); - if (myport->remote_type == GTM_NODE_GTM_PROXY) + if (!is_backup) { - GTM_ProxyMsgHeader proxyhdr; - proxyhdr.ph_conid = myport->conn_id; - pq_sendbytes(&buf, (char *)&proxyhdr, sizeof (GTM_ProxyMsgHeader)); - } - pq_sendint(&buf, seqkey.gsk_keylen, 4); - pq_sendbytes(&buf, seqkey.gsk_key, seqkey.gsk_keylen); - pq_endmessage(myport, &buf); - - if (myport->remote_type != GTM_NODE_GTM_PROXY) - pq_flush(myport); - - if (GetMyThreadInfo->thr_conn->standby) - { - int rc; - GTM_Conn *oldconn = GetMyThreadInfo->thr_conn->standby; - int count = 0; + /* Backup first */ + if (GetMyThreadInfo->thr_conn->standby) + { + int rc; + GTM_Conn *oldconn = GetMyThreadInfo->thr_conn->standby; + int count = 0; - elog(LOG, "calling set_val() for standby GTM %p.", GetMyThreadInfo->thr_conn->standby); + elog(LOG, "calling set_val() for standby GTM %p.", GetMyThreadInfo->thr_conn->standby); -retry: - rc = set_val(GetMyThreadInfo->thr_conn->standby, - &seqkey, - nextval, - iscalled); + retry: + rc = bkup_set_val(GetMyThreadInfo->thr_conn->standby, + &seqkey, + nextval, + iscalled); - if (gtm_standby_check_communication_error(&count, oldconn)) - goto retry; + if (gtm_standby_check_communication_error(&count, oldconn)) + goto retry; - elog(LOG, "set_val() returns rc %d.", rc); - } + /* Sync */ + if (Backup_synchronously && (myport->remote_type != GTM_NODE_GTM_PROXY)) + gtm_sync_standby(GetMyThreadInfo->thr_conn->standby); + + elog(LOG, "set_val() returns rc %d.", rc); + } + /* Respond to the client */ + pq_beginmessage(&buf, 'S'); + pq_sendint(&buf, SEQUENCE_SET_VAL_RESULT, 4); + if (myport->remote_type == GTM_NODE_GTM_PROXY) + { + GTM_ProxyMsgHeader proxyhdr; + proxyhdr.ph_conid = myport->conn_id; + pq_sendbytes(&buf, (char *)&proxyhdr, sizeof (GTM_ProxyMsgHeader)); + } + pq_sendint(&buf, seqkey.gsk_keylen, 4); + pq_sendbytes(&buf, seqkey.gsk_key, seqkey.gsk_keylen); + pq_endmessage(myport, &buf); + + if (myport->remote_type != GTM_NODE_GTM_PROXY) + { + /* Flush the standby first */ + if (GetMyThreadInfo->thr_conn->standby) + gtmpqFlush(GetMyThreadInfo->thr_conn->standby); + pq_flush(myport); + } + + } /* FIXME: need to check errors */ } /* - * Process MSG_SEQUENCE_RESET message + * Process MSG_SEQUENCE_RESET/MSG_BKUP_SEQUENCE_RESET message + * + * is_backup indicates the cmessage is MSG_BKUP_SEQUENCE_RESULT */ void -ProcessSequenceResetCommand(Port *myport, StringInfo message) +ProcessSequenceResetCommand(Port *myport, StringInfo message, bool is_backup) { GTM_SequenceKeyData seqkey; StringInfoData buf; @@ -1330,45 +1401,61 @@ ProcessSequenceResetCommand(Port *myport, StringInfo message) (errcode, errmsg("Can not reset the sequence"))); - pq_beginmessage(&buf, 'S'); - pq_sendint(&buf, SEQUENCE_RESET_RESULT, 4); - if (myport->remote_type == GTM_NODE_GTM_PROXY) + if (!is_backup) { - GTM_ProxyMsgHeader proxyhdr; - proxyhdr.ph_conid = myport->conn_id; - pq_sendbytes(&buf, (char *)&proxyhdr, sizeof (GTM_ProxyMsgHeader)); - } - pq_sendint(&buf, seqkey.gsk_keylen, 4); - pq_sendbytes(&buf, seqkey.gsk_key, seqkey.gsk_keylen); - pq_endmessage(myport, &buf); + /* Backup first */ + if (GetMyThreadInfo->thr_conn->standby) + { + int rc; + GTM_Conn *oldconn = GetMyThreadInfo->thr_conn->standby; + int count = 0; - if (myport->remote_type != GTM_NODE_GTM_PROXY) - pq_flush(myport); + elog(LOG, "calling reset_sequence() for standby GTM %p.", GetMyThreadInfo->thr_conn->standby); + + retry: + rc = bkup_reset_sequence(GetMyThreadInfo->thr_conn->standby, &seqkey); - if (GetMyThreadInfo->thr_conn->standby) - { - int rc; - GTM_Conn *oldconn = GetMyThreadInfo->thr_conn->standby; - int count = 0; + if (gtm_standby_check_communication_error(&count, oldconn)) + goto retry; - elog(LOG, "calling reset_sequence() for standby GTM %p.", GetMyThreadInfo->thr_conn->standby); - -retry: - rc = reset_sequence(GetMyThreadInfo->thr_conn->standby, &seqkey); + /* Sync */ + if (Backup_synchronously && (myport->remote_type != GTM_NODE_GTM_PROXY)) + gtm_sync_standby(GetMyThreadInfo->thr_conn->standby); - if (gtm_standby_check_communication_error(&count, oldconn)) - goto retry; + elog(LOG, "reset_sequence() returns rc %d.", rc); + } + /* Respond to the client */ + pq_beginmessage(&buf, 'S'); + pq_sendint(&buf, SEQUENCE_RESET_RESULT, 4); + if (myport->remote_type == GTM_NODE_GTM_PROXY) + { + GTM_ProxyMsgHeader proxyhdr; + proxyhdr.ph_conid = myport->conn_id; + pq_sendbytes(&buf, (char *)&proxyhdr, sizeof (GTM_ProxyMsgHeader)); + } + pq_sendint(&buf, seqkey.gsk_keylen, 4); + pq_sendbytes(&buf, seqkey.gsk_key, seqkey.gsk_keylen); + pq_endmessage(myport, &buf); + + if (myport->remote_type != GTM_NODE_GTM_PROXY) + { + /* Flush the standby first */ + if (GetMyThreadInfo->thr_conn->standby) + gtmpqFlush(GetMyThreadInfo->thr_conn->standby); + pq_flush(myport); + } - elog(LOG, "reset_sequence() returns rc %d.", rc); } /* FIXME: need to check errors */ } /* - * Process MSG_SEQUENCE_CLOSE message + * Process MSG_SEQUENCE_CLOSE/MSG_BKUP_SEQUENCE_CLOSE message + * + * is_backup indicates the message is MSG_BKUP_SEQUENCE_CLOSE */ void -ProcessSequenceCloseCommand(Port *myport, StringInfo message) +ProcessSequenceCloseCommand(Port *myport, StringInfo message, bool is_backup) { GTM_SequenceKeyData seqkey; StringInfoData buf; @@ -1386,45 +1473,61 @@ ProcessSequenceCloseCommand(Port *myport, StringInfo message) (errcode, errmsg("Can not close the sequence"))); - pq_beginmessage(&buf, 'S'); - pq_sendint(&buf, SEQUENCE_CLOSE_RESULT, 4); - if (myport->remote_type == GTM_NODE_GTM_PROXY) + if (!is_backup) { - GTM_ProxyMsgHeader proxyhdr; - proxyhdr.ph_conid = myport->conn_id; - pq_sendbytes(&buf, (char *)&proxyhdr, sizeof (GTM_ProxyMsgHeader)); - } - pq_sendint(&buf, seqkey.gsk_keylen, 4); - pq_sendbytes(&buf, seqkey.gsk_key, seqkey.gsk_keylen); - pq_endmessage(myport, &buf); + /* Backup first */ + if (GetMyThreadInfo->thr_conn->standby) + { + int rc; + GTM_Conn *oldconn = GetMyThreadInfo->thr_conn->standby; + int count = 0; - if (myport->remote_type != GTM_NODE_GTM_PROXY) - pq_flush(myport); + elog(LOG, "calling close_sequence() for standby GTM %p.", GetMyThreadInfo->thr_conn->standby); - if (GetMyThreadInfo->thr_conn->standby) - { - int rc; - GTM_Conn *oldconn = GetMyThreadInfo->thr_conn->standby; - int count = 0; + retry: + rc = bkup_close_sequence(GetMyThreadInfo->thr_conn->standby, &seqkey); - elog(LOG, "calling close_sequence() for standby GTM %p.", GetMyThreadInfo->thr_conn->standby); + if (gtm_standby_check_communication_error(&count, oldconn)) + goto retry; -retry: - rc = close_sequence(GetMyThreadInfo->thr_conn->standby, &seqkey); + /* Sync */ + if (Backup_synchronously && (myport->remote_type != GTM_NODE_GTM_PROXY)) + gtm_sync_standby(GetMyThreadInfo->thr_conn->standby); - if (gtm_standby_check_communication_error(&count, oldconn)) - goto retry; + elog(LOG, "close_sequence() returns rc %d.", rc); + } + /* Respond to the client */ + pq_beginmessage(&buf, 'S'); + pq_sendint(&buf, SEQUENCE_CLOSE_RESULT, 4); + if (myport->remote_type == GTM_NODE_GTM_PROXY) + { + GTM_ProxyMsgHeader proxyhdr; + proxyhdr.ph_conid = myport->conn_id; + pq_sendbytes(&buf, (char *)&proxyhdr, sizeof (GTM_ProxyMsgHeader)); + } + pq_sendint(&buf, seqkey.gsk_keylen, 4); + pq_sendbytes(&buf, seqkey.gsk_key, seqkey.gsk_keylen); + pq_endmessage(myport, &buf); + + if (myport->remote_type != GTM_NODE_GTM_PROXY) + { + /* Flush the standby first */ + if (GetMyThreadInfo->thr_conn->standby) + gtmpqFlush(GetMyThreadInfo->thr_conn->standby); + pq_flush(myport); + } - elog(LOG, "close_sequence() returns rc %d.", rc); + /* FIXME: need to check errors */ } - /* FIXME: need to check errors */ } /* - * Process MSG_SEQUENCE_RENAME message + * Process MSG_SEQUENCE_RENAME/MSG_BKUP_SEQUENCE_RENAME message + * + * is_backup indicates the message is MSG_BKUP_SEQUENCE_RENAME */ void -ProcessSequenceRenameCommand(Port *myport, StringInfo message) +ProcessSequenceRenameCommand(Port *myport, StringInfo message, bool is_backup) { GTM_SequenceKeyData seqkey, newseqkey; StringInfoData buf; @@ -1457,39 +1560,51 @@ ProcessSequenceRenameCommand(Port *myport, StringInfo message) pq_getmsgend(message); - /* Send a SUCCESS message back to the client */ - pq_beginmessage(&buf, 'S'); - pq_sendint(&buf, SEQUENCE_RENAME_RESULT, 4); - if (myport->remote_type == GTM_NODE_GTM_PROXY) + if (!is_backup) { - GTM_ProxyMsgHeader proxyhdr; - proxyhdr.ph_conid = myport->conn_id; - pq_sendbytes(&buf, (char *)&proxyhdr, sizeof (GTM_ProxyMsgHeader)); - } - pq_sendint(&buf, newseqkey.gsk_keylen, 4); - pq_sendbytes(&buf, newseqkey.gsk_key, newseqkey.gsk_keylen); - pq_endmessage(myport, &buf); + /* Backup first */ + if (GetMyThreadInfo->thr_conn->standby) + { + int rc; + GTM_Conn *oldconn = GetMyThreadInfo->thr_conn->standby; + int count = 0; - if (myport->remote_type != GTM_NODE_GTM_PROXY) - pq_flush(myport); + elog(LOG, "calling rename_sequence() for standby GTM %p.", GetMyThreadInfo->thr_conn->standby); - if (GetMyThreadInfo->thr_conn->standby) - { - int rc; - GTM_Conn *oldconn = GetMyThreadInfo->thr_conn->standby; - int count = 0; + retry: + rc = bkup_rename_sequence(GetMyThreadInfo->thr_conn->standby, &seqkey, &newseqkey); - elog(LOG, "calling rename_sequence() for standby GTM %p.", GetMyThreadInfo->thr_conn->standby); + if (gtm_standby_check_communication_error(&count, oldconn)) + goto retry; -retry: - rc = rename_sequence(GetMyThreadInfo->thr_conn->standby, &seqkey, &newseqkey); + /* Sync */ + if (Backup_synchronously && (myport->remote_type != GTM_NODE_GTM_PROXY)) + gtm_sync_standby(GetMyThreadInfo->thr_conn->standby); - if (gtm_standby_check_communication_error(&count, oldconn)) - goto retry; + elog(LOG, "rename_sequence() returns rc %d.", rc); + } + /* Send a SUCCESS message back to the client */ + pq_beginmessage(&buf, 'S'); + pq_sendint(&buf, SEQUENCE_RENAME_RESULT, 4); + if (myport->remote_type == GTM_NODE_GTM_PROXY) + { + GTM_ProxyMsgHeader proxyhdr; + proxyhdr.ph_conid = myport->conn_id; + pq_sendbytes(&buf, (char *)&proxyhdr, sizeof (GTM_ProxyMsgHeader)); + } + pq_sendint(&buf, newseqkey.gsk_keylen, 4); + pq_sendbytes(&buf, newseqkey.gsk_key, newseqkey.gsk_keylen); + pq_endmessage(myport, &buf); - elog(LOG, "rename_sequence() returns rc %d.", rc); - } + if (myport->remote_type != GTM_NODE_GTM_PROXY) + { + /* Flush the standby first */ + if (GetMyThreadInfo->thr_conn->standby) + gtmpqFlush(GetMyThreadInfo->thr_conn->standby); + pq_flush(myport); + } + } /* FIXME: need to check errors */ } diff --git a/src/gtm/main/gtm_snap.c b/src/gtm/main/gtm_snap.c index 5d94479933..c5db23d645 100644 --- a/src/gtm/main/gtm_snap.c +++ b/src/gtm/main/gtm_snap.c @@ -443,6 +443,10 @@ ProcessGetSnapshotCommandMulti(Port *myport, StringInfo message) if (myport->remote_type != GTM_NODE_GTM_PROXY) pq_flush(myport); +#if 0 + /* Do not need this portion because this command does not change + * internal status. + */ if (GetMyThreadInfo->thr_conn->standby) { int _rc; @@ -468,6 +472,7 @@ retry: elog(LOG, "snapshot_get_multi() rc=%d done.", _rc); } +#endif return; } diff --git a/src/gtm/main/gtm_standby.c b/src/gtm/main/gtm_standby.c index f065d2ec19..b5a08deb55 100644 --- a/src/gtm/main/gtm_standby.c +++ b/src/gtm/main/gtm_standby.c @@ -261,7 +261,7 @@ gtm_standby_register_self(const char *node_name, int port, const char *datadir) standbyDataDir= (char *)datadir; rc = node_register_internal(GTM_ActiveConn, GTM_NODE_GTM, standbyHostName, standbyPortNumber, - standbyNodeName, standbyDataDir, NODE_DISCONNECTED); + standbyNodeName, standbyDataDir, NODE_DISCONNECTED); if (rc < 0) { elog(LOG, "Failed to register a standby-GTM status."); @@ -293,7 +293,7 @@ gtm_standby_activate_self(void) } rc = node_register_internal(GTM_ActiveConn, GTM_NODE_GTM, standbyHostName, standbyPortNumber, - standbyNodeName, standbyDataDir, NODE_CONNECTED); + standbyNodeName, standbyDataDir, NODE_CONNECTED); if (rc < 0) { @@ -460,7 +460,13 @@ gtm_standby_reconnect_to_standby(GTM_Conn *old_conn, int retry_max) bool gtm_standby_check_communication_error(int *retry_count, GTM_Conn *oldconn) { - if (GetMyThreadInfo->thr_conn->standby->result->gr_status == GTM_RESULT_COMM_ERROR) + GTM_ThreadInfo *my_threadInfo = GetMyThreadInfo; + + /* + * This function may be called without result from standby. + */ + if (my_threadInfo->thr_conn->standby->result + && my_threadInfo->thr_conn->standby->result->gr_status == GTM_RESULT_COMM_ERROR) { if (*retry_count == 0) { diff --git a/src/gtm/main/gtm_txn.c b/src/gtm/main/gtm_txn.c index a5cd72b3ab..1e1a674fc9 100644 --- a/src/gtm/main/gtm_txn.c +++ b/src/gtm/main/gtm_txn.c @@ -28,10 +28,17 @@ #include "gtm/libpq-int.h" #include "gtm/pqformat.h" +extern bool Backup_synchronously; + /* Local functions */ static XidStatus GlobalTransactionIdGetStatus(GlobalTransactionId transactionId); static bool GTM_SetDoVacuum(GTM_TransactionHandle handle); - +static void init_GTM_TransactionInfo(GTM_TransactionInfo *gtm_txninfo, + char *coord_name, + GTM_TransactionHandle txn, + GTM_IsolationLevel isolevel, + GTMProxy_ConnID connid, + bool readonly); GTM_Transactions GTMTransactions; void @@ -676,23 +683,8 @@ GTM_BeginTransactionMulti(char *coord_name, } } + init_GTM_TransactionInfo(gtm_txninfo[kk], coord_name, ii, isolevel[kk], connid[kk], readonly[kk]); - gtm_txninfo[kk]->gti_gxid = InvalidGlobalTransactionId; - gtm_txninfo[kk]->gti_xmin = InvalidGlobalTransactionId; - gtm_txninfo[kk]->gti_state = GTM_TXN_STARTING; - gtm_txninfo[kk]->gti_coordname = pstrdup(coord_name); - - gtm_txninfo[kk]->gti_isolevel = isolevel[kk]; - gtm_txninfo[kk]->gti_readonly = readonly[kk]; - gtm_txninfo[kk]->gti_backend_id = connid[kk]; - gtm_txninfo[kk]->gti_in_use = true; - - gtm_txninfo[kk]->nodestring = NULL; - gtm_txninfo[kk]->gti_gid = NULL; - - gtm_txninfo[kk]->gti_handle = ii; - gtm_txninfo[kk]->gti_vacuum = false; - gtm_txninfo[kk]->gti_thread_id = pthread_self(); GTMTransactions.gt_lastslot = ii; txns[kk] = ii; @@ -726,6 +718,79 @@ GTM_BeginTransaction(char *coord_name, return txn; } +static void +init_GTM_TransactionInfo(GTM_TransactionInfo *gtm_txninfo, + char *coord_name, + GTM_TransactionHandle txn, + GTM_IsolationLevel isolevel, + GTMProxy_ConnID connid, + bool readonly) +{ + gtm_txninfo->gti_gxid = InvalidGlobalTransactionId; + gtm_txninfo->gti_xmin = InvalidGlobalTransactionId; + gtm_txninfo->gti_state = GTM_TXN_STARTING; + gtm_txninfo->gti_coordname = pstrdup(coord_name); + + gtm_txninfo->gti_isolevel = isolevel; + gtm_txninfo->gti_readonly = readonly; + gtm_txninfo->gti_backend_id = connid; + gtm_txninfo->gti_in_use = true; + + gtm_txninfo->nodestring = NULL; + gtm_txninfo->gti_gid = NULL; + + gtm_txninfo->gti_handle = txn; + gtm_txninfo->gti_vacuum = false; + gtm_txninfo->gti_thread_id = pthread_self(); +} + + +void +GTM_BkupBeginTransactionMulti(char *coord_name, + GTM_TransactionHandle *txn, + GTM_IsolationLevel *isolevel, + bool *readonly, + GTMProxy_ConnID *connid, + int txn_count) +{ + GTM_TransactionInfo *gtm_txninfo; + MemoryContext oldContext; + int kk; + + gtm_txninfo = NULL; + + oldContext = MemoryContextSwitchTo(TopMostMemoryContext); + GTM_RWLockAcquire(>MTransactions.gt_TransArrayLock, GTM_LOCKMODE_WRITE); + + for (kk = 0; kk < txn_count; kk++) + { + gtm_txninfo = >MTransactions.gt_transactions_array[txn[kk]]; + if (gtm_txninfo->gti_in_use) + { + GTM_RWLockRelease(>MTransactions.gt_TransArrayLock); + elog(ERROR, "GTM_TransactionInfo already in use. Cannot assign the transaction: handle (%d).", + txn[kk]); + return; + } + init_GTM_TransactionInfo(gtm_txninfo, coord_name, txn[kk], isolevel[kk], connid[kk], readonly[kk]); + GTMTransactions.gt_lastslot = txn[kk]; + GTMTransactions.gt_open_transactions = gtm_lappend(GTMTransactions.gt_open_transactions, gtm_txninfo); + } + + GTM_RWLockRelease(>MTransactions.gt_TransArrayLock); + MemoryContextSwitchTo(oldContext); +} + +void +GTM_BkupBeginTransaction(char *coord_name, + GTM_TransactionHandle txn, + GTM_IsolationLevel isolevel, + bool readonly) +{ + GTMProxy_ConnID connid = -1; + + GTM_BkupBeginTransactionMulti(coord_name, &txn, &isolevel, &readonly, &connid, 1); +} /* * Same as GTM_RollbackTransaction, but takes GXID as input */ @@ -996,6 +1061,15 @@ ProcessBeginTransactionCommand(Port *myport, StringInfo message) MemoryContextSwitchTo(oldContext); + /* Backup first */ + if (GetMyThreadInfo->thr_conn->standby) + { + bkup_begin_transaction(GetMyThreadInfo->thr_conn->standby, txn, txn_isolation_level, txn_read_only, timestamp); + /* Synch. with standby */ + if (Backup_synchronously && (myport->remote_type != GTM_NODE_GTM_PROXY)) + gtm_sync_standby(GetMyThreadInfo->thr_conn->standby); + } + /* GXID has been received, now it's time to get a GTM timestamp */ timestamp = GTM_TimestampGetCurrent(); @@ -1012,11 +1086,41 @@ ProcessBeginTransactionCommand(Port *myport, StringInfo message) pq_endmessage(myport, &buf); if (myport->remote_type != GTM_NODE_GTM_PROXY) + { + /* Flush standby first */ + if (GetMyThreadInfo->thr_conn->standby) + gtmpqFlush(GetMyThreadInfo->thr_conn->standby); pq_flush(myport); + } return; } /* + * Process MSG_BKUP_TXN_BEGIN message + */ +void +ProcessBkupBeginTransactionCommand(Port *myport, StringInfo message) +{ + GTM_TransactionHandle txn; + GTM_IsolationLevel txn_isolation_level; + bool txn_read_only; + GTM_Timestamp timestamp; + MemoryContext oldContext; + + txn = pq_getmsgint(message, sizeof(GTM_TransactionHandle)); + txn_isolation_level = pq_getmsgint(message, sizeof(GTM_IsolationLevel)); + txn_read_only = pq_getmsgbyte(message); + memcpy(×tamp, pq_getmsgbytes(message, sizeof(GTM_Timestamp)), sizeof(GTM_Timestamp)); + pq_getmsgend(message); + + oldContext = MemoryContextSwitchTo(TopMemoryContext); + + GTM_BkupBeginTransaction("", txn, txn_isolation_level, txn_read_only); + + MemoryContextSwitchTo(oldContext); +} + +/* * Process MSG_TXN_BEGIN_GETGXID message */ void @@ -1059,6 +1163,27 @@ ProcessBeginTransactionGetGXIDCommand(Port *myport, StringInfo message) elog(LOG, "Sending transaction id %u", gxid); + /* Backup first */ + if (GetMyThreadInfo->thr_conn->standby) + { + GTM_Conn *oldconn = GetMyThreadInfo->thr_conn->standby; + int count = 0; + + elog(LOG, "calling begin_transaction() for standby GTM %p.", GetMyThreadInfo->thr_conn->standby); + +retry: + bkup_begin_transaction_gxid(GetMyThreadInfo->thr_conn->standby, + txn, gxid, txn_isolation_level, txn_read_only, timestamp); + + if (gtm_standby_check_communication_error(&count, oldconn)) + goto retry; + + /* Sync */ + if (Backup_synchronously && (myport->remote_type != GTM_NODE_GTM_PROXY)) + gtm_sync_standby(GetMyThreadInfo->thr_conn->standby); + + } + /* Respond to the client */ pq_beginmessage(&buf, 'S'); pq_sendint(&buf, TXN_BEGIN_GETGXID_RESULT, 4); if (myport->remote_type == GTM_NODE_GTM_PROXY) @@ -1072,27 +1197,112 @@ ProcessBeginTransactionGetGXIDCommand(Port *myport, StringInfo message) pq_endmessage(myport, &buf); if (myport->remote_type != GTM_NODE_GTM_PROXY) + { + /* Flush standby */ + if (GetMyThreadInfo->thr_conn->standby) + gtmpqFlush(GetMyThreadInfo->thr_conn->standby); pq_flush(myport); + } - if (GetMyThreadInfo->thr_conn->standby) - { - GTM_Timestamp _ts; - GlobalTransactionId _gxid; - GTM_Conn *oldconn = GetMyThreadInfo->thr_conn->standby; - int count = 0; - elog(LOG, "calling begin_transaction() for standby GTM %p.", GetMyThreadInfo->thr_conn->standby); + return; +} -retry: - _gxid = begin_transaction(GetMyThreadInfo->thr_conn->standby, txn_isolation_level, &_ts); +static void +GTM_BkupBeginTransactionGetGXIDMulti(char *coord_name, + GTM_TransactionHandle *txn, + GlobalTransactionId *gxid, + GTM_IsolationLevel *isolevel, + bool *readonly, + GTMProxy_ConnID *connid, + int txn_count) +{ + GTM_TransactionInfo *gtm_txninfo; + int ii; + MemoryContext oldContext; - if (gtm_standby_check_communication_error(&count, oldconn)) - goto retry; + oldContext = MemoryContextSwitchTo(TopMemoryContext); + GTM_RWLockAcquire(>MTransactions.gt_XidGenLock, GTM_LOCKMODE_WRITE); - elog(LOG, "begin_transaction() returns GXID %d.", _gxid); + for (ii = 0; ii < txn_count; ii++) + { + gtm_txninfo = >MTransactions.gt_transactions_array[txn[ii]]; + if (gtm_txninfo->gti_in_use) + { + GTM_RWLockRelease(>MTransactions.gt_TransArrayLock); + elog(ERROR, "GTM_TransactionInfo already in use. Cannot assign the transaction: handle (%d).", + txn[ii]); + return; + } + init_GTM_TransactionInfo(gtm_txninfo, coord_name, txn[ii], isolevel[ii], connid[ii], readonly[ii]); + GTMTransactions.gt_lastslot = txn[ii]; + gtm_txninfo->gti_gxid = gxid[ii]; + /* + * Advance next gxid + */ + if (GlobalTransactionIdPrecedes(GTMTransactions.gt_nextXid, gxid[ii])) + GTMTransactions.gt_nextXid = gxid[ii] + 1; + if (!GlobalTransactionIdIsValid(GTMTransactions.gt_nextXid)) /* Handle wrap around too */ + GTMTransactions.gt_nextXid = FirstNormalGlobalTransactionId; + GTMTransactions.gt_open_transactions = gtm_lappend(GTMTransactions.gt_open_transactions, gtm_txninfo); } - return; + + GTM_RWLockRelease(>MTransactions.gt_XidGenLock); + MemoryContextSwitchTo(oldContext); +} + +static void +GTM_BkupBeginTransactionGetGXID(char *coord_name, + GTM_TransactionHandle txn, + GlobalTransactionId gxid, + GTM_IsolationLevel isolevel, + bool readonly) +{ + GTMProxy_ConnID connid = -1; + + GTM_BkupBeginTransactionGetGXIDMulti(coord_name, &txn, &gxid, &isolevel, &readonly, &connid, 1); +} + +/* + * Process MSG_BKUP_TXN_BEGIN_GETGXID message + */ +void +ProcessBkupBeginTransactionGetGXIDCommand(Port *myport, StringInfo message) +{ + GTM_TransactionHandle txn; + GlobalTransactionId gxid; + GTM_IsolationLevel txn_isolation_level; + bool txn_read_only; + GTM_Timestamp timestamp; + + txn = pq_getmsgint(message, sizeof(GTM_TransactionHandle)); + gxid = pq_getmsgint(message, sizeof(GlobalTransactionId)); + txn_isolation_level = pq_getmsgint(message, sizeof(GTM_IsolationLevel)); + txn_read_only = pq_getmsgbyte(message); + memcpy(×tamp, pq_getmsgbytes(message, sizeof(GTM_Timestamp)), sizeof(GTM_Timestamp)); + pq_getmsgend(message); + + GTM_BkupBeginTransactionGetGXID("", txn, gxid, txn_isolation_level, txn_read_only); +} + +/* + * Process MSG_BKUP_TXN_BEGIN_GETGXID_AUTOVACUUM message + */ +void +ProcessBkupBeginTransactionGetGXIDAutovacuumCommand(Port *myport, StringInfo message) +{ + GTM_TransactionHandle txn; + GlobalTransactionId gxid; + GTM_IsolationLevel txn_isolation_level; + + txn = pq_getmsgint(message, sizeof(GTM_TransactionHandle)); + gxid = pq_getmsgint(message, sizeof(GlobalTransactionId)); + txn_isolation_level = pq_getmsgint(message, sizeof(GTM_IsolationLevel)); + pq_getmsgend(message); + + GTM_BkupBeginTransactionGetGXID("", txn, gxid, txn_isolation_level, false); + GTM_SetDoVacuum(txn); } /* @@ -1139,20 +1349,7 @@ ProcessBeginTransactionGetGXIDAutovacuumCommand(Port *myport, StringInfo message elog(DEBUG3, "Sending transaction id %d", gxid); - pq_beginmessage(&buf, 'S'); - pq_sendint(&buf, TXN_BEGIN_GETGXID_AUTOVACUUM_RESULT, 4); - if (myport->remote_type == GTM_NODE_GTM_PROXY) - { - GTM_ProxyMsgHeader proxyhdr; - proxyhdr.ph_conid = myport->conn_id; - pq_sendbytes(&buf, (char *)&proxyhdr, sizeof (GTM_ProxyMsgHeader)); - } - pq_sendbytes(&buf, (char *)&gxid, sizeof(gxid)); - pq_endmessage(myport, &buf); - - if (myport->remote_type != GTM_NODE_GTM_PROXY) - pq_flush(myport); - + /* Backup first */ if (GetMyThreadInfo->thr_conn->standby) { GlobalTransactionId _gxid; @@ -1160,17 +1357,40 @@ ProcessBeginTransactionGetGXIDAutovacuumCommand(Port *myport, StringInfo message int count = 0; elog(LOG, "calling begin_transaction_autovacuum() for standby GTM %p.", - GetMyThreadInfo->thr_conn->standby); + GetMyThreadInfo->thr_conn->standby); retry: - _gxid = begin_transaction_autovacuum(GetMyThreadInfo->thr_conn->standby, - txn_isolation_level); + _gxid = bkup_begin_transaction_autovacuum(GetMyThreadInfo->thr_conn->standby, + txn, gxid, txn_isolation_level); if (gtm_standby_check_communication_error(&count, oldconn)) goto retry; + /* Sync */ + if (Backup_synchronously && (myport->remote_type != GTM_NODE_GTM_PROXY)) + gtm_sync_standby(GetMyThreadInfo->thr_conn->standby); + elog(LOG, "begin_transaction_autovacuum() GXID=%d done.", _gxid); } + /* Respond to the client */ + pq_beginmessage(&buf, 'S'); + pq_sendint(&buf, TXN_BEGIN_GETGXID_AUTOVACUUM_RESULT, 4); + if (myport->remote_type == GTM_NODE_GTM_PROXY) + { + GTM_ProxyMsgHeader proxyhdr; + proxyhdr.ph_conid = myport->conn_id; + pq_sendbytes(&buf, (char *)&proxyhdr, sizeof (GTM_ProxyMsgHeader)); + } + pq_sendbytes(&buf, (char *)&gxid, sizeof(gxid)); + pq_endmessage(myport, &buf); + + if (myport->remote_type != GTM_NODE_GTM_PROXY) + { + /* Flush standby */ + if (GetMyThreadInfo->thr_conn->standby) + gtmpqFlush(GetMyThreadInfo->thr_conn->standby); + pq_flush(myport); + } return; } @@ -1186,7 +1406,7 @@ ProcessBeginTransactionGetGXIDCommandMulti(Port *myport, StringInfo message) int txn_count; StringInfoData buf; GTM_TransactionHandle txn[GTM_MAX_GLOBAL_TRANSACTIONS]; - GlobalTransactionId gxid, end_gxid; + GlobalTransactionId start_gxid, end_gxid; GTM_Timestamp timestamp; GTMProxy_ConnID txn_connid[GTM_MAX_GLOBAL_TRANSACTIONS]; MemoryContext oldContext; @@ -1219,8 +1439,8 @@ ProcessBeginTransactionGetGXIDCommandMulti(Port *myport, StringInfo message) (EINVAL, errmsg("Failed to start %d new transactions", txn_count))); - gxid = GTM_GetGlobalTransactionIdMulti(txn, txn_count); - if (gxid == InvalidGlobalTransactionId) + start_gxid = GTM_GetGlobalTransactionIdMulti(txn, txn_count); + if (start_gxid == InvalidGlobalTransactionId) ereport(ERROR, (EINVAL, errmsg("Failed to get a new transaction id"))); @@ -1230,34 +1450,16 @@ ProcessBeginTransactionGetGXIDCommandMulti(Port *myport, StringInfo message) /* GXID has been received, now it's time to get a GTM timestamp */ timestamp = GTM_TimestampGetCurrent(); - end_gxid = gxid + txn_count; - if (end_gxid < gxid) + end_gxid = start_gxid + txn_count; + if (end_gxid < start_gxid) end_gxid += FirstNormalGlobalTransactionId; - elog(LOG, "Sending transaction ids from %u to %u", gxid, end_gxid); - - pq_beginmessage(&buf, 'S'); - pq_sendint(&buf, TXN_BEGIN_GETGXID_MULTI_RESULT, 4); - if (myport->remote_type == GTM_NODE_GTM_PROXY) - { - GTM_ProxyMsgHeader proxyhdr; - proxyhdr.ph_conid = myport->conn_id; - pq_sendbytes(&buf, (char *)&proxyhdr, sizeof (GTM_ProxyMsgHeader)); - } - pq_sendbytes(&buf, (char *)&txn_count, sizeof(txn_count)); - pq_sendbytes(&buf, (char *)&gxid, sizeof(gxid)); - pq_sendbytes(&buf, (char *)&(timestamp), sizeof (GTM_Timestamp)); - pq_endmessage(myport, &buf); - - if (myport->remote_type != GTM_NODE_GTM_PROXY) - pq_flush(myport); + elog(LOG, "Sending transaction ids from %u to %u", start_gxid, end_gxid); + /* Backup first */ if (GetMyThreadInfo->thr_conn->standby) { int _rc; - int txn_count_out; - GlobalTransactionId gxid_out; - GTM_Timestamp ts_out; GTM_Conn *oldconn = GetMyThreadInfo->thr_conn->standby; int count = 0; @@ -1265,29 +1467,85 @@ ProcessBeginTransactionGetGXIDCommandMulti(Port *myport, StringInfo message) GetMyThreadInfo->thr_conn->standby); retry: - _rc = begin_transaction_multi(GetMyThreadInfo->thr_conn->standby, - txn_count, - txn_isolation_level, - txn_read_only, - txn_connid, - &txn_count_out, - &gxid_out, - &ts_out); + _rc = bkup_begin_transaction_multi(GetMyThreadInfo->thr_conn->standby, + txn_count, + txn, + start_gxid, + txn_isolation_level, + txn_read_only, + txn_connid); if (gtm_standby_check_communication_error(&count, oldconn)) goto retry; + /* Sync */ + if (Backup_synchronously && (myport->remote_type != GTM_NODE_GTM_PROXY)) + gtm_sync_standby(GetMyThreadInfo->thr_conn->standby); + elog(LOG, "begin_transaction_multi() rc=%d done.", _rc); } + /* Respond to the client */ + pq_beginmessage(&buf, 'S'); + pq_sendint(&buf, TXN_BEGIN_GETGXID_MULTI_RESULT, 4); + if (myport->remote_type == GTM_NODE_GTM_PROXY) + { + GTM_ProxyMsgHeader proxyhdr; + proxyhdr.ph_conid = myport->conn_id; + pq_sendbytes(&buf, (char *)&proxyhdr, sizeof (GTM_ProxyMsgHeader)); + } + pq_sendbytes(&buf, (char *)&txn_count, sizeof(txn_count)); + pq_sendbytes(&buf, (char *)&start_gxid, sizeof(start_gxid)); + pq_sendbytes(&buf, (char *)&(timestamp), sizeof (GTM_Timestamp)); + pq_endmessage(myport, &buf); + + if (myport->remote_type != GTM_NODE_GTM_PROXY) + { + /* Flush standby */ + if (GetMyThreadInfo->thr_conn->standby) + gtmpqFlush(GetMyThreadInfo->thr_conn->standby); + pq_flush(myport); + } return; } /* - * Process MSG_TXN_COMMIT message + * Process MSG_BKUP_BEGIN_TXN_GETGXID_MULTI message + */ +void +ProcessBkupBeginTransactionGetGXIDCommandMulti(Port *myport, StringInfo message) +{ + int txn_count; + GTM_TransactionHandle txn[GTM_MAX_GLOBAL_TRANSACTIONS]; + GlobalTransactionId gxid[GTM_MAX_GLOBAL_TRANSACTIONS]; + GTM_IsolationLevel txn_isolation_level[GTM_MAX_GLOBAL_TRANSACTIONS]; + bool txn_read_only[GTM_MAX_GLOBAL_TRANSACTIONS]; + GTMProxy_ConnID txn_connid[GTM_MAX_GLOBAL_TRANSACTIONS]; + int ii; + + txn_count = pq_getmsgint(message, sizeof(int)); + if (txn_count <= 0) + elog(PANIC, "Zero or less transaction count."); + + for (ii = 0; ii < txn_count; ii++) + { + txn[ii] = pq_getmsgint(message, sizeof(GTM_TransactionHandle)); + gxid[ii] = pq_getmsgint(message, sizeof(GlobalTransactionId)); + txn_isolation_level[ii] = pq_getmsgint(message, sizeof(GTM_IsolationLevel)); + txn_read_only[ii] = pq_getmsgbyte(message); + txn_connid[ii] = pq_getmsgint(message, sizeof(GTMProxy_ConnID)); + } + + GTM_BkupBeginTransactionGetGXIDMulti("", txn, gxid, txn_isolation_level, txn_read_only, txn_connid, txn_count); + +} +/* + * Process MSG_TXN_COMMIT/MSG_BKUP_TXN_COMMIT message + * + * is_backup indicates the message is MSG_BKUP_TXN_COMMIT */ void -ProcessCommitTransactionCommand(Port *myport, StringInfo message) +ProcessCommitTransactionCommand(Port *myport, StringInfo message, bool is_backup) { StringInfoData buf; GTM_TransactionHandle txn; @@ -1331,48 +1589,65 @@ ProcessCommitTransactionCommand(Port *myport, StringInfo message) MemoryContextSwitchTo(oldContext); - pq_beginmessage(&buf, 'S'); - pq_sendint(&buf, TXN_COMMIT_RESULT, 4); - if (myport->remote_type == GTM_NODE_GTM_PROXY) + if(!is_backup) { - GTM_ProxyMsgHeader proxyhdr; - proxyhdr.ph_conid = myport->conn_id; - pq_sendbytes(&buf, (char *)&proxyhdr, sizeof (GTM_ProxyMsgHeader)); - } - pq_sendbytes(&buf, (char *)&gxid, sizeof(gxid)); - pq_sendint(&buf, status, sizeof(status)); - pq_endmessage(myport, &buf); + if (GetMyThreadInfo->thr_conn->standby) + { + /* + * Backup first + */ + int _rc; + GTM_Conn *oldconn = GetMyThreadInfo->thr_conn->standby; + int count = 0; - if (myport->remote_type != GTM_NODE_GTM_PROXY) - pq_flush(myport); + elog(LOG, "calling commit_transaction() for standby GTM %p.", GetMyThreadInfo->thr_conn->standby); - if (GetMyThreadInfo->thr_conn->standby) - { - int _rc; - GTM_Conn *oldconn = GetMyThreadInfo->thr_conn->standby; - int count = 0; + retry: + _rc = bkup_commit_transaction(GetMyThreadInfo->thr_conn->standby, gxid); - elog(LOG, "calling commit_transaction() for standby GTM %p.", GetMyThreadInfo->thr_conn->standby); + if (gtm_standby_check_communication_error(&count, oldconn)) + goto retry; -retry: - _rc = commit_transaction(GetMyThreadInfo->thr_conn->standby, gxid); + /* Sync */ + if (Backup_synchronously && (myport->remote_type != GTM_NODE_GTM_PROXY)) + gtm_sync_standby(GetMyThreadInfo->thr_conn->standby); - if (gtm_standby_check_communication_error(&count, oldconn)) - goto retry; + elog(LOG, "commit_transaction() rc=%d done.", _rc); + } - elog(LOG, "commit_transaction() rc=%d done.", _rc); - } + pq_beginmessage(&buf, 'S'); + pq_sendint(&buf, TXN_COMMIT_RESULT, 4); + if (myport->remote_type == GTM_NODE_GTM_PROXY) + { + GTM_ProxyMsgHeader proxyhdr; + proxyhdr.ph_conid = myport->conn_id; + pq_sendbytes(&buf, (char *)&proxyhdr, sizeof (GTM_ProxyMsgHeader)); + } + pq_sendbytes(&buf, (char *)&gxid, sizeof(gxid)); + pq_sendint(&buf, status, sizeof(status)); + pq_endmessage(myport, &buf); + + if (myport->remote_type != GTM_NODE_GTM_PROXY) + { + /* Flush standby */ + if (GetMyThreadInfo->thr_conn->standby) + gtmpqFlush(GetMyThreadInfo->thr_conn->standby); + pq_flush(myport); + } + } return; } /* - * Process MSG_TXN_COMMIT_PREPARED_MSG + * Process MSG_TXN_COMMIT_PREPARED/MSG_BKUP_TXN_COMMIT_PREPARED message * Commit a prepared transaction * Here the GXID used for PREPARE and COMMIT PREPARED are both committed + * + * is_backup indicates the message is MSG_BKUP_TXN_COMMIT_PREPARED */ void -ProcessCommitPreparedTransactionCommand(Port *myport, StringInfo message) +ProcessCommitPreparedTransactionCommand(Port *myport, StringInfo message, bool is_backup) { StringInfoData buf; int txn_count = 2; /* PREPARE and COMMIT PREPARED gxid's */ @@ -1422,40 +1697,53 @@ ProcessCommitPreparedTransactionCommand(Port *myport, StringInfo message) MemoryContextSwitchTo(oldContext); - pq_beginmessage(&buf, 'S'); - pq_sendint(&buf, TXN_COMMIT_PREPARED_RESULT, 4); - if (myport->remote_type == GTM_NODE_GTM_PROXY) + if (!is_backup) { - GTM_ProxyMsgHeader proxyhdr; - proxyhdr.ph_conid = myport->conn_id; - pq_sendbytes(&buf, (char *)&proxyhdr, sizeof (GTM_ProxyMsgHeader)); - } - pq_sendbytes(&buf, (char *)&gxid[0], sizeof(GlobalTransactionId)); - pq_sendint(&buf, status[0], 4); - pq_endmessage(myport, &buf); + if (GetMyThreadInfo->thr_conn->standby) + { + /* Backup first */ + int _rc; + GTM_Conn *oldconn = GetMyThreadInfo->thr_conn->standby; + int count = 0; - if (myport->remote_type != GTM_NODE_GTM_PROXY) - pq_flush(myport); + elog(LOG, "calling commit_prepared_transaction() for standby GTM %p.", + GetMyThreadInfo->thr_conn->standby); - if (GetMyThreadInfo->thr_conn->standby) - { - int _rc; - GTM_Conn *oldconn = GetMyThreadInfo->thr_conn->standby; - int count = 0; + retry: + _rc = bkup_commit_prepared_transaction(GetMyThreadInfo->thr_conn->standby, + gxid[0], gxid[1] /* prepared GXID */); - elog(LOG, "calling commit_prepared_transaction() for standby GTM %p.", - GetMyThreadInfo->thr_conn->standby); + if (gtm_standby_check_communication_error(&count, oldconn)) + goto retry; -retry: - _rc = commit_prepared_transaction(GetMyThreadInfo->thr_conn->standby, - gxid[0], gxid[1] /* prepared GXID */); + /* Sync */ + if (Backup_synchronously && (myport->remote_type != GTM_NODE_GTM_PROXY)) + gtm_sync_standby(GetMyThreadInfo->thr_conn->standby); - if (gtm_standby_check_communication_error(&count, oldconn)) - goto retry; + elog(LOG, "commit_prepared_transaction() rc=%d done.", _rc); + } + /* Respond to the client */ + pq_beginmessage(&buf, 'S'); + pq_sendint(&buf, TXN_COMMIT_PREPARED_RESULT, 4); + if (myport->remote_type == GTM_NODE_GTM_PROXY) + { + GTM_ProxyMsgHeader proxyhdr; + proxyhdr.ph_conid = myport->conn_id; + pq_sendbytes(&buf, (char *)&proxyhdr, sizeof (GTM_ProxyMsgHeader)); + } + pq_sendbytes(&buf, (char *)&gxid[0], sizeof(GlobalTransactionId)); + pq_sendint(&buf, status[0], 4); + pq_endmessage(myport, &buf); - elog(LOG, "commit_prepared_transaction() rc=%d done.", _rc); - } + if (myport->remote_type != GTM_NODE_GTM_PROXY) + { + /* Flush standby */ + if (GetMyThreadInfo->thr_conn->standby) + gtmpqFlush(GetMyThreadInfo->thr_conn->standby); + pq_flush(myport); + } + } return; } @@ -1549,9 +1837,12 @@ ProcessGetGIDDataTransactionCommand(Port *myport, StringInfo message) /* End of message */ pq_endmessage(myport, &buf); + /* No backup to the standby because this does not change internal status */ if (myport->remote_type != GTM_NODE_GTM_PROXY) pq_flush(myport); + /* I don't think the following backup is needed. K.Suzuki, 27th, Dec., 2011 */ +#if 0 if (GetMyThreadInfo->thr_conn->standby) { int _rc; @@ -1574,6 +1865,7 @@ retry: elog(LOG, "get_gid_data() rc=%d done.", _rc); } +#endif return; } @@ -1630,6 +1922,7 @@ ProcessGXIDListCommand(Port *myport, StringInfo message) pq_sendbytes(&buf, data, actlen); /* serialized GTM_Transactions */ pq_endmessage(myport, &buf); + /* No backup to the standby because this does not change internal state */ if (myport->remote_type != GTM_NODE_GTM_PROXY) { pq_flush(myport); @@ -1644,10 +1937,12 @@ ProcessGXIDListCommand(Port *myport, StringInfo message) /* - * Process MSG_TXN_ROLLBACK message + * Process MSG_TXN_ROLLBACK/MSG_BKUP_TXN_ROLLBACK message + * + * is_backup indicates the message is MSG_BKUP_TXN_ROLLBACK */ void -ProcessRollbackTransactionCommand(Port *myport, StringInfo message) +ProcessRollbackTransactionCommand(Port *myport, StringInfo message, bool is_backup) { StringInfoData buf; GTM_TransactionHandle txn; @@ -1691,46 +1986,61 @@ ProcessRollbackTransactionCommand(Port *myport, StringInfo message) MemoryContextSwitchTo(oldContext); - pq_beginmessage(&buf, 'S'); - pq_sendint(&buf, TXN_ROLLBACK_RESULT, 4); - if (myport->remote_type == GTM_NODE_GTM_PROXY) + if (!is_backup) { - GTM_ProxyMsgHeader proxyhdr; - proxyhdr.ph_conid = myport->conn_id; - pq_sendbytes(&buf, (char *)&proxyhdr, sizeof (GTM_ProxyMsgHeader)); - } - pq_sendbytes(&buf, (char *)&gxid, sizeof(gxid)); - pq_sendint(&buf, status, sizeof(status)); - pq_endmessage(myport, &buf); + /* Backup first */ + if (GetMyThreadInfo->thr_conn->standby) + { + GTM_Conn *oldconn = GetMyThreadInfo->thr_conn->standby; + int count = 0; - if (myport->remote_type != GTM_NODE_GTM_PROXY) - pq_flush(myport); + elog(LOG, "calling abort_transaction() for standby GTM %p.", GetMyThreadInfo->thr_conn->standby); - if (GetMyThreadInfo->thr_conn->standby) - { - GTM_Conn *oldconn = GetMyThreadInfo->thr_conn->standby; - int count = 0; + retry: + bkup_abort_transaction(GetMyThreadInfo->thr_conn->standby, gxid); - elog(LOG, "calling abort_transaction() for standby GTM %p.", GetMyThreadInfo->thr_conn->standby); + if (gtm_standby_check_communication_error(&count, oldconn)) + goto retry; -retry: - abort_transaction(GetMyThreadInfo->thr_conn->standby, gxid); + /* Sync */ + if (Backup_synchronously && (myport->remote_type != GTM_NODE_GTM_PROXY)) + gtm_sync_standby(GetMyThreadInfo->thr_conn->standby); + + elog(LOG, "abort_transaction() GXID=%d done.", gxid); + } + /* Respond to the client */ + pq_beginmessage(&buf, 'S'); + pq_sendint(&buf, TXN_ROLLBACK_RESULT, 4); + if (myport->remote_type == GTM_NODE_GTM_PROXY) + { + GTM_ProxyMsgHeader proxyhdr; + proxyhdr.ph_conid = myport->conn_id; + pq_sendbytes(&buf, (char *)&proxyhdr, sizeof (GTM_ProxyMsgHeader)); + } + pq_sendbytes(&buf, (char *)&gxid, sizeof(gxid)); + pq_sendint(&buf, status, sizeof(status)); + pq_endmessage(myport, &buf); - if (gtm_standby_check_communication_error(&count, oldconn)) - goto retry; + if (myport->remote_type != GTM_NODE_GTM_PROXY) + { + /* Flush standby first */ + if (GetMyThreadInfo->thr_conn->standby) + gtmpqFlush(GetMyThreadInfo->thr_conn->standby); + pq_flush(myport); + } - elog(LOG, "abort_transaction() GXID=%d done.", gxid); } - return; } /* - * Process MSG_TXN_COMMIT_MULTI message + * Process MSG_TXN_COMMIT_MULTI/MSG_BKUP_TXN_COMMIT_MULTI message + * + * is_backup indicates the message is MSG_BKUP_TXN_COMMIT_MULTI */ void -ProcessCommitTransactionCommandMulti(Port *myport, StringInfo message) +ProcessCommitTransactionCommandMulti(Port *myport, StringInfo message, bool is_backup) { StringInfoData buf; GTM_TransactionHandle txn[GTM_MAX_GLOBAL_TRANSACTIONS]; @@ -1780,50 +2090,60 @@ ProcessCommitTransactionCommandMulti(Port *myport, StringInfo message) MemoryContextSwitchTo(oldContext); - pq_beginmessage(&buf, 'S'); - pq_sendint(&buf, TXN_COMMIT_MULTI_RESULT, 4); - if (myport->remote_type == GTM_NODE_GTM_PROXY) - { - GTM_ProxyMsgHeader proxyhdr; - proxyhdr.ph_conid = myport->conn_id; - pq_sendbytes(&buf, (char *)&proxyhdr, sizeof (GTM_ProxyMsgHeader)); - } - pq_sendbytes(&buf, (char *)&txn_count, sizeof(txn_count)); - pq_sendbytes(&buf, (char *)status, sizeof(int) * txn_count); - pq_endmessage(myport, &buf); - - if (myport->remote_type != GTM_NODE_GTM_PROXY) - pq_flush(myport); - - if (GetMyThreadInfo->thr_conn->standby) + if (!is_backup) { - int _rc; - int txn_count_out; - int status_out[GTM_MAX_GLOBAL_TRANSACTIONS]; - GTM_Conn *oldconn = GetMyThreadInfo->thr_conn->standby; - int count = 0; + if (GetMyThreadInfo->thr_conn->standby) + { + /* Backup first */ + int _rc; + GTM_Conn *oldconn = GetMyThreadInfo->thr_conn->standby; + int count = 0; - elog(LOG, "calling commit_transaction_multi() for standby GTM %p.", - GetMyThreadInfo->thr_conn->standby); + elog(LOG, "calling commit_transaction_multi() for standby GTM %p.", + GetMyThreadInfo->thr_conn->standby); -retry: - _rc = commit_transaction_multi(GetMyThreadInfo->thr_conn->standby, - txn_count, gxid, &txn_count_out, status_out); + retry: + _rc = bkup_commit_transaction_multi(GetMyThreadInfo->thr_conn->standby, txn_count, txn); - if (gtm_standby_check_communication_error(&count, oldconn)) - goto retry; + if (gtm_standby_check_communication_error(&count, oldconn)) + goto retry; + /* Sync */ + if (Backup_synchronously && (myport->remote_type != GTM_NODE_GTM_PROXY)) + gtm_sync_standby(GetMyThreadInfo->thr_conn->standby); - elog(LOG, "commit_transaction_multi() rc=%d done.", _rc); - } + elog(LOG, "commit_transaction_multi() rc=%d done.", _rc); + } + /* Respond to the client */ + pq_beginmessage(&buf, 'S'); + pq_sendint(&buf, TXN_COMMIT_MULTI_RESULT, 4); + if (myport->remote_type == GTM_NODE_GTM_PROXY) + { + GTM_ProxyMsgHeader proxyhdr; + proxyhdr.ph_conid = myport->conn_id; + pq_sendbytes(&buf, (char *)&proxyhdr, sizeof (GTM_ProxyMsgHeader)); + } + pq_sendbytes(&buf, (char *)&txn_count, sizeof(txn_count)); + pq_sendbytes(&buf, (char *)status, sizeof(int) * txn_count); + pq_endmessage(myport, &buf); + if (myport->remote_type != GTM_NODE_GTM_PROXY) + { + /* Flush the standby */ + if (GetMyThreadInfo->thr_conn->standby) + gtmpqFlush(GetMyThreadInfo->thr_conn->standby); + pq_flush(myport); + } + } return; } /* - * Process MSG_TXN_ROLLBACK_MULTI message + * Process MSG_TXN_ROLLBACK_MULTI/MSG_BKUP_TXN_ROLLBACK_MULTI message + * + * is_backup indicates the message is MSG_BKUP_TXN_ROLLBACK_MULTI */ void -ProcessRollbackTransactionCommandMulti(Port *myport, StringInfo message) +ProcessRollbackTransactionCommandMulti(Port *myport, StringInfo message, bool is_backup) { StringInfoData buf; GTM_TransactionHandle txn[GTM_MAX_GLOBAL_TRANSACTIONS]; @@ -1873,50 +2193,62 @@ ProcessRollbackTransactionCommandMulti(Port *myport, StringInfo message) MemoryContextSwitchTo(oldContext); - pq_beginmessage(&buf, 'S'); - pq_sendint(&buf, TXN_ROLLBACK_MULTI_RESULT, 4); - if (myport->remote_type == GTM_NODE_GTM_PROXY) + if (!is_backup) { - GTM_ProxyMsgHeader proxyhdr; - proxyhdr.ph_conid = myport->conn_id; - pq_sendbytes(&buf, (char *)&proxyhdr, sizeof (GTM_ProxyMsgHeader)); - } - pq_sendbytes(&buf, (char *)&txn_count, sizeof(txn_count)); - pq_sendbytes(&buf, (char *)status, sizeof(int) * txn_count); - pq_endmessage(myport, &buf); + /* Backup first */ + if (GetMyThreadInfo->thr_conn->standby) + { + int _rc; + GTM_Conn *oldconn = GetMyThreadInfo->thr_conn->standby; + int count = 0; - if (myport->remote_type != GTM_NODE_GTM_PROXY) - pq_flush(myport); + elog(LOG, "calling abort_transaction_multi() for standby GTM %p.", + GetMyThreadInfo->thr_conn->standby); - if (GetMyThreadInfo->thr_conn->standby) - { - int _rc; - int txn_count_out; - int status_out[GTM_MAX_GLOBAL_TRANSACTIONS]; - GTM_Conn *oldconn = GetMyThreadInfo->thr_conn->standby; - int count = 0; + retry: + _rc = bkup_abort_transaction_multi(GetMyThreadInfo->thr_conn->standby, txn_count, gxid); + + if (gtm_standby_check_communication_error(&count, oldconn)) + goto retry; - elog(LOG, "calling abort_transaction_multi() for standby GTM %p.", - GetMyThreadInfo->thr_conn->standby); + /* Sync */ + if (Backup_synchronously &&(myport->remote_type != GTM_NODE_GTM_PROXY)) + gtm_sync_standby(GetMyThreadInfo->thr_conn->standby); -retry: - _rc = abort_transaction_multi(GetMyThreadInfo->thr_conn->standby, - txn_count, gxid, &txn_count_out, status_out); - - if (gtm_standby_check_communication_error(&count, oldconn)) - goto retry; + elog(LOG, "abort_transaction_multi() rc=%d done.", _rc); + } + /* Respond to the client */ + pq_beginmessage(&buf, 'S'); + pq_sendint(&buf, TXN_ROLLBACK_MULTI_RESULT, 4); + if (myport->remote_type == GTM_NODE_GTM_PROXY) + { + GTM_ProxyMsgHeader proxyhdr; + proxyhdr.ph_conid = myport->conn_id; + pq_sendbytes(&buf, (char *)&proxyhdr, sizeof (GTM_ProxyMsgHeader)); + } + pq_sendbytes(&buf, (char *)&txn_count, sizeof(txn_count)); + pq_sendbytes(&buf, (char *)status, sizeof(int) * txn_count); + pq_endmessage(myport, &buf); - elog(LOG, "abort_transaction_multi() rc=%d done.", _rc); - } + if (myport->remote_type != GTM_NODE_GTM_PROXY) + { + /* Flush the standby */ + if (GetMyThreadInfo->thr_conn->standby) + gtmpqFlush(GetMyThreadInfo->thr_conn->standby); + pq_flush(myport); + } + } return; } /* - * Process MSG_TXN_START_PREPARED message + * Process MSG_TXN_START_PREPARED/MSG_BKUP_TXN_START_PREPARED message + * + * is_backup indicates if the message is MSG_BKUP_TXN_START_PREPARED. */ void -ProcessStartPreparedTransactionCommand(Port *myport, StringInfo message) +ProcessStartPreparedTransactionCommand(Port *myport, StringInfo message, bool is_backup) { StringInfoData buf; GTM_TransactionHandle txn; @@ -1973,48 +2305,64 @@ ProcessStartPreparedTransactionCommand(Port *myport, StringInfo message) MemoryContextSwitchTo(oldContext); - pq_beginmessage(&buf, 'S'); - pq_sendint(&buf, TXN_START_PREPARED_RESULT, 4); - if (myport->remote_type == GTM_NODE_GTM_PROXY) + if (!is_backup) { - GTM_ProxyMsgHeader proxyhdr; - proxyhdr.ph_conid = myport->conn_id; - pq_sendbytes(&buf, (char *)&proxyhdr, sizeof (GTM_ProxyMsgHeader)); - } - pq_sendbytes(&buf, (char *)&gxid, sizeof(GlobalTransactionId)); - pq_endmessage(myport, &buf); + /* + * Backup first + */ + if (GetMyThreadInfo->thr_conn->standby) + { + int _rc; + GTM_Conn *oldconn = GetMyThreadInfo->thr_conn->standby; + int count = 0; - if (myport->remote_type != GTM_NODE_GTM_PROXY) - pq_flush(myport); + elog(LOG, "calling start_prepared_transaction() for standby GTM %p.", + GetMyThreadInfo->thr_conn->standby); - if (GetMyThreadInfo->thr_conn->standby) - { - int _rc; - GTM_Conn *oldconn = GetMyThreadInfo->thr_conn->standby; - int count = 0; + retry: + _rc = backup_start_prepared_transaction(GetMyThreadInfo->thr_conn->standby, + gxid, gid, + nodestring); - elog(LOG, "calling start_prepared_transaction() for standby GTM %p.", - GetMyThreadInfo->thr_conn->standby); + if (gtm_standby_check_communication_error(&count, oldconn)) + goto retry; - retry: - _rc = start_prepared_transaction(GetMyThreadInfo->thr_conn->standby, - gxid, gid, - nodestring); + /* Sync */ + if (Backup_synchronously && (myport->remote_type != GTM_NODE_GTM_PROXY)) + gtm_sync_standby(GetMyThreadInfo->thr_conn->standby); - if (gtm_standby_check_communication_error(&count, oldconn)) - goto retry; + elog(LOG, "start_prepared_transaction() rc=%d done.", _rc); + } + pq_beginmessage(&buf, 'S'); + pq_sendint(&buf, TXN_START_PREPARED_RESULT, 4); + if (myport->remote_type == GTM_NODE_GTM_PROXY) + { + GTM_ProxyMsgHeader proxyhdr; + proxyhdr.ph_conid = myport->conn_id; + pq_sendbytes(&buf, (char *)&proxyhdr, sizeof (GTM_ProxyMsgHeader)); + } + pq_sendbytes(&buf, (char *)&gxid, sizeof(GlobalTransactionId)); + pq_endmessage(myport, &buf); - elog(LOG, "start_prepared_transaction() rc=%d done.", _rc); - } + if (myport->remote_type != GTM_NODE_GTM_PROXY) + { + /* Flush the standby */ + if (GetMyThreadInfo->thr_conn->standby) + gtmpqFlush(GetMyThreadInfo->thr_conn->standby); + pq_flush(myport); + } + } return; } /* - * Process MSG_TXN_PREPARE message + * Process MSG_TXN_PREPARE/MSG_BKUP_TXN_PREPARE message + * + * is_backup indicates the message is MSG_BKUP_TXN_PREPARE */ void -ProcessPrepareTransactionCommand(Port *myport, StringInfo message) +ProcessPrepareTransactionCommand(Port *myport, StringInfo message, bool is_backup) { StringInfoData buf; GTM_TransactionHandle txn; @@ -2058,42 +2406,60 @@ ProcessPrepareTransactionCommand(Port *myport, StringInfo message) elog(LOG, "Preparing transaction id %u", gxid); - pq_beginmessage(&buf, 'S'); - pq_sendint(&buf, TXN_PREPARE_RESULT, 4); - if (myport->remote_type == GTM_NODE_GTM_PROXY) + if (!is_backup) { - GTM_ProxyMsgHeader proxyhdr; - proxyhdr.ph_conid = myport->conn_id; - pq_sendbytes(&buf, (char *)&proxyhdr, sizeof (GTM_ProxyMsgHeader)); - } - pq_sendbytes(&buf, (char *)&gxid, sizeof(gxid)); - pq_endmessage(myport, &buf); + /* Backup first */ + if (GetMyThreadInfo->thr_conn->standby) + { + GTM_Conn *oldconn = GetMyThreadInfo->thr_conn->standby; + int count = 0; - if (myport->remote_type != GTM_NODE_GTM_PROXY) - pq_flush(myport); + elog(LOG, "calling prepare_transaction() for standby GTM %p.", GetMyThreadInfo->thr_conn->standby); - if (GetMyThreadInfo->thr_conn->standby) - { - GTM_Conn *oldconn = GetMyThreadInfo->thr_conn->standby; - int count = 0; + retry: + bkup_prepare_transaction(GetMyThreadInfo->thr_conn->standby, gxid); - elog(LOG, "calling prepare_transaction() for standby GTM %p.", GetMyThreadInfo->thr_conn->standby); + if (gtm_standby_check_communication_error(&count, oldconn)) + goto retry; - retry: - prepare_transaction(GetMyThreadInfo->thr_conn->standby, gxid); + /* Sync */ + if (Backup_synchronously && (myport->remote_type != GTM_NODE_GTM_PROXY)) + gtm_sync_standby(GetMyThreadInfo->thr_conn->standby); - if (gtm_standby_check_communication_error(&count, oldconn)) - goto retry; + elog(LOG, "prepare_transaction() GXID=%d done.", gxid); + } + /* Respond to the client */ + pq_beginmessage(&buf, 'S'); + pq_sendint(&buf, TXN_PREPARE_RESULT, 4); + if (myport->remote_type == GTM_NODE_GTM_PROXY) + { + GTM_ProxyMsgHeader proxyhdr; + proxyhdr.ph_conid = myport->conn_id; + pq_sendbytes(&buf, (char *)&proxyhdr, sizeof (GTM_ProxyMsgHeader)); + } + pq_sendbytes(&buf, (char *)&gxid, sizeof(gxid)); + pq_endmessage(myport, &buf); - elog(LOG, "prepare_transaction() GXID=%d done.", gxid); + if (myport->remote_type != GTM_NODE_GTM_PROXY) + { + /* Flush the standby */ + if (GetMyThreadInfo->thr_conn->standby) + gtmpqFlush(GetMyThreadInfo->thr_conn->standby); + pq_flush(myport); + } } - return; + } /* * Process MSG_TXN_GET_GXID message + * + * Notice: we don't have corresponding functions in gtm_client.c which + * generates a command for this function. + * + * Because of this, GTM-standby extension is not included in this function. */ void ProcessGetGXIDTransactionCommand(Port *myport, StringInfo message) @@ -2150,6 +2516,8 @@ ProcessGetGXIDTransactionCommand(Port *myport, StringInfo message) /* * Process MSG_TXN_GET_NEXT_GXID message + * + * This does not need backup to the standby because no internal state changes. */ void ProcessGetNextGXIDTransactionCommand(Port *myport, StringInfo message) diff --git a/src/gtm/main/main.c b/src/gtm/main/main.c index 7062c31f6a..997f42dd5d 100644 --- a/src/gtm/main/main.c +++ b/src/gtm/main/main.c @@ -45,6 +45,7 @@ #include "gtm/gtm_seq.h" #include "gtm/gtm_msg.h" #include "gtm/gtm_opt.h" +#include "gtm/gtm_utils.h" extern int optind; extern char *optarg; @@ -62,15 +63,24 @@ int GTMPortNumber; char GTMControlFile[GTM_MAX_PATH]; char *GTMDataDir; char *NodeName; +bool Backup_synchronously = false; char *active_addr; int active_port; -int keepalives_idle; -int keepalives_interval; -int keepalives_count; +int tcp_keepalives_idle; +int tcp_keepalives_interval; +int tcp_keepalives_count; char *error_reporter; char *status_reader; bool isStartUp; +/* If this is GTM or not */ +/* + * Used to determine if given Port is in GTM or in GT_Proxy. + * If it is in GTM, we should consider to flush GTM_Conn before + * writing anything to Port. + */ +bool isGTM = true; + GTM_ThreadID TopMostThreadID; /* The socket(s) we're listening to. */ @@ -104,6 +114,7 @@ static void ChangeToDataDir(void); static void checkDataDir(void); static void DeleteLockFile(const char *filename); static void PromoteToActive(void); +static void ProcessSyncStandbyCommand(Port *myport, GTM_MessageType mtype, StringInfo message); /* * One-time initialization. It's called immediately after the main process @@ -200,7 +211,7 @@ BaseInit() static void GTM_SigleHandler(int signal) { - fprintf(stderr, "Received signal %d", signal); + fprintf(stderr, "Received signal %d\n", signal); switch (signal) { @@ -538,7 +549,7 @@ main(int argc, char *argv[]) elog(LOG, "Startup connection established with active-GTM."); } - elog(DEBUG3, "Starting GTM server at (%s:%d) -- control file %s", ListenAddresses, GTMPortNumber, GTMControlFile); + elog(LOG, "Starting GTM server at (%s:%d) -- control file %s", ListenAddresses, GTMPortNumber, GTMControlFile); /* * Read the last GXID and start from there @@ -864,6 +875,7 @@ ServerLoop(void) GTM_Conn *standby = NULL; standby = gtm_standby_connect_to_standby(); + if (GTMAddConnection(port, standby) != STATUS_OK) { @@ -1118,6 +1130,14 @@ GTM_ThreadMain(void *argp) * Flush all the outgoing data on the wire. Consume the message * type field for sanity */ + /* Sync with standby first */ + if (thrinfo->thr_conn->standby) + { + if (Backup_synchronously) + gtm_sync_standby(thrinfo->thr_conn->standby); + else + gtmpqFlush(thrinfo->thr_conn->standby); + } pq_getmsgint(&input_message, sizeof (GTM_MessageType)); pq_getmsgend(&input_message); pq_flush(thrinfo->thr_conn->con_port); @@ -1161,10 +1181,23 @@ ProcessCommand(Port *myport, StringInfo input_message) myport->conn_id = proxyhdr.ph_conid; mtype = pq_getmsgint(input_message, sizeof (GTM_MessageType)); + /* + * The next line will have some overhead. Better to be in + * compile option. + */ +#ifdef GTM_DEBUG + elog(DEBUG3, "mtype = %s (%d).", gtm_util_message_name(mtype), (int)mtype); +#endif + switch (mtype) { + case MSG_SYNC_STANDBY: + ProcessSyncStandbyCommand(myport, mtype, input_message); + break; case MSG_NODE_REGISTER: + case MSG_BKUP_NODE_REGISTER: case MSG_NODE_UNREGISTER: + case MSG_BKUP_NODE_UNREGISTER: case MSG_NODE_LIST: ProcessPGXCNodeCommand(myport, mtype, input_message); break; @@ -1176,17 +1209,29 @@ ProcessCommand(Port *myport, StringInfo input_message) case MSG_NODE_BEGIN_REPLICATION_INIT: case MSG_NODE_END_REPLICATION_INIT: case MSG_TXN_BEGIN: + case MSG_BKUP_TXN_BEGIN: case MSG_TXN_BEGIN_GETGXID: + case MSG_BKUP_TXN_BEGIN_GETGXID: case MSG_TXN_BEGIN_GETGXID_AUTOVACUUM: + case MSG_BKUP_TXN_BEGIN_GETGXID_AUTOVACUUM: case MSG_TXN_PREPARE: + case MSG_BKUP_TXN_PREPARE: case MSG_TXN_START_PREPARED: + case MSG_BKUP_TXN_START_PREPARED: case MSG_TXN_COMMIT: + case MSG_BKUP_TXN_COMMIT: case MSG_TXN_COMMIT_PREPARED: + case MSG_BKUP_TXN_COMMIT_PREPARED: case MSG_TXN_ROLLBACK: + case MSG_BKUP_TXN_ROLLBACK: case MSG_TXN_GET_GXID: + case MSG_BKUP_TXN_GET_GXID: case MSG_TXN_BEGIN_GETGXID_MULTI: + case MSG_BKUP_TXN_BEGIN_GETGXID_MULTI: case MSG_TXN_COMMIT_MULTI: + case MSG_BKUP_TXN_COMMIT_MULTI: case MSG_TXN_ROLLBACK_MULTI: + case MSG_BKUP_TXN_ROLLBACK_MULTI: case MSG_TXN_GET_GID_DATA: case MSG_TXN_GET_NEXT_GXID: case MSG_TXN_GXID_LIST: @@ -1200,14 +1245,21 @@ ProcessCommand(Port *myport, StringInfo input_message) break; case MSG_SEQUENCE_INIT: + case MSG_BKUP_SEQUENCE_INIT: case MSG_SEQUENCE_GET_CURRENT: case MSG_SEQUENCE_GET_NEXT: + case MSG_BKUP_SEQUENCE_GET_NEXT: case MSG_SEQUENCE_GET_LAST: case MSG_SEQUENCE_SET_VAL: + case MSG_BKUP_SEQUENCE_SET_VAL: case MSG_SEQUENCE_RESET: + case MSG_BKUP_SEQUENCE_RESET: case MSG_SEQUENCE_CLOSE: + case MSG_BKUP_SEQUENCE_CLOSE: case MSG_SEQUENCE_RENAME: + case MSG_BKUP_SEQUENCE_RENAME: case MSG_SEQUENCE_ALTER: + case MSG_BKUP_SEQUENCE_ALTER: case MSG_SEQUENCE_LIST: ProcessSequenceCommand(myport, mtype, input_message); break; @@ -1247,7 +1299,7 @@ GTMAddConnection(Port *port, GTM_Conn *standby) errmsg("Out of memory"))); return STATUS_ERROR; } - + elog(DEBUG3, "Started new connection"); conninfo->con_port = port; @@ -1338,17 +1390,46 @@ ReadCommand(Port *myport, StringInfo inBuf) return qtype; } +/* + * Process MSG_SYNC_STANDBY message + */ +static void +ProcessSyncStandbyCommand(Port *myport, GTM_MessageType mtype, StringInfo message) +{ + StringInfoData buf; + + pq_getmsgend(message); + + pq_beginmessage(&buf, 'S'); + pq_sendint(&buf, SYNC_STANDBY_RESULT, 4); + pq_endmessage(myport, &buf); + /* Sync standby first */ + if (GetMyThreadInfo->thr_conn->standby) + gtm_sync_standby(GetMyThreadInfo->thr_conn->standby); + pq_flush(myport); +} + + + static void ProcessPGXCNodeCommand(Port *myport, GTM_MessageType mtype, StringInfo message) { switch (mtype) { case MSG_NODE_REGISTER: - ProcessPGXCNodeRegister(myport, message); + ProcessPGXCNodeRegister(myport, message, false); + break; + + case MSG_BKUP_NODE_REGISTER: + ProcessPGXCNodeRegister(myport, message, true); break; case MSG_NODE_UNREGISTER: - ProcessPGXCNodeUnregister(myport, message); + ProcessPGXCNodeUnregister(myport, message, false); + break; + + case MSG_BKUP_NODE_UNREGISTER: + ProcessPGXCNodeUnregister(myport, message, true); break; case MSG_NODE_LIST: @@ -1379,47 +1460,96 @@ ProcessTransactionCommand(Port *myport, GTM_MessageType mtype, StringInfo messag ProcessBeginTransactionCommand(myport, message); break; + case MSG_BKUP_TXN_BEGIN: + ProcessBkupBeginTransactionCommand(myport, message); + break; + case MSG_TXN_BEGIN_GETGXID: ProcessBeginTransactionGetGXIDCommand(myport, message); break; + case MSG_BKUP_TXN_BEGIN_GETGXID: + ProcessBkupBeginTransactionGetGXIDCommand(myport, message); + case MSG_TXN_BEGIN_GETGXID_AUTOVACUUM: ProcessBeginTransactionGetGXIDAutovacuumCommand(myport, message); break; + case MSG_BKUP_TXN_BEGIN_GETGXID_AUTOVACUUM: + ProcessBkupBeginTransactionGetGXIDAutovacuumCommand(myport, message); + break; + case MSG_TXN_BEGIN_GETGXID_MULTI: ProcessBeginTransactionGetGXIDCommandMulti(myport, message); break; + case MSG_BKUP_TXN_BEGIN_GETGXID_MULTI: + ProcessBkupBeginTransactionGetGXIDCommandMulti(myport, message); + break; + case MSG_TXN_START_PREPARED: - ProcessStartPreparedTransactionCommand(myport, message); + ProcessStartPreparedTransactionCommand(myport, message, false); + break; + + case MSG_BKUP_TXN_START_PREPARED: + ProcessStartPreparedTransactionCommand(myport, message, true); break; case MSG_TXN_PREPARE: - ProcessPrepareTransactionCommand(myport, message); + ProcessPrepareTransactionCommand(myport, message, false); + break; + + case MSG_BKUP_TXN_PREPARE: + ProcessPrepareTransactionCommand(myport, message, true); break; case MSG_TXN_COMMIT: - ProcessCommitTransactionCommand(myport, message); + ProcessCommitTransactionCommand(myport, message, false); + break; + + case MSG_BKUP_TXN_COMMIT: + ProcessCommitTransactionCommand(myport, message, true); break; case MSG_TXN_COMMIT_PREPARED: - ProcessCommitPreparedTransactionCommand(myport, message); + ProcessCommitPreparedTransactionCommand(myport, message, false); + break; + + case MSG_BKUP_TXN_COMMIT_PREPARED: + ProcessCommitPreparedTransactionCommand(myport, message, true); break; case MSG_TXN_ROLLBACK: - ProcessRollbackTransactionCommand(myport, message); + ProcessRollbackTransactionCommand(myport, message, false); + break; + + case MSG_BKUP_TXN_ROLLBACK: + ProcessRollbackTransactionCommand(myport, message, true); break; case MSG_TXN_COMMIT_MULTI: - ProcessCommitTransactionCommandMulti(myport, message); + ProcessCommitTransactionCommandMulti(myport, message, false); + break; + + case MSG_BKUP_TXN_COMMIT_MULTI: + ProcessCommitTransactionCommandMulti(myport, message, true); break; case MSG_TXN_ROLLBACK_MULTI: - ProcessRollbackTransactionCommandMulti(myport, message); + ProcessRollbackTransactionCommandMulti(myport, message, false); + break; + + case MSG_BKUP_TXN_ROLLBACK_MULTI: + ProcessRollbackTransactionCommandMulti(myport, message, true); break; case MSG_TXN_GET_GXID: + /* + * Notice: we don't have corresponding functions in gtm_client.c + * + * Because this function is not used, GTM-standby extension is not + * included in this function. + */ ProcessGetGXIDTransactionCommand(myport, message); break; @@ -1469,11 +1599,19 @@ ProcessSequenceCommand(Port *myport, GTM_MessageType mtype, StringInfo message) switch (mtype) { case MSG_SEQUENCE_INIT: - ProcessSequenceInitCommand(myport, message); + ProcessSequenceInitCommand(myport, message, false); + break; + + case MSG_BKUP_SEQUENCE_INIT: + ProcessSequenceInitCommand(myport, message, true); break; case MSG_SEQUENCE_ALTER: - ProcessSequenceAlterCommand(myport, message); + ProcessSequenceAlterCommand(myport, message, false); + break; + + case MSG_BKUP_SEQUENCE_ALTER: + ProcessSequenceAlterCommand(myport, message, true); break; case MSG_SEQUENCE_GET_CURRENT: @@ -1481,23 +1619,43 @@ ProcessSequenceCommand(Port *myport, GTM_MessageType mtype, StringInfo message) break; case MSG_SEQUENCE_GET_NEXT: - ProcessSequenceGetNextCommand(myport, message); + ProcessSequenceGetNextCommand(myport, message, false); + break; + + case MSG_BKUP_SEQUENCE_GET_NEXT: + ProcessSequenceGetNextCommand(myport, message, true); break; case MSG_SEQUENCE_SET_VAL: - ProcessSequenceSetValCommand(myport, message); + ProcessSequenceSetValCommand(myport, message, false); + break; + + case MSG_BKUP_SEQUENCE_SET_VAL: + ProcessSequenceSetValCommand(myport, message, true); break; case MSG_SEQUENCE_RESET: - ProcessSequenceResetCommand(myport, message); + ProcessSequenceResetCommand(myport, message, false); + break; + + case MSG_BKUP_SEQUENCE_RESET: + ProcessSequenceResetCommand(myport, message, true); break; case MSG_SEQUENCE_CLOSE: - ProcessSequenceCloseCommand(myport, message); + ProcessSequenceCloseCommand(myport, message, false); + break; + + case MSG_BKUP_SEQUENCE_CLOSE: + ProcessSequenceCloseCommand(myport, message, true); break; case MSG_SEQUENCE_RENAME: - ProcessSequenceRenameCommand(myport, message); + ProcessSequenceRenameCommand(myport, message, false); + break; + + case MSG_BKUP_SEQUENCE_RENAME: + ProcessSequenceRenameCommand(myport, message, true); break; case MSG_SEQUENCE_LIST: diff --git a/src/gtm/proxy/gtm_proxy_opt.c b/src/gtm/proxy/gtm_proxy_opt.c index 6fdd470b2f..20c4c492dd 100644 --- a/src/gtm/proxy/gtm_proxy_opt.c +++ b/src/gtm/proxy/gtm_proxy_opt.c @@ -44,9 +44,9 @@ extern int GTMPortNumber; extern char *error_reporter; extern char *status_reader; extern int log_min_messages; -extern int keepalives_idle; -extern int keepalives_count; -extern int keepalives_interval; +extern int tcp_keepalives_idle; +extern int tcp_keepalives_count; +extern int tcp_keepalives_interval; extern char *GTMServerHost; extern int GTMProxyPortNumber; extern bool IsGTMConnectRetryRequired; @@ -54,9 +54,11 @@ extern int GTMConnectRetryIdle; extern int GTMConnectRetryCount; extern int GTMConnectRetryInterval; extern int GTMServerPortNumber; +/* extern int GTMServerKeepalivesIdle; extern int GTMServerKeepalivesInterval; extern int GTMServerKeepalivesCount; +*/ extern int GTMErrorWaitIdle; extern int GTMErrorWaitInterval; extern int GTMErrorWaitCount; @@ -222,7 +224,7 @@ struct config_int ConfigureNamesInt[] = NULL, GTMOPT_UNIT_TIME }, - >MServerKeepalivesIdle, + &tcp_keepalives_idle, 0, 0, INT_MAX, 0, NULL }, @@ -233,7 +235,7 @@ struct config_int ConfigureNamesInt[] = NULL, GTMOPT_UNIT_TIME }, - >MServerKeepalivesInterval, + &tcp_keepalives_interval, 0, 0, INT_MAX, 0, NULL }, @@ -244,7 +246,7 @@ struct config_int ConfigureNamesInt[] = NULL, 0 }, - >MServerKeepalivesCount, + &tcp_keepalives_count, 0, 0, INT_MAX, 0, NULL }, diff --git a/src/gtm/proxy/proxy_main.c b/src/gtm/proxy/proxy_main.c index 8317d3dc61..67228397c0 100644 --- a/src/gtm/proxy/proxy_main.c +++ b/src/gtm/proxy/proxy_main.c @@ -55,7 +55,13 @@ extern char *optarg; #define GTM_PROXY_DEFAULT_WORKERS 2 #define GTM_PID_FILE "gtm_proxy.pid" #define GTM_LOG_FILE "gtm_proxy.log" -#define PROXY_CLIENT_TIMEOUT 20 +#ifndef PROXY_CLIENT_TIMEOUT +#ifdef GTM_DEBUG +#define PROXY_CLIENT_TIMEOUT 3600 +#else +#define PROXY_CLIENT_TIMEOUT 20 +#endif +#endif static char *progname = "gtm_proxy"; char *ListenAddresses; @@ -81,9 +87,9 @@ int GTMConnectRetryInterval = 0; /* * Keepalives setup for the connection with GTM server */ -int GTMServerKeepalivesIdle = 0; -int GTMServerKeepalivesInterval = 0; -int GTMServerKeepalivesCount = 0; +int tcp_keepalives_idle = 0; +int tcp_keepalives_interval = 0; +int tcp_keepalives_count = 0; char *GTMProxyNodeName = NULL; GTM_ThreadID TopMostThreadID; @@ -106,6 +112,14 @@ GTM_RWLock ReconnectControlLock; jmp_buf mainThreadSIGUSR1_buf; int SIGUSR1Accepted = FALSE; +/* If this is GTM or not */ +/* + * Used to determine if given Port is in GTM or in GT_Proxy. + * If it is in GTM, we should consider to flush GTM_Conn before + * writing anything to Port. + */ +bool isGTM = false; + /* The socket(s) we're listening to. */ #define MAXLISTEN 64 static int ListenSocket[MAXLISTEN]; @@ -345,9 +359,9 @@ GTMProxy_ReadReconnectInfo(void) return(-1); } fclose(optarg_file); -#ifdef GTM_SBY_DEBUG + elog(LOG, "reconnect option = \"%s\"\n", optstr); -#endif + next_token = optstr; while ((option = read_token(next_token, &next_token))) { @@ -389,7 +403,7 @@ GTMProxy_SigleHandler(int signal) { int ii; - elog(LOG, "Received signal %d", signal); + elog(LOG, "Received signal %d\n", signal); switch (signal) { @@ -408,14 +422,12 @@ GTMProxy_SigleHandler(int signal) * The mask is set to block signals. They're blocked until all the * threads reconnect to the new GTM. */ -#ifdef GTM_SBY_DEBUG elog(LOG, "Accepted SIGUSR1\n"); -#endif if (MyThreadID != TopMostThreadID) { -#ifdef GTM_SBY_DEBUG + elog(LOG, "Not on main thread, proxy the signal to the main thread."); -#endif + pthread_kill(TopMostThreadID, SIGUSR1); return; } @@ -423,9 +435,9 @@ GTMProxy_SigleHandler(int signal) * Then this is the main thread. */ PG_SETMASK(&BlockSig); -#ifdef GTM_SBY_DEBUG + elog(LOG, "I'm the main thread. Accepted SIGUSR1."); -#endif + /* * Set Reconnect Info */ @@ -485,15 +497,15 @@ GTMProxy_SigleHandler(int signal) case SIGUSR2: /* Reconnect from the main thread */ /* Main thread has nothing to do twith this signal and should not receive this. */ PG_SETMASK(&BlockSig); -#ifdef GTM_SBY_DEBUG + elog(LOG, "Detected SIGUSR2, thread:%ld", MyThreadID); -#endif + if (MyThreadID == TopMostThreadID) { /* This should not be reached. Just in case. */ -#ifdef GTM_SBY_DEBUG + elog(LOG, "SIGUSR2 received by the main thread. Ignoring."); -#endif + PG_SETMASK(&UnBlockSig); return; } @@ -792,7 +804,7 @@ main(int argc, char *argv[]) */ BaseInit(); - elog(DEBUG3, "Starting GTM proxy at (%s:%d)", ListenAddresses, GTMProxyPortNumber); + elog(LOG, "Starting GTM proxy at (%s:%d)", ListenAddresses, GTMProxyPortNumber); /* Recover Data of Registered nodes. */ Recovery_RestoreRegisterInfo(); @@ -3258,10 +3270,16 @@ RegisterProxy(bool is_reconnect) finish_time = time(NULL) + PROXY_CLIENT_TIMEOUT; if (gtmpqWaitTimed(true, false, master_conn, finish_time) || gtmpqReadData(master_conn) < 0) + { + elog(ERROR, "Cannot read data."); goto failed; + } if ((res = GTMPQgetResult(master_conn)) == NULL) + { + elog(ERROR, "Cannot get result."); goto failed; + } if (res->gr_status == GTM_RESULT_OK) { diff --git a/src/gtm/recovery/register_gtm.c b/src/gtm/recovery/register_gtm.c index 7f9b1543e4..9ca0e20a6d 100644 --- a/src/gtm/recovery/register_gtm.c +++ b/src/gtm/recovery/register_gtm.c @@ -33,12 +33,15 @@ #include "gtm/gtm_ip.h" static void finishStandbyConn(GTM_ThreadInfo *thrinfo); +extern bool Backup_synchronously; /* - * Process MSG_NODE_REGISTER + * Process MSG_NODE_REGISTER/MSG_BKUP_NODE_REGISTER message. + * + * is_backup indicates the message is MSG_BKUP_NODE_REGISTER. */ void -ProcessPGXCNodeRegister(Port *myport, StringInfo message) +ProcessPGXCNodeRegister(Port *myport, StringInfo message, bool is_backup) { GTM_PGXCNodeType type; GTM_PGXCNodePort port; @@ -172,64 +175,79 @@ ProcessPGXCNodeRegister(Port *myport, StringInfo message) pq_getmsgend(message); - /* - * Send a SUCCESS message back to the client - */ - pq_beginmessage(&buf, 'S'); - pq_sendint(&buf, NODE_REGISTER_RESULT, 4); - if (myport->remote_type == GTM_NODE_GTM_PROXY) + if (!is_backup) { - GTM_ProxyMsgHeader proxyhdr; - proxyhdr.ph_conid = myport->conn_id; - pq_sendbytes(&buf, (char *)&proxyhdr, sizeof (GTM_ProxyMsgHeader)); - } - pq_sendbytes(&buf, (char *)&type, sizeof(GTM_PGXCNodeType)); - /* Node name length */ - pq_sendint(&buf, strlen(node_name), 4); - /* Node name (var-len) */ - pq_sendbytes(&buf, node_name, strlen(node_name)); - pq_endmessage(myport, &buf); - - if (myport->remote_type != GTM_NODE_GTM_PROXY) - pq_flush(myport); - - if (GetMyThreadInfo->thr_conn->standby) - { - int _rc; - GTM_Conn *oldconn = GetMyThreadInfo->thr_conn->standby; - int count = 0; - GTM_PGXCNodeInfo *standbynode; - - elog(LOG, "calling node_register_internal() for standby GTM %p.", - GetMyThreadInfo->thr_conn->standby); - -retry: - _rc = node_register_internal(GetMyThreadInfo->thr_conn->standby, - type, - ipaddress, - port, - node_name, - datafolder, - status); - - if (gtm_standby_check_communication_error(&count, oldconn)) - goto retry; - - /* Now check if there're other standby registered. */ - standbynode = find_standby_node_info(); - if (!standbynode) - GTMThreads->gt_standby_ready = false; - - elog(LOG, "node_register_internal() returns rc %d.", _rc); + /* + * Backup first + */ + if (GetMyThreadInfo->thr_conn->standby) + { + int _rc; + GTM_Conn *oldconn = GetMyThreadInfo->thr_conn->standby; + int count = 0; + GTM_PGXCNodeInfo *standbynode; + + elog(LOG, "calling node_register_internal() for standby GTM %p.", + GetMyThreadInfo->thr_conn->standby); + + retry: + _rc = bkup_node_register_internal(GetMyThreadInfo->thr_conn->standby, + type, + ipaddress, + port, + node_name, + datafolder, + status); + + elog(LOG, "node_register_internal() returns rc %d.", _rc); + + if (gtm_standby_check_communication_error(&count, oldconn)) + goto retry; + + /* Now check if there're other standby registered. */ + standbynode = find_standby_node_info(); + if (!standbynode) + GTMThreads->gt_standby_ready = false; + + if (Backup_synchronously && (myport->remote_type != GTM_NODE_GTM_PROXY)) + gtm_sync_standby(GetMyThreadInfo->thr_conn->standby); + + } + /* + * Then, send a SUCCESS message back to the client + */ + pq_beginmessage(&buf, 'S'); + pq_sendint(&buf, NODE_REGISTER_RESULT, 4); + if (myport->remote_type == GTM_NODE_GTM_PROXY) + { + GTM_ProxyMsgHeader proxyhdr; + proxyhdr.ph_conid = myport->conn_id; + pq_sendbytes(&buf, (char *)&proxyhdr, sizeof (GTM_ProxyMsgHeader)); + } + pq_sendbytes(&buf, (char *)&type, sizeof(GTM_PGXCNodeType)); + /* Node name length */ + pq_sendint(&buf, strlen(node_name), 4); + /* Node name (var-len) */ + pq_sendbytes(&buf, node_name, strlen(node_name)); + pq_endmessage(myport, &buf); + + if (myport->remote_type != GTM_NODE_GTM_PROXY) + { + if (GetMyThreadInfo->thr_conn->standby) + gtmpqFlush(GetMyThreadInfo->thr_conn->standby); + pq_flush(myport); + } } } /* - * Process MSG_NODE_UNREGISTER + * Process MSG_NODE_UNREGISTER/MSG_BKUP_NODE_UNREGISTER + * + * is_backup indiccates MSG_BKUP_NODE_UNREGISTER */ void -ProcessPGXCNodeUnregister(Port *myport, StringInfo message) +ProcessPGXCNodeUnregister(Port *myport, StringInfo message, bool is_backup) { GTM_PGXCNodeType type; MemoryContext oldContext; @@ -268,47 +286,61 @@ ProcessPGXCNodeUnregister(Port *myport, StringInfo message) pq_getmsgend(message); - /* - * Send a SUCCESS message back to the client - */ - pq_beginmessage(&buf, 'S'); - pq_sendint(&buf, NODE_UNREGISTER_RESULT, 4); - if (myport->remote_type == GTM_NODE_GTM_PROXY) - { - GTM_ProxyMsgHeader proxyhdr; - proxyhdr.ph_conid = myport->conn_id; - pq_sendbytes(&buf, (char *)&proxyhdr, sizeof (GTM_ProxyMsgHeader)); - } - pq_sendbytes(&buf, (char *)&type, sizeof(GTM_PGXCNodeType)); - /* Node name length */ - pq_sendint(&buf, strlen(node_name), 4); - /* Node name (var-len) */ - pq_sendbytes(&buf, node_name, strlen(node_name)); - pq_endmessage(myport, &buf); - - if (myport->remote_type != GTM_NODE_GTM_PROXY) - pq_flush(myport); - - if (GetMyThreadInfo->thr_conn->standby) + if (!is_backup) { - int _rc; - GTM_Conn *oldconn = GetMyThreadInfo->thr_conn->standby; - int count = 0; - - elog(LOG, "calling node_unregister() for standby GTM %p.", - GetMyThreadInfo->thr_conn->standby); + /* + * Backup first + */ + if (GetMyThreadInfo->thr_conn->standby) + { + int _rc; + GTM_Conn *oldconn = GetMyThreadInfo->thr_conn->standby; + int count = 0; + + elog(LOG, "calling node_unregister() for standby GTM %p.", + GetMyThreadInfo->thr_conn->standby); -retry: - _rc = node_unregister(GetMyThreadInfo->thr_conn->standby, - type, - node_name); + retry: + _rc = bkup_node_unregister(GetMyThreadInfo->thr_conn->standby, + type, + node_name); - if (gtm_standby_check_communication_error(&count, oldconn)) - goto retry; + if (gtm_standby_check_communication_error(&count, oldconn)) + goto retry; + + if (Backup_synchronously && (myport->remote_type != GTM_NODE_GTM_PROXY)) + gtm_sync_standby(GetMyThreadInfo->thr_conn->standby); - elog(LOG, "node_unregister() returns rc %d.", _rc); + elog(LOG, "node_unregister() returns rc %d.", _rc); + } + /* + * Send a SUCCESS message back to the client + */ + pq_beginmessage(&buf, 'S'); + pq_sendint(&buf, NODE_UNREGISTER_RESULT, 4); + if (myport->remote_type == GTM_NODE_GTM_PROXY) + { + GTM_ProxyMsgHeader proxyhdr; + proxyhdr.ph_conid = myport->conn_id; + pq_sendbytes(&buf, (char *)&proxyhdr, sizeof (GTM_ProxyMsgHeader)); + } + pq_sendbytes(&buf, (char *)&type, sizeof(GTM_PGXCNodeType)); + /* Node name length */ + pq_sendint(&buf, strlen(node_name), 4); + /* Node name (var-len) */ + pq_sendbytes(&buf, node_name, strlen(node_name)); + + pq_endmessage(myport, &buf); + + /* Flush standby before flush to the client */ + if (myport->remote_type != GTM_NODE_GTM_PROXY) + { + if (GetMyThreadInfo->thr_conn->standby) + gtmpqFlush(GetMyThreadInfo->thr_conn->standby); + pq_flush(myport); + } } } diff --git a/src/gtm/recovery/replication.c b/src/gtm/recovery/replication.c index c8005652d2..ae351ff26a 100644 --- a/src/gtm/recovery/replication.c +++ b/src/gtm/recovery/replication.c @@ -67,6 +67,10 @@ ProcessBeginReplicationInitialSyncRequest(Port *myport, StringInfo message) } pq_endmessage(myport, &buf); + /* + * Beause this command comes from the standby, we don't have to flush + * messages to the standby here. + */ if (myport->remote_type != GTM_NODE_GTM_PROXY) pq_flush(myport); @@ -112,6 +116,10 @@ ProcessEndReplicationInitialSyncRequest(Port *myport, StringInfo message) } pq_endmessage(myport, &buf); + /* + * Beause this command comes from the standby, we don't have to flush + * messages to the standby here. + */ if (myport->remote_type != GTM_NODE_GTM_PROXY) pq_flush(myport); diff --git a/src/include/gtm/gtm_client.h b/src/include/gtm/gtm_client.h index 971277816e..f619b37acc 100644 --- a/src/include/gtm/gtm_client.h +++ b/src/include/gtm/gtm_client.h @@ -166,13 +166,26 @@ size_t get_sequence_list(GTM_Conn *, GTM_SeqInfo **, size_t); * Transaction Management API */ GlobalTransactionId begin_transaction(GTM_Conn *conn, GTM_IsolationLevel isolevel, GTM_Timestamp *timestamp); +int bkup_begin_transaction(GTM_Conn *conn, GTM_TransactionHandle txn, GTM_IsolationLevel isolevel, + bool read_only, GTM_Timestamp timestamp); +int bkup_begin_transaction_gxid(GTM_Conn *conn, GTM_TransactionHandle txn, GlobalTransactionId gxid, + GTM_IsolationLevel isolevel, bool read_only, GTM_Timestamp timestamp); + GlobalTransactionId begin_transaction_autovacuum(GTM_Conn *conn, GTM_IsolationLevel isolevel); +int bkup_begin_transaction_autovacuum(GTM_Conn *conn, GTM_TransactionHandle txn, GlobalTransactionId gxid, + GTM_IsolationLevel isolevel); int commit_transaction(GTM_Conn *conn, GlobalTransactionId gxid); +int bkup_commit_transaction(GTM_Conn *conn, GlobalTransactionId gxid); int commit_prepared_transaction(GTM_Conn *conn, GlobalTransactionId gxid, GlobalTransactionId prepared_gxid); +int bkup_commit_prepared_transaction(GTM_Conn *conn, GlobalTransactionId gxid, GlobalTransactionId prepared_gxid); int abort_transaction(GTM_Conn *conn, GlobalTransactionId gxid); +int bkup_abort_transaction(GTM_Conn *conn, GlobalTransactionId gxid); int start_prepared_transaction(GTM_Conn *conn, GlobalTransactionId gxid, char *gid, char *nodestring); +int backup_start_prepared_transaction(GTM_Conn *conn, GTM_TransactionHandle txn, char *gid, + char *nodestring); int prepare_transaction(GTM_Conn *conn, GlobalTransactionId gxid); +int bkup_prepare_transaction(GTM_Conn *conn, GlobalTransactionId gxid); int get_gid_data(GTM_Conn *conn, GTM_IsolationLevel isolevel, char *gid, GlobalTransactionId *gxid, GlobalTransactionId *prepared_gxid, char **nodestring); @@ -185,12 +198,20 @@ begin_transaction_multi(GTM_Conn *conn, int txn_count, GTM_IsolationLevel *txn_i bool *txn_read_only, GTMProxy_ConnID *txn_connid, int *txn_count_out, GlobalTransactionId *gxid_out, GTM_Timestamp *ts_out); int +bkup_begin_transaction_multi(GTM_Conn *conn, int txn_count, + GTM_TransactionHandle *txn, GlobalTransactionId start_gxid, GTM_IsolationLevel *isolevel, + bool *read_only, GTMProxy_ConnID *txn_connid); +int commit_transaction_multi(GTM_Conn *conn, int txn_count, GlobalTransactionId *gxid, - int *txn_count_out, int *status_out); + int *txn_count_out, int *status_out); +int +bkup_commit_transaction_multi(GTM_Conn *conn, int txn_count, GTM_TransactionHandle *txn); int abort_transaction_multi(GTM_Conn *conn, int txn_count, GlobalTransactionId *gxid, int *txn_count_out, int *status_out); int +bkup_abort_transaction_multi(GTM_Conn *conn, int txn_count, GlobalTransactionId *gxid); +int snapshot_get_multi(GTM_Conn *conn, int txn_count, GlobalTransactionId *gxid, int *txn_count_out, int *status_out, GlobalTransactionId *xmin_out, GlobalTransactionId *xmax_out, @@ -206,14 +227,24 @@ GTM_SnapshotData *get_snapshot(GTM_Conn *conn, GlobalTransactionId gxid, * Node Registering management API */ int node_register(GTM_Conn *conn, - GTM_PGXCNodeType type, - GTM_PGXCNodePort port, - char *node_name, - char *datafolder); -int node_register_internal(GTM_Conn *conn, GTM_PGXCNodeType type, const char *host, - GTM_PGXCNodePort port, char *node_name, char *datafolder, - GTM_PGXCNodeStatus status); + GTM_PGXCNodeType type, + GTM_PGXCNodePort port, + char *node_name, + char *datafolder); +int bkup_node_register(GTM_Conn *conn, + GTM_PGXCNodeType type, + GTM_PGXCNodePort port, + char *node_name, + char *datafolder); +int node_register(GTM_Conn *conn, GTM_PGXCNodeType type, GTM_PGXCNodePort port, char *node_name, char *datafolder); +int node_register_internal(GTM_Conn *conn, GTM_PGXCNodeType type, const char *host, GTM_PGXCNodePort port, char *node_name, + char *datafolder, GTM_PGXCNodeStatus status); +int bkup_node_register(GTM_Conn *conn, GTM_PGXCNodeType type, GTM_PGXCNodePort port, char *node_name, char *datafolder); +int bkup_node_register_internal(GTM_Conn *conn, GTM_PGXCNodeType type, const char *host, GTM_PGXCNodePort port, + char *node_name, char *datafolder, GTM_PGXCNodeStatus status); + int node_unregister(GTM_Conn *conn, GTM_PGXCNodeType type, const char *node_name); +int bkup_node_unregister(GTM_Conn *conn, GTM_PGXCNodeType type, const char * node_name); int backend_disconnect(GTM_Conn *conn, bool is_postmaster, GTM_PGXCNodeType type, char *node_name); char *node_get_local_addr(GTM_Conn *conn, char *buf, size_t buflen, int *rc); @@ -223,20 +254,32 @@ char *node_get_local_addr(GTM_Conn *conn, char *buf, size_t buflen, int *rc); int open_sequence(GTM_Conn *conn, GTM_SequenceKey key, GTM_Sequence increment, GTM_Sequence minval, GTM_Sequence maxval, GTM_Sequence startval, bool cycle); +int bkup_open_sequence(GTM_Conn *conn, GTM_SequenceKey key, GTM_Sequence increment, + GTM_Sequence minval, GTM_Sequence maxval, + GTM_Sequence startval, bool cycle); int alter_sequence(GTM_Conn *conn, GTM_SequenceKey key, GTM_Sequence increment, GTM_Sequence minval, GTM_Sequence maxval, GTM_Sequence startval, GTM_Sequence lastval, bool cycle, bool is_restart); +int bkup_alter_sequence(GTM_Conn *conn, GTM_SequenceKey key, GTM_Sequence increment, + GTM_Sequence minval, GTM_Sequence maxval, + GTM_Sequence startval, GTM_Sequence lastval, bool cycle, bool is_restart); int close_sequence(GTM_Conn *conn, GTM_SequenceKey key); +int bkup_close_sequence(GTM_Conn *conn, GTM_SequenceKey key); int rename_sequence(GTM_Conn *conn, GTM_SequenceKey key, GTM_SequenceKey newkey); +int bkup_rename_sequence(GTM_Conn *conn, GTM_SequenceKey key, GTM_SequenceKey newkey); GTM_Sequence get_current(GTM_Conn *conn, GTM_SequenceKey key); GTM_Sequence get_next(GTM_Conn *conn, GTM_SequenceKey key); +GTM_Sequence bkup_get_next(GTM_Conn *conn, GTM_SequenceKey key); int set_val(GTM_Conn *conn, GTM_SequenceKey key, GTM_Sequence nextval, bool is_called); +int bkup_set_val(GTM_Conn *conn, GTM_SequenceKey key, GTM_Sequence nextval, bool is_called); int reset_sequence(GTM_Conn *conn, GTM_SequenceKey key); +int bkup_reset_sequence(GTM_Conn *conn, GTM_SequenceKey key); /* * GTM-Standby */ int set_begin_end_backup(GTM_Conn *conn, bool begin); +int gtm_sync_standby(GTM_Conn *conn); #endif diff --git a/src/include/gtm/gtm_msg.h b/src/include/gtm/gtm_msg.h index c7c1259eba..b22a709b0f 100644 --- a/src/include/gtm/gtm_msg.h +++ b/src/include/gtm/gtm_msg.h @@ -14,47 +14,74 @@ #ifndef GTM_MSG_H #define GTM_MSG_H +/* + * The following enum symbols are also used in message_name_tab structure + * in gtm_utils.c. Modification of the following enum should reflect + * changes to message_name_tab structure as well. + */ typedef enum GTM_MessageType { MSG_TYPE_INVALID, - MSG_NODE_REGISTER, /* Register a PGXC Node with GTM */ - MSG_NODE_UNREGISTER, /* Unregister a PGXC Node with GTM */ - MSG_NODE_LIST, + MSG_SYNC_STANDBY, /* Message to sync woth GTM-Standby */ + MSG_NODE_REGISTER, /* Register a PGXC Node with GTM */ + MSG_BKUP_NODE_REGISTER, /* Backup of MSG_NODE_REGISTER */ + MSG_NODE_UNREGISTER, /* Unregister a PGXC Node with GTM */ + MSG_BKUP_NODE_UNREGISTER, /* Backup of MSG_NODE_UNREGISTER */ + MSG_NODE_LIST, /* Get node list */ MSG_NODE_BEGIN_REPLICATION_INIT, MSG_NODE_END_REPLICATION_INIT, - MSG_BEGIN_BACKUP, /* Start backup by Standby */ - MSG_END_BACKUP, /* End backup preparation by Standby */ - MSG_TXN_BEGIN, /* Start a new transaction */ - MSG_TXN_BEGIN_GETGXID, /* Start a new transaction and get GXID */ - MSG_TXN_BEGIN_GETGXID_MULTI, /* Start multiple new transactions and get GXIDs */ - MSG_TXN_START_PREPARED, /* Begins to prepare a transation for commit */ + MSG_BEGIN_BACKUP, /* Start backup by Standby */ + MSG_END_BACKUP, /* End backup preparation by Standby */ + MSG_TXN_BEGIN, /* Start a new transaction */ + MSG_BKUP_TXN_BEGIN, /* Backup of MSG_TXN_BEGIN */ + MSG_TXN_BEGIN_GETGXID, /* Start a new transaction and get GXID */ + MSG_BKUP_TXN_BEGIN_GETGXID, /* Backup of MSG_TXN_BEGIN_GETGXID */ + MSG_TXN_BEGIN_GETGXID_MULTI, /* Start multiple new transactions and get GXIDs */ + MSG_BKUP_TXN_BEGIN_GETGXID_MULTI, /* Backup of MSG_TXN_BEGIN_GETGXID_MULTI */ + MSG_TXN_START_PREPARED, /* Begins to prepare a transation for commit */ + MSG_BKUP_TXN_START_PREPARED, /* Backup of MSG_TXN_START_PREPARED */ MSG_TXN_COMMIT, /* Commit a running or prepared transaction */ - MSG_TXN_COMMIT_MULTI, /* Commit multiple running or prepared transactions */ - MSG_TXN_COMMIT_PREPARED, /* Commit a prepared transaction */ + MSG_BKUP_TXN_COMMIT, /* Backup of MSG_TXN_COMMIT */ + MSG_TXN_COMMIT_MULTI, /* Commit multiple running or prepared transactions */ + MSG_BKUP_TXN_COMMIT_MULTI, /* Bacukp of MSG_TXN_COMMIT_MULTI */ + MSG_TXN_COMMIT_PREPARED, /* Commit a prepared transaction */ + MSG_BKUP_TXN_COMMIT_PREPARED, /* Backup of MSG_TXN_COMMIT_PREPARED */ MSG_TXN_PREPARE, /* Finish preparing a transaction */ + MSG_BKUP_TXN_PREPARE, /* Backup of MSG_TXN_PREPARE */ MSG_TXN_ROLLBACK, /* Rollback a transaction */ - MSG_TXN_ROLLBACK_MULTI, /* Rollback multiple transactions */ + MSG_BKUP_TXN_ROLLBACK, /* Backup of MSG_TXN_ROLLBACK */ + MSG_TXN_ROLLBACK_MULTI, /* Rollback multiple transactions */ + MSG_BKUP_TXN_ROLLBACK_MULTI, /* Backup of MSG_TXN_ROLLBACK_MULTI */ MSG_TXN_GET_GID_DATA, /* Get info associated with a GID, and get a GXID */ MSG_TXN_GET_GXID, /* Get a GXID for a transaction */ - MSG_TXN_GET_NEXT_GXID, /* Get next GXID */ + MSG_BKUP_TXN_GET_GXID, + MSG_TXN_GET_NEXT_GXID, /* Get next GXID */ MSG_TXN_GXID_LIST, MSG_SNAPSHOT_GET, /* Get a global snapshot */ MSG_SNAPSHOT_GET_MULTI, /* Get multiple global snapshots */ MSG_SNAPSHOT_GXID_GET, /* Get GXID and snapshot together */ MSG_SEQUENCE_INIT, /* Initialize a new global sequence */ + MSG_BKUP_SEQUENCE_INIT, /* Backup of MSG_SEQUENCE_INIT */ MSG_SEQUENCE_GET_CURRENT,/* Get the current value of sequence */ - MSG_SEQUENCE_GET_NEXT, /* Get the next sequence value of sequence */ + MSG_SEQUENCE_GET_NEXT, /* Get the next sequence value of sequence */ + MSG_BKUP_SEQUENCE_GET_NEXT, /* Backup of MSG_SEQUENCE_GET_NEXT */ MSG_SEQUENCE_GET_LAST, /* Get the last sequence value of sequence */ - MSG_SEQUENCE_SET_VAL, /* Set values for sequence */ - MSG_SEQUENCE_RESET, /* Reset the sequence */ - MSG_SEQUENCE_CLOSE, /* Close a previously inited sequence */ - MSG_SEQUENCE_RENAME, /* Rename a sequence */ - MSG_SEQUENCE_ALTER, /* Alter a sequence */ + MSG_SEQUENCE_SET_VAL, /* Set values for sequence */ + MSG_BKUP_SEQUENCE_SET_VAL, /* Backup of MSG_SEQUENCE_SET_VAL */ + MSG_SEQUENCE_RESET, /* Reset the sequence */ + MSG_BKUP_SEQUENCE_RESET, /* Backup of MSG_SEQUENCE_RESET */ + MSG_SEQUENCE_CLOSE, /* Close a previously inited sequence */ + MSG_BKUP_SEQUENCE_CLOSE, /* Backup of MSG_SEQUENCE_CLOSE */ + MSG_SEQUENCE_RENAME, /* Rename a sequence */ + MSG_BKUP_SEQUENCE_RENAME, /* Backup of MSG_SEQUENCE_RENAME */ + MSG_SEQUENCE_ALTER, /* Alter a sequence */ + MSG_BKUP_SEQUENCE_ALTER, /* Backup of MSG_SEQUENCE_ALTER */ MSG_SEQUENCE_LIST, /* Get a list of sequences */ MSG_TXN_GET_STATUS, /* Get status of a given transaction */ MSG_TXN_GET_ALL_PREPARED, /* Get information about all outstanding * prepared transactions */ - MSG_TXN_BEGIN_GETGXID_AUTOVACUUM, /* Start a new transaction and get GXID for autovacuum */ + MSG_TXN_BEGIN_GETGXID_AUTOVACUUM, /* Start a new transaction and get GXID for autovacuum */ + MSG_BKUP_TXN_BEGIN_GETGXID_AUTOVACUUM, /* Backup of MSG_TXN_BEGIN_GETGXID_AUTOVACUUM */ MSG_DATA_FLUSH, /* flush pending data */ MSG_BACKEND_DISCONNECT, /* tell GTM that the backend diconnected from the proxy */ @@ -64,8 +91,13 @@ typedef enum GTM_MessageType MSG_TYPE_COUNT /* A dummmy entry just to count the message types */ } GTM_MessageType; +/* + * Symbols in the following enum are usd in result_name_tab defined in gtm_utils.c. + * Modifictaion to the following enum should be reflected to result_name_tab as well. + */ typedef enum GTM_ResultType { + SYNC_STANDBY_RESULT, NODE_REGISTER_RESULT, NODE_UNREGISTER_RESULT, NODE_LIST_RESULT, @@ -103,6 +135,7 @@ typedef enum GTM_ResultType TXN_GET_STATUS_RESULT, TXN_GET_ALL_PREPARED_RESULT, TXN_BEGIN_GETGXID_AUTOVACUUM_RESULT, + RESULT_TYPE_COUNT } GTM_ResultType; /* diff --git a/src/include/gtm/gtm_opt.h b/src/include/gtm/gtm_opt.h index fd8a3101d8..b475d34e1d 100644 --- a/src/include/gtm/gtm_opt.h +++ b/src/include/gtm/gtm_opt.h @@ -351,6 +351,7 @@ const char *const config_type_names[] =\ #define GTM_OPTNAME_PORT "port" #define GTM_OPTNAME_STARTUP "startup" #define GTM_OPTNAME_STATUS_READER "status_reader" +#define GTM_OPTNAME_SYNCHRONOUS_BACKUP "synchronous_backup" #define GTM_OPTNAME_WORKER_THREADS "worker_threads" diff --git a/src/include/gtm/gtm_seq.h b/src/include/gtm/gtm_seq.h index 92174eecc5..77478eab47 100644 --- a/src/include/gtm/gtm_seq.h +++ b/src/include/gtm/gtm_seq.h @@ -76,14 +76,14 @@ int GTM_SeqSetVal(GTM_SequenceKey seqkey, GTM_Sequence nextval, bool iscalled); int GTM_SeqReset(GTM_SequenceKey seqkey); -void ProcessSequenceInitCommand(Port *myport, StringInfo message); +void ProcessSequenceInitCommand(Port *myport, StringInfo message, bool is_backup); void ProcessSequenceGetCurrentCommand(Port *myport, StringInfo message); -void ProcessSequenceGetNextCommand(Port *myport, StringInfo message); -void ProcessSequenceSetValCommand(Port *myport, StringInfo message); -void ProcessSequenceResetCommand(Port *myport, StringInfo message); -void ProcessSequenceCloseCommand(Port *myport, StringInfo message); -void ProcessSequenceRenameCommand(Port *myport, StringInfo message); -void ProcessSequenceAlterCommand(Port *myport, StringInfo message); +void ProcessSequenceGetNextCommand(Port *myport, StringInfo message, bool is_backup); +void ProcessSequenceSetValCommand(Port *myport, StringInfo message, bool is_backup); +void ProcessSequenceResetCommand(Port *myport, StringInfo message, bool is_backup); +void ProcessSequenceCloseCommand(Port *myport, StringInfo message, bool is_backup); +void ProcessSequenceRenameCommand(Port *myport, StringInfo message, bool is_backup); +void ProcessSequenceAlterCommand(Port *myport, StringInfo message, bool is_backup); void ProcessSequenceListCommand(Port *myport, StringInfo message); diff --git a/src/include/gtm/gtm_txn.h b/src/include/gtm/gtm_txn.h index 6cd5181a96..dae4cc143f 100644 --- a/src/include/gtm/gtm_txn.h +++ b/src/include/gtm/gtm_txn.h @@ -220,25 +220,42 @@ GTM_Snapshot GTM_GetTransactionSnapshot(GTM_TransactionHandle handle[], void GTM_FreeCachedTransInfo(void); void ProcessBeginTransactionCommand(Port *myport, StringInfo message); +void ProcessBkupBeginTransactionCommand(Port *myport, StringInfo message); +void GTM_BkupBeginTransactionMulti(char *coord_name, + GTM_TransactionHandle *txn, + GTM_IsolationLevel *isolevel, + bool *readonly, + GTMProxy_ConnID *connid, + int txn_count); + void ProcessBeginTransactionCommandMulti(Port *myport, StringInfo message); void ProcessBeginTransactionGetGXIDCommand(Port *myport, StringInfo message); -void ProcessCommitTransactionCommand(Port *myport, StringInfo message); -void ProcessCommitPreparedTransactionCommand(Port *myport, StringInfo message); -void ProcessRollbackTransactionCommand(Port *myport, StringInfo message); -void ProcessStartPreparedTransactionCommand(Port *myport, StringInfo message); -void ProcessPrepareTransactionCommand(Port *myport, StringInfo message); +void ProcessCommitTransactionCommand(Port *myport, StringInfo message, bool is_backup); +void ProcessCommitPreparedTransactionCommand(Port *myport, StringInfo message, bool is_backup); +void ProcessRollbackTransactionCommand(Port *myport, StringInfo message, bool is_backup); +void ProcessStartPreparedTransactionCommand(Port *myport, StringInfo message, bool is_backup); +void ProcessPrepareTransactionCommand(Port *myport, StringInfo message, bool is_backup); void ProcessGetGIDDataTransactionCommand(Port *myport, StringInfo message); void ProcessGetGXIDTransactionCommand(Port *myport, StringInfo message); void ProcessGXIDListCommand(Port *myport, StringInfo message); void ProcessGetNextGXIDTransactionCommand(Port *myport, StringInfo message); void ProcessBeginTransactionGetGXIDAutovacuumCommand(Port *myport, StringInfo message); +void ProcessBkupBeginTransactionGetGXIDAutovacuumCommand(Port *myport, StringInfo message); + void ProcessBeginTransactionGetGXIDCommandMulti(Port *myport, StringInfo message); -void ProcessCommitTransactionCommandMulti(Port *myport, StringInfo message); -void ProcessRollbackTransactionCommandMulti(Port *myport, StringInfo message) ; +void ProcessCommitTransactionCommandMulti(Port *myport, StringInfo message, bool is_backup); +void ProcessRollbackTransactionCommandMulti(Port *myport, StringInfo message, bool is_backup) ; void GTM_SaveTxnInfo(int ctlfd); void GTM_RestoreTxnInfo(int ctlfd, GlobalTransactionId next_gxid); +void GTM_BkupBeginTransaction(char *coord_name, + GTM_TransactionHandle txn, + GTM_IsolationLevel isolevel, + bool readonly); +void ProcessBkupBeginTransactionGetGXIDCommand(Port *myport, StringInfo message); +void ProcessBkupBeginTransactionGetGXIDCommandMulti(Port *myport, StringInfo message); + /* * In gtm_snap.c diff --git a/src/include/gtm/gtm_utils.h b/src/include/gtm/gtm_utils.h index 245daa9114..bea9fb38fa 100644 --- a/src/include/gtm/gtm_utils.h +++ b/src/include/gtm/gtm_utils.h @@ -15,6 +15,7 @@ #define GTM_UTILS_H #include "gtm/libpq-int.h" +#include "gtm/gtm_msg.h" #if 0 /* @@ -25,4 +26,8 @@ void gtm_report_failure(GTM_Conn *); #endif +void gtm_util_init_nametabs(void); +char *gtm_util_message_name(GTM_MessageType type); +char *gtm_util_result_name(GTM_ResultType type); + #endif /* GTM_UTIL_H */ diff --git a/src/include/gtm/register.h b/src/include/gtm/register.h index 15674ec59a..e8aa8c1bf0 100644 --- a/src/include/gtm/register.h +++ b/src/include/gtm/register.h @@ -79,8 +79,8 @@ void Recovery_SaveRegisterInfo(void); void Recovery_PGXCNodeDisconnect(Port *myport); void Recovery_SaveRegisterFileName(char *dir); -void ProcessPGXCNodeRegister(Port *myport, StringInfo message); -void ProcessPGXCNodeUnregister(Port *myport, StringInfo message); +void ProcessPGXCNodeRegister(Port *myport, StringInfo message, bool is_backup); +void ProcessPGXCNodeUnregister(Port *myport, StringInfo message, bool is_backup); void ProcessPGXCNodeBackendDisconnect(Port *myport, StringInfo message); void ProcessPGXCNodeList(Port *myport, StringInfo message); |