summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--doc-xc/src/sgml/ref/gtm.sgmlin21
-rw-r--r--src/gtm/client/fe-protocol.c3
-rw-r--r--src/gtm/client/gtm_client.c858
-rw-r--r--src/gtm/client/pqexpbuffer.c2
-rw-r--r--src/gtm/common/elog.c9
-rw-r--r--src/gtm/common/gtm_utils.c181
-rw-r--r--src/gtm/libpq/pqcomm.c6
-rw-r--r--src/gtm/main/gtm.conf.sample1
-rw-r--r--src/gtm/main/gtm_opt.c22
-rw-r--r--src/gtm/main/gtm_seq.c543
-rw-r--r--src/gtm/main/gtm_snap.c5
-rw-r--r--src/gtm/main/gtm_standby.c12
-rw-r--r--src/gtm/main/gtm_txn.c916
-rw-r--r--src/gtm/main/main.c202
-rw-r--r--src/gtm/proxy/gtm_proxy_opt.c14
-rw-r--r--src/gtm/proxy/proxy_main.c54
-rw-r--r--src/gtm/recovery/register_gtm.c206
-rw-r--r--src/gtm/recovery/replication.c8
-rw-r--r--src/include/gtm/gtm_client.h59
-rw-r--r--src/include/gtm/gtm_msg.h73
-rw-r--r--src/include/gtm/gtm_opt.h1
-rw-r--r--src/include/gtm/gtm_seq.h14
-rw-r--r--src/include/gtm/gtm_txn.h31
-rw-r--r--src/include/gtm/gtm_utils.h5
-rw-r--r--src/include/gtm/register.h4
25 files changed, 2404 insertions, 846 deletions
diff --git a/doc-xc/src/sgml/ref/gtm.sgmlin b/doc-xc/src/sgml/ref/gtm.sgmlin
index c27703ef81..28e16b5975 100644
--- a/doc-xc/src/sgml/ref/gtm.sgmlin
+++ b/doc-xc/src/sgml/ref/gtm.sgmlin
@@ -259,6 +259,27 @@ PostgreSQL documentation
</listitem>
</varlistentry>
+ <varlistentry id="gtm-opt-synchronous-backup" xreflabel="gtm_opt_synchronous_backup">
+ <term><varname>synchronous-backup</varname> (<type>boolean</type>)</term>
+ <indexterm>
+ <primary><varname>synchronous-backup</varname> configuration parameter</primary>
+ </indexterm>
+ <listitem>
+ <para>
+ Specifies if backup to GTM-Standby is taken synchronously. If this is turned on,
+ GTM will send and receive synchronize message to make sure that all the backups
+ reached to the standby.
+ </para>
+ <para>
+ If it is turned off, all the backup information will be sent without checking they
+ reached to GTM-Standby.
+ </para>
+ <para>
+ Default value is off.
+ </para>
+ </listitem>
+ </varlistentry>
+
</variablelist>
diff --git a/src/gtm/client/fe-protocol.c b/src/gtm/client/fe-protocol.c
index 1fff790c53..bbc234ea9d 100644
--- a/src/gtm/client/fe-protocol.c
+++ b/src/gtm/client/fe-protocol.c
@@ -351,6 +351,9 @@ gtmpqParseSuccess(GTM_Conn *conn, GTM_Result *result)
switch (result->gr_type)
{
+ case SYNC_STANDBY_RESULT:
+ break;
+
case NODE_BEGIN_REPLICATION_INIT_RESULT:
break;
diff --git a/src/gtm/client/gtm_client.c b/src/gtm/client/gtm_client.c
index 2cc5b4dd9c..521df48112 100644
--- a/src/gtm/client/gtm_client.c
+++ b/src/gtm/client/gtm_client.c
@@ -14,7 +14,13 @@
*/
/* Time in seconds to wait for a response from GTM */
/* We should consider making this a GUC */
+#ifndef CLIENT_GTM_TIMEOUT
+#ifdef GTM_DEBUG
+#define CLIENT_GTM_TIMEOUT 3600
+#else
#define CLIENT_GTM_TIMEOUT 20
+#endif
+#endif
#include <time.h>
@@ -31,10 +37,33 @@
#include "gtm/register.h"
#include "gtm/assert.h"
+extern bool Backup_synchronously;
+
void GTM_FreeResult(GTM_Result *result, GTM_PGXCNodeType remote_type);
static GTM_Result *makeEmptyResultIfIsNull(GTM_Result *oldres);
-
+static int commit_prepared_transaction_internal(GTM_Conn *conn,
+ GlobalTransactionId gxid, GlobalTransactionId prepared_gxid,
+ bool is_backup);
+static int prepare_transaction_internal(GTM_Conn *conn, GlobalTransactionId gxid, bool is_backup);
+static int abort_transaction_internal(GTM_Conn *conn, GlobalTransactionId gxid, bool is_backup);
+static int abort_transaction_multi_internal(GTM_Conn *conn, int txn_count, GlobalTransactionId *gxid,
+ int *txn_count_out, int *status_out, bool is_backup);
+static int open_sequence_internal(GTM_Conn *conn, GTM_SequenceKey key, GTM_Sequence increment,
+ GTM_Sequence minval, GTM_Sequence maxval,
+ GTM_Sequence startval, bool cycle, bool is_backup);
+static GTM_Sequence get_next_internal(GTM_Conn *conn, GTM_SequenceKey key, bool is_backup);
+static int set_val_internal(GTM_Conn *conn, GTM_SequenceKey key, GTM_Sequence nextval, bool iscalled, bool is_backup);
+static int reset_sequence_internal(GTM_Conn *conn, GTM_SequenceKey key, bool is_backup);
+static int commit_transaction_internal(GTM_Conn *conn, GlobalTransactionId gxid, bool is_backup);
+static int close_sequence_internal(GTM_Conn *conn, GTM_SequenceKey key, bool is_backup);
+static int rename_sequence_internal(GTM_Conn *conn, GTM_SequenceKey key, GTM_SequenceKey newkey, bool is_backup);
+static int alter_sequence_internal(GTM_Conn *conn, GTM_SequenceKey key, GTM_Sequence increment,
+ GTM_Sequence minval, GTM_Sequence maxval,
+ GTM_Sequence startval, GTM_Sequence lastval, bool cycle, bool is_restart, bool is_backup);
+static int node_register_worker(GTM_Conn *conn, GTM_PGXCNodeType type, const char *host, GTM_PGXCNodePort port,
+ char *node_name, char *datafolder, GTM_PGXCNodeStatus status, bool is_backup);
+static int node_unregister_worker(GTM_Conn *conn, GTM_PGXCNodeType type, const char * node_name, bool is_backup);
/*
* Make an empty result if old one is null.
*/
@@ -371,6 +400,63 @@ send_failed:
/*
* Transaction Management API
*/
+
+int
+bkup_begin_transaction(GTM_Conn *conn, GTM_TransactionHandle txn, GTM_IsolationLevel isolevel,
+ bool read_only, GTM_Timestamp timestamp)
+{
+ /* Start the message. */
+ if (gtmpqPutMsgStart('C', true, conn) ||
+ gtmpqPutInt(MSG_BKUP_TXN_BEGIN, sizeof (GTM_MessageType), conn) ||
+ gtmpqPutInt(txn, sizeof(GTM_TransactionHandle), conn) ||
+ gtmpqPutInt(isolevel, sizeof (GTM_IsolationLevel), conn) ||
+ gtmpqPutc(read_only, conn) ||
+ gtmpqPutnchar((char *)&timestamp, sizeof(GTM_Timestamp), conn))
+ goto send_failed;
+
+ /* Finish the message. */
+ if (gtmpqPutMsgEnd(conn))
+ goto send_failed;
+
+ /* Flush to ensure backend gets it. */
+ if (gtmpqFlush(conn))
+ goto send_failed;
+
+ return 0;
+
+send_failed:
+ return -1;
+
+}
+
+int
+bkup_begin_transaction_gxid(GTM_Conn *conn, GTM_TransactionHandle txn, GlobalTransactionId gxid,
+ GTM_IsolationLevel isolevel, bool read_only, GTM_Timestamp timestamp)
+{
+ /* Start the message. */
+ if (gtmpqPutMsgStart('C', true, conn) ||
+ gtmpqPutInt(MSG_BKUP_TXN_BEGIN_GETGXID, sizeof (GTM_MessageType), conn) ||
+ gtmpqPutInt(txn, sizeof(GTM_TransactionHandle), conn) ||
+ gtmpqPutInt(gxid, sizeof(GlobalTransactionId), conn) ||
+ gtmpqPutInt(isolevel, sizeof (GTM_IsolationLevel), conn) ||
+ gtmpqPutc(read_only, conn) ||
+ gtmpqPutnchar((char *)&timestamp, sizeof(GTM_Timestamp), conn))
+ goto send_failed;
+
+ /* Finish the message. */
+ if (gtmpqPutMsgEnd(conn))
+ goto send_failed;
+
+ /* Flush to ensure backend gets it. */
+ if (gtmpqFlush(conn))
+ goto send_failed;
+
+ return 0;
+
+send_failed:
+ return -1;
+}
+
GlobalTransactionId
begin_transaction(GTM_Conn *conn, GTM_IsolationLevel isolevel, GTM_Timestamp *timestamp)
{
@@ -418,6 +504,32 @@ send_failed:
return InvalidGlobalTransactionId;
}
+
+int
+bkup_begin_transaction_autovacuum(GTM_Conn *conn, GTM_TransactionHandle txn, GlobalTransactionId gxid,
+ GTM_IsolationLevel isolevel)
+{
+ /* Start the message. */
+ if (gtmpqPutMsgStart('C', true, conn) ||
+ gtmpqPutInt(MSG_BKUP_TXN_BEGIN_GETGXID_AUTOVACUUM, sizeof (GTM_MessageType), conn) ||
+ gtmpqPutInt(txn, sizeof(GTM_TransactionHandle), conn) ||
+ gtmpqPutInt(gxid, sizeof(GlobalTransactionId), conn) ||
+ gtmpqPutInt(isolevel, sizeof (GTM_IsolationLevel), conn))
+ goto send_failed;
+
+ /* Finish the message. */
+ if (gtmpqPutMsgEnd(conn))
+ goto send_failed;
+
+ /* Flush to ensure backend gets it. */
+ if (gtmpqFlush(conn))
+ goto send_failed;
+
+ return 0;
+
+send_failed:
+ return -1;
+}
/*
* Transaction Management API
* Begin a transaction for an autovacuum worker process
@@ -465,14 +577,28 @@ send_failed:
}
int
+bkup_commit_transaction(GTM_Conn *conn, GlobalTransactionId gxid)
+{
+ return commit_transaction_internal(conn, gxid, true);
+}
+
+
+int
commit_transaction(GTM_Conn *conn, GlobalTransactionId gxid)
{
+ return commit_transaction_internal(conn, gxid, false);
+}
+
+
+static int
+commit_transaction_internal(GTM_Conn *conn, GlobalTransactionId gxid, bool is_backup)
+{
GTM_Result *res = NULL;
time_t finish_time;
/* Start the message. */
if (gtmpqPutMsgStart('C', true, conn) ||
- gtmpqPutInt(MSG_TXN_COMMIT, sizeof (GTM_MessageType), conn) ||
+ gtmpqPutInt(is_backup ? MSG_BKUP_TXN_COMMIT : MSG_TXN_COMMIT, sizeof (GTM_MessageType), conn) ||
gtmpqPutc(true, conn) ||
gtmpqPutnchar((char *)&gxid, sizeof (GlobalTransactionId), conn))
goto send_failed;
@@ -485,21 +611,25 @@ commit_transaction(GTM_Conn *conn, GlobalTransactionId gxid)
if (gtmpqFlush(conn))
goto send_failed;
- finish_time = time(NULL) + CLIENT_GTM_TIMEOUT;
- if (gtmpqWaitTimed(true, false, conn, finish_time) ||
- gtmpqReadData(conn) < 0)
- goto receive_failed;
+ if (!is_backup)
+ {
+ finish_time = time(NULL) + CLIENT_GTM_TIMEOUT;
+ if (gtmpqWaitTimed(true, false, conn, finish_time) ||
+ gtmpqReadData(conn) < 0)
+ goto receive_failed;
- if ((res = GTMPQgetResult(conn)) == NULL)
- goto receive_failed;
+ if ((res = GTMPQgetResult(conn)) == NULL)
+ goto receive_failed;
- if (res->gr_status == GTM_RESULT_OK)
- {
- Assert(res->gr_type == TXN_COMMIT_RESULT);
- Assert(res->gr_resdata.grd_gxid == gxid);
- }
+ if (res->gr_status == GTM_RESULT_OK)
+ {
+ Assert(res->gr_type == TXN_COMMIT_RESULT);
+ Assert(res->gr_resdata.grd_gxid == gxid);
+ }
- return res->gr_status;
+ return res->gr_status;
+ }
+ return GTM_RESULT_OK;
receive_failed:
send_failed:
@@ -511,12 +641,24 @@ send_failed:
int
commit_prepared_transaction(GTM_Conn *conn, GlobalTransactionId gxid, GlobalTransactionId prepared_gxid)
{
+ return commit_prepared_transaction_internal(conn, gxid, prepared_gxid, false);
+}
+
+int
+bkup_commit_prepared_transaction(GTM_Conn *conn, GlobalTransactionId gxid, GlobalTransactionId prepared_gxid)
+{
+ return commit_prepared_transaction_internal(conn, gxid, prepared_gxid, true);
+}
+
+static int
+commit_prepared_transaction_internal(GTM_Conn *conn, GlobalTransactionId gxid, GlobalTransactionId prepared_gxid, bool is_backup)
+{
GTM_Result *res = NULL;
time_t finish_time;
/* Start the message */
if (gtmpqPutMsgStart('C', true, conn) ||
- gtmpqPutInt(MSG_TXN_COMMIT_PREPARED, sizeof (GTM_MessageType), conn) ||
+ gtmpqPutInt(is_backup ? MSG_BKUP_TXN_COMMIT_PREPARED : MSG_TXN_COMMIT_PREPARED, sizeof (GTM_MessageType), conn) ||
gtmpqPutc(true, conn) ||
gtmpqPutnchar((char *)&gxid, sizeof (GlobalTransactionId), conn) ||
gtmpqPutc(true, conn) ||
@@ -531,21 +673,25 @@ commit_prepared_transaction(GTM_Conn *conn, GlobalTransactionId gxid, GlobalTran
if (gtmpqFlush(conn))
goto send_failed;
- finish_time = time(NULL) + CLIENT_GTM_TIMEOUT;
- if (gtmpqWaitTimed(true, false, conn, finish_time) ||
- gtmpqReadData(conn) < 0)
- goto receive_failed;
+ if (!is_backup)
+ {
+ finish_time = time(NULL) + CLIENT_GTM_TIMEOUT;
+ if (gtmpqWaitTimed(true, false, conn, finish_time) ||
+ gtmpqReadData(conn) < 0)
+ goto receive_failed;
- if ((res = GTMPQgetResult(conn)) == NULL)
- goto receive_failed;
+ if ((res = GTMPQgetResult(conn)) == NULL)
+ goto receive_failed;
- if (res->gr_status == GTM_RESULT_OK)
- {
- Assert(res->gr_type == TXN_COMMIT_PREPARED_RESULT);
- Assert(res->gr_resdata.grd_gxid == gxid);
- }
+ if (res->gr_status == GTM_RESULT_OK)
+ {
+ Assert(res->gr_type == TXN_COMMIT_PREPARED_RESULT);
+ Assert(res->gr_resdata.grd_gxid == gxid);
+ }
- return res->gr_status;
+ return res->gr_status;
+ }
+ return GTM_RESULT_OK;
send_failed:
receive_failed:
@@ -557,12 +703,24 @@ receive_failed:
int
abort_transaction(GTM_Conn *conn, GlobalTransactionId gxid)
{
+ return abort_transaction_internal(conn, gxid, false);
+}
+
+int
+bkup_abort_transaction(GTM_Conn *conn, GlobalTransactionId gxid)
+{
+ return abort_transaction_internal(conn, gxid, true);
+}
+
+static int
+abort_transaction_internal(GTM_Conn *conn, GlobalTransactionId gxid, bool is_backup)
+{
GTM_Result *res = NULL;
time_t finish_time;
/* Start the message. */
if (gtmpqPutMsgStart('C', true, conn) ||
- gtmpqPutInt(MSG_TXN_ROLLBACK, sizeof (GTM_MessageType), conn) ||
+ gtmpqPutInt(is_backup ? MSG_BKUP_TXN_ROLLBACK : MSG_TXN_ROLLBACK, sizeof (GTM_MessageType), conn) ||
gtmpqPutc(true, conn) ||
gtmpqPutnchar((char *)&gxid, sizeof (GlobalTransactionId), conn))
goto send_failed;
@@ -575,21 +733,25 @@ abort_transaction(GTM_Conn *conn, GlobalTransactionId gxid)
if (gtmpqFlush(conn))
goto send_failed;
- finish_time = time(NULL) + CLIENT_GTM_TIMEOUT;
- if (gtmpqWaitTimed(true, false, conn, finish_time) ||
- gtmpqReadData(conn) < 0)
- goto receive_failed;
+ if (!is_backup)
+ {
+ finish_time = time(NULL) + CLIENT_GTM_TIMEOUT;
+ if (gtmpqWaitTimed(true, false, conn, finish_time) ||
+ gtmpqReadData(conn) < 0)
+ goto receive_failed;
- if ((res = GTMPQgetResult(conn)) == NULL)
- goto receive_failed;
+ if ((res = GTMPQgetResult(conn)) == NULL)
+ goto receive_failed;
- if (res->gr_status == GTM_RESULT_OK)
- {
- Assert(res->gr_type == TXN_ROLLBACK_RESULT);
- Assert(res->gr_resdata.grd_gxid == gxid);
- }
+ if (res->gr_status == GTM_RESULT_OK)
+ {
+ Assert(res->gr_type == TXN_ROLLBACK_RESULT);
+ Assert(res->gr_resdata.grd_gxid == gxid);
+ }
- return res->gr_status;
+ return res->gr_status;
+ }
+ return GTM_RESULT_OK;
receive_failed:
send_failed:
@@ -600,6 +762,31 @@ send_failed:
}
int
+backup_start_prepared_transaction(GTM_Conn *conn, GTM_TransactionHandle txn, char *gid,
+ char *nodestring)
+{
+ Assert(nodestring && gid && conn);
+
+ if (gtmpqPutMsgStart('C', true, conn) ||
+ gtmpqPutInt(MSG_BKUP_TXN_START_PREPARED, sizeof(GTM_MessageType), conn) ||
+ gtmpqPutc(false, conn) ||
+ gtmpqPutInt(txn, sizeof(GTM_TransactionHandle), conn) ||
+ gtmpqPutInt(strlen(gid), sizeof(GTM_StrLen), conn) ||
+ gtmpqPutnchar(gid, strlen(gid), conn) ||
+ gtmpqPutInt(strlen(nodestring), sizeof(GTM_StrLen), conn) ||
+ gtmpqPutnchar(nodestring, strlen(nodestring), conn))
+ goto send_failed;
+
+ if (gtmpqPutMsgEnd(conn))
+ goto send_failed;
+
+ return GTM_RESULT_OK;
+
+send_failed:
+ return -1;
+}
+
+int
start_prepared_transaction(GTM_Conn *conn, GlobalTransactionId gxid, char *gid,
char *nodestring)
{
@@ -652,16 +839,27 @@ send_failed:
return -1;
}
-
int
prepare_transaction(GTM_Conn *conn, GlobalTransactionId gxid)
{
+ return prepare_transaction_internal(conn, gxid, false);
+}
+
+int
+bkup_prepare_transaction(GTM_Conn *conn, GlobalTransactionId gxid)
+{
+ return prepare_transaction_internal(conn, gxid, true);
+}
+
+static int
+prepare_transaction_internal(GTM_Conn *conn, GlobalTransactionId gxid, bool is_backup)
+{
GTM_Result *res = NULL;
time_t finish_time;
/* Start the message. */
if (gtmpqPutMsgStart('C', true, conn) ||
- gtmpqPutInt(MSG_TXN_PREPARE, sizeof (GTM_MessageType), conn) ||
+ gtmpqPutInt(is_backup ? MSG_BKUP_TXN_PREPARE : MSG_TXN_PREPARE, sizeof (GTM_MessageType), conn) ||
gtmpqPutc(true, conn) ||
gtmpqPutnchar((char *)&gxid, sizeof (GlobalTransactionId), conn))
goto send_failed;
@@ -674,21 +872,25 @@ prepare_transaction(GTM_Conn *conn, GlobalTransactionId gxid)
if (gtmpqFlush(conn))
goto send_failed;
- finish_time = time(NULL) + CLIENT_GTM_TIMEOUT;
- if (gtmpqWaitTimed(true, false, conn, finish_time) ||
- gtmpqReadData(conn) < 0)
- goto receive_failed;
+ if (!is_backup)
+ {
+ finish_time = time(NULL) + CLIENT_GTM_TIMEOUT;
+ if (gtmpqWaitTimed(true, false, conn, finish_time) ||
+ gtmpqReadData(conn) < 0)
+ goto receive_failed;
- if ((res = GTMPQgetResult(conn)) == NULL)
- goto receive_failed;
+ if ((res = GTMPQgetResult(conn)) == NULL)
+ goto receive_failed;
- if (res->gr_status == GTM_RESULT_OK)
- {
- Assert(res->gr_type == TXN_PREPARE_RESULT);
- Assert(res->gr_resdata.grd_gxid == gxid);
- }
+ if (res->gr_status == GTM_RESULT_OK)
+ {
+ Assert(res->gr_type == TXN_PREPARE_RESULT);
+ Assert(res->gr_resdata.grd_gxid == gxid);
+ }
- return res->gr_status;
+ return res->gr_status;
+ }
+ return GTM_RESULT_OK;
receive_failed:
send_failed:
@@ -809,12 +1011,28 @@ open_sequence(GTM_Conn *conn, GTM_SequenceKey key, GTM_Sequence increment,
GTM_Sequence minval, GTM_Sequence maxval,
GTM_Sequence startval, bool cycle)
{
+ return open_sequence_internal(conn, key, increment, minval, maxval, startval, cycle, false);
+}
+
+int
+bkup_open_sequence(GTM_Conn *conn, GTM_SequenceKey key, GTM_Sequence increment,
+ GTM_Sequence minval, GTM_Sequence maxval,
+ GTM_Sequence startval, bool cycle)
+{
+ return open_sequence_internal(conn, key, increment, minval, maxval, startval, cycle, true);
+}
+
+static int
+open_sequence_internal(GTM_Conn *conn, GTM_SequenceKey key, GTM_Sequence increment,
+ GTM_Sequence minval, GTM_Sequence maxval,
+ GTM_Sequence startval, bool cycle, bool is_backup)
+{
GTM_Result *res = NULL;
time_t finish_time;
/* Start the message. */
if (gtmpqPutMsgStart('C', true, conn) ||
- gtmpqPutInt(MSG_SEQUENCE_INIT, sizeof (GTM_MessageType), conn) ||
+ gtmpqPutInt(is_backup ? MSG_BKUP_SEQUENCE_INIT : MSG_SEQUENCE_INIT, sizeof (GTM_MessageType), conn) ||
gtmpqPutInt(key->gsk_keylen, 4, conn) ||
gtmpqPutnchar(key->gsk_key, key->gsk_keylen, conn) ||
gtmpqPutnchar((char *)&increment, sizeof (GTM_Sequence), conn) ||
@@ -824,23 +1042,27 @@ open_sequence(GTM_Conn *conn, GTM_SequenceKey key, GTM_Sequence increment,
gtmpqPutc(cycle, conn))
goto send_failed;
- /* Finish the message. */
- if (gtmpqPutMsgEnd(conn))
- goto send_failed;
+ if (!is_backup)
+ {
+ /* Finish the message. */
+ if (gtmpqPutMsgEnd(conn))
+ goto send_failed;
- /* Flush to ensure backend gets it. */
- if (gtmpqFlush(conn))
- goto send_failed;
+ /* Flush to ensure backend gets it. */
+ if (gtmpqFlush(conn))
+ goto send_failed;
- finish_time = time(NULL) + CLIENT_GTM_TIMEOUT;
- if (gtmpqWaitTimed(true, false, conn, finish_time) ||
- gtmpqReadData(conn) < 0)
- goto receive_failed;
+ finish_time = time(NULL) + CLIENT_GTM_TIMEOUT;
+ if (gtmpqWaitTimed(true, false, conn, finish_time) ||
+ gtmpqReadData(conn) < 0)
+ goto receive_failed;
- if ((res = GTMPQgetResult(conn)) == NULL)
- goto receive_failed;
+ if ((res = GTMPQgetResult(conn)) == NULL)
+ goto receive_failed;
- return res->gr_status;
+ return res->gr_status;
+ }
+ return GTM_RESULT_OK;
receive_failed:
send_failed:
@@ -854,12 +1076,28 @@ alter_sequence(GTM_Conn *conn, GTM_SequenceKey key, GTM_Sequence increment,
GTM_Sequence minval, GTM_Sequence maxval,
GTM_Sequence startval, GTM_Sequence lastval, bool cycle, bool is_restart)
{
+ return alter_sequence_internal(conn, key, increment, minval, maxval, startval, lastval, cycle, is_restart, false);
+}
+
+int
+bkup_alter_sequence(GTM_Conn *conn, GTM_SequenceKey key, GTM_Sequence increment,
+ GTM_Sequence minval, GTM_Sequence maxval,
+ GTM_Sequence startval, GTM_Sequence lastval, bool cycle, bool is_restart)
+{
+ return alter_sequence_internal(conn, key, increment, minval, maxval, startval, lastval, cycle, is_restart, true);
+}
+
+static int
+alter_sequence_internal(GTM_Conn *conn, GTM_SequenceKey key, GTM_Sequence increment,
+ GTM_Sequence minval, GTM_Sequence maxval,
+ GTM_Sequence startval, GTM_Sequence lastval, bool cycle, bool is_restart, bool is_backup)
+{
GTM_Result *res = NULL;
time_t finish_time;
/* Start the message. */
if (gtmpqPutMsgStart('C', true, conn) ||
- gtmpqPutInt(MSG_SEQUENCE_ALTER, sizeof (GTM_MessageType), conn) ||
+ gtmpqPutInt(is_backup ? MSG_BKUP_SEQUENCE_ALTER : MSG_SEQUENCE_ALTER, sizeof (GTM_MessageType), conn) ||
gtmpqPutInt(key->gsk_keylen, 4, conn) ||
gtmpqPutnchar(key->gsk_key, key->gsk_keylen, conn) ||
gtmpqPutnchar((char *)&increment, sizeof (GTM_Sequence), conn) ||
@@ -879,15 +1117,19 @@ alter_sequence(GTM_Conn *conn, GTM_SequenceKey key, GTM_Sequence increment,
if (gtmpqFlush(conn))
goto send_failed;
- finish_time = time(NULL) + CLIENT_GTM_TIMEOUT;
- if (gtmpqWaitTimed(true, false, conn, finish_time) ||
- gtmpqReadData(conn) < 0)
- goto receive_failed;
+ if (!is_backup)
+ {
+ finish_time = time(NULL) + CLIENT_GTM_TIMEOUT;
+ if (gtmpqWaitTimed(true, false, conn, finish_time) ||
+ gtmpqReadData(conn) < 0)
+ goto receive_failed;
- if ((res = GTMPQgetResult(conn)) == NULL)
- goto receive_failed;
+ if ((res = GTMPQgetResult(conn)) == NULL)
+ goto receive_failed;
- return res->gr_status;
+ return res->gr_status;
+ }
+ return GTM_RESULT_OK;
receive_failed:
send_failed:
@@ -899,12 +1141,24 @@ send_failed:
int
close_sequence(GTM_Conn *conn, GTM_SequenceKey key)
{
+ return close_sequence_internal(conn, key, false);
+}
+
+int
+bkup_close_sequence(GTM_Conn *conn, GTM_SequenceKey key)
+{
+ return close_sequence_internal(conn, key, true);
+}
+
+static int
+close_sequence_internal(GTM_Conn *conn, GTM_SequenceKey key, bool is_backup)
+{
GTM_Result *res = NULL;
time_t finish_time;
/* Start the message. */
if (gtmpqPutMsgStart('C', true, conn) ||
- gtmpqPutInt(MSG_SEQUENCE_CLOSE, sizeof (GTM_MessageType), conn) ||
+ gtmpqPutInt(is_backup ? MSG_BKUP_SEQUENCE_CLOSE : MSG_SEQUENCE_CLOSE, sizeof (GTM_MessageType), conn) ||
gtmpqPutInt(key->gsk_keylen, 4, conn) ||
gtmpqPutnchar(key->gsk_key, key->gsk_keylen, conn) ||
gtmpqPutnchar((char *)&key->gsk_type, sizeof(GTM_SequenceKeyType), conn))
@@ -918,15 +1172,19 @@ close_sequence(GTM_Conn *conn, GTM_SequenceKey key)
if (gtmpqFlush(conn))
goto send_failed;
- finish_time = time(NULL) + CLIENT_GTM_TIMEOUT;
- if (gtmpqWaitTimed(true, false, conn, finish_time) ||
- gtmpqReadData(conn) < 0)
- goto receive_failed;
+ if (!is_backup)
+ {
+ finish_time = time(NULL) + CLIENT_GTM_TIMEOUT;
+ if (gtmpqWaitTimed(true, false, conn, finish_time) ||
+ gtmpqReadData(conn) < 0)
+ goto receive_failed;
- if ((res = GTMPQgetResult(conn)) == NULL)
- goto receive_failed;
+ if ((res = GTMPQgetResult(conn)) == NULL)
+ goto receive_failed;
- return res->gr_status;
+ return res->gr_status;
+ }
+ return GTM_RESULT_OK;
receive_failed:
send_failed:
@@ -938,12 +1196,24 @@ send_failed:
int
rename_sequence(GTM_Conn *conn, GTM_SequenceKey key, GTM_SequenceKey newkey)
{
+ return rename_sequence_internal(conn, key, newkey, false);
+}
+
+int
+bkup_rename_sequence(GTM_Conn *conn, GTM_SequenceKey key, GTM_SequenceKey newkey)
+{
+ return rename_sequence_internal(conn, key, newkey, true);
+}
+
+static int
+rename_sequence_internal(GTM_Conn *conn, GTM_SequenceKey key, GTM_SequenceKey newkey, bool is_backup)
+{
GTM_Result *res = NULL;
time_t finish_time;
/* Start the message. */
if (gtmpqPutMsgStart('C', true, conn) ||
- gtmpqPutInt(MSG_SEQUENCE_RENAME, sizeof (GTM_MessageType), conn) ||
+ gtmpqPutInt(is_backup ? MSG_BKUP_SEQUENCE_RENAME : MSG_SEQUENCE_RENAME, sizeof (GTM_MessageType), conn) ||
gtmpqPutInt(key->gsk_keylen, 4, conn) ||
gtmpqPutnchar(key->gsk_key, key->gsk_keylen, conn)||
gtmpqPutInt(newkey->gsk_keylen, 4, conn) ||
@@ -958,15 +1228,19 @@ rename_sequence(GTM_Conn *conn, GTM_SequenceKey key, GTM_SequenceKey newkey)
if (gtmpqFlush(conn))
goto send_failed;
- finish_time = time(NULL) + CLIENT_GTM_TIMEOUT;
- if (gtmpqWaitTimed(true, false, conn, finish_time) ||
- gtmpqReadData(conn) < 0)
- goto receive_failed;
+ if (!is_backup)
+ {
+ finish_time = time(NULL) + CLIENT_GTM_TIMEOUT;
+ if (gtmpqWaitTimed(true, false, conn, finish_time) ||
+ gtmpqReadData(conn) < 0)
+ goto receive_failed;
- if ((res = GTMPQgetResult(conn)) == NULL)
- goto receive_failed;
+ if ((res = GTMPQgetResult(conn)) == NULL)
+ goto receive_failed;
- return res->gr_status;
+ return res->gr_status;
+ }
+ return GTM_RESULT_OK;
receive_failed:
send_failed:
@@ -1019,12 +1293,24 @@ send_failed:
int
set_val(GTM_Conn *conn, GTM_SequenceKey key, GTM_Sequence nextval, bool iscalled)
{
+ return set_val_internal(conn, key, nextval, iscalled, false);
+}
+
+int
+bkup_set_val(GTM_Conn *conn, GTM_SequenceKey key, GTM_Sequence nextval, bool iscalled)
+{
+ return set_val_internal(conn, key, nextval, iscalled, true);
+}
+
+static int
+set_val_internal(GTM_Conn *conn, GTM_SequenceKey key, GTM_Sequence nextval, bool iscalled, bool is_backup)
+{
GTM_Result *res = NULL;
time_t finish_time;
/* Start the message. */
if (gtmpqPutMsgStart('C', true, conn) ||
- gtmpqPutInt(MSG_SEQUENCE_SET_VAL, sizeof (GTM_MessageType), conn) ||
+ gtmpqPutInt(is_backup ? MSG_BKUP_SEQUENCE_SET_VAL : MSG_SEQUENCE_SET_VAL, sizeof (GTM_MessageType), conn) ||
gtmpqPutInt(key->gsk_keylen, 4, conn) ||
gtmpqPutnchar(key->gsk_key, key->gsk_keylen, conn) ||
gtmpqPutnchar((char *)&nextval, sizeof (GTM_Sequence), conn) ||
@@ -1039,15 +1325,19 @@ set_val(GTM_Conn *conn, GTM_SequenceKey key, GTM_Sequence nextval, bool iscalled
if (gtmpqFlush(conn))
goto send_failed;
- finish_time = time(NULL) + CLIENT_GTM_TIMEOUT;
- if (gtmpqWaitTimed(true, false, conn, finish_time) ||
- gtmpqReadData(conn) < 0)
- goto receive_failed;
+ if (!is_backup)
+ {
+ finish_time = time(NULL) + CLIENT_GTM_TIMEOUT;
+ if (gtmpqWaitTimed(true, false, conn, finish_time) ||
+ gtmpqReadData(conn) < 0)
+ goto receive_failed;
- if ((res = GTMPQgetResult(conn)) == NULL)
- goto receive_failed;
+ if ((res = GTMPQgetResult(conn)) == NULL)
+ goto receive_failed;
- return res->gr_status;
+ return res->gr_status;
+ }
+ return GTM_RESULT_OK;
receive_failed:
send_failed:
@@ -1059,12 +1349,24 @@ send_failed:
GTM_Sequence
get_next(GTM_Conn *conn, GTM_SequenceKey key)
{
+ return get_next_internal(conn, key, false);
+}
+
+GTM_Sequence
+bkup_get_next(GTM_Conn *conn, GTM_SequenceKey key)
+{
+ return get_next_internal(conn, key, true);
+}
+
+static GTM_Sequence
+get_next_internal(GTM_Conn *conn, GTM_SequenceKey key, bool is_backup)
+{
GTM_Result *res = NULL;
time_t finish_time;
/* Start the message. */
if (gtmpqPutMsgStart('C', true, conn) ||
- gtmpqPutInt(MSG_SEQUENCE_GET_NEXT, sizeof (GTM_MessageType), conn) ||
+ gtmpqPutInt(is_backup ? MSG_BKUP_SEQUENCE_GET_NEXT : MSG_SEQUENCE_GET_NEXT, sizeof (GTM_MessageType), conn) ||
gtmpqPutInt(key->gsk_keylen, 4, conn) ||
gtmpqPutnchar(key->gsk_key, key->gsk_keylen, conn))
goto send_failed;
@@ -1077,18 +1379,22 @@ get_next(GTM_Conn *conn, GTM_SequenceKey key)
if (gtmpqFlush(conn))
goto send_failed;
- finish_time = time(NULL) + CLIENT_GTM_TIMEOUT;
- if (gtmpqWaitTimed(true, false, conn, finish_time) ||
- gtmpqReadData(conn) < 0)
- goto receive_failed;
-
- if ((res = GTMPQgetResult(conn)) == NULL)
- goto receive_failed;
-
- if (res->gr_status == GTM_RESULT_OK)
- return res->gr_resdata.grd_seq.seqval;
- else
- return InvalidSequenceValue;
+ if (!is_backup)
+ {
+ finish_time = time(NULL) + CLIENT_GTM_TIMEOUT;
+ if (gtmpqWaitTimed(true, false, conn, finish_time) ||
+ gtmpqReadData(conn) < 0)
+ goto receive_failed;
+
+ if ((res = GTMPQgetResult(conn)) == NULL)
+ goto receive_failed;
+
+ if (res->gr_status == GTM_RESULT_OK)
+ return res->gr_resdata.grd_seq.seqval;
+ else
+ return InvalidSequenceValue;
+ }
+ return GTM_RESULT_OK;
receive_failed:
send_failed:
@@ -1100,12 +1406,24 @@ send_failed:
int
reset_sequence(GTM_Conn *conn, GTM_SequenceKey key)
{
+ return reset_sequence_internal(conn, key, false);
+}
+
+int
+bkup_reset_sequence(GTM_Conn *conn, GTM_SequenceKey key)
+{
+ return reset_sequence_internal(conn, key, true);
+}
+
+static int
+reset_sequence_internal(GTM_Conn *conn, GTM_SequenceKey key, bool is_backup)
+{
GTM_Result *res = NULL;
time_t finish_time;
/* Start the message. */
if (gtmpqPutMsgStart('C', true, conn) ||
- gtmpqPutInt(MSG_SEQUENCE_RESET, sizeof (GTM_MessageType), conn) ||
+ gtmpqPutInt(is_backup ? MSG_BKUP_SEQUENCE_RESET : MSG_SEQUENCE_RESET, sizeof (GTM_MessageType), conn) ||
gtmpqPutInt(key->gsk_keylen, 4, conn) ||
gtmpqPutnchar(key->gsk_key, key->gsk_keylen, conn))
goto send_failed;
@@ -1118,15 +1436,19 @@ reset_sequence(GTM_Conn *conn, GTM_SequenceKey key)
if (gtmpqFlush(conn))
goto send_failed;
- finish_time = time(NULL) + CLIENT_GTM_TIMEOUT;
- if (gtmpqWaitTimed(true, false, conn, finish_time) ||
- gtmpqReadData(conn) < 0)
- goto receive_failed;
+ if (!is_backup)
+ {
+ finish_time = time(NULL) + CLIENT_GTM_TIMEOUT;
+ if (gtmpqWaitTimed(true, false, conn, finish_time) ||
+ gtmpqReadData(conn) < 0)
+ goto receive_failed;
- if ((res = GTMPQgetResult(conn)) == NULL)
- goto receive_failed;
+ if ((res = GTMPQgetResult(conn)) == NULL)
+ goto receive_failed;
- return res->gr_status;
+ return res->gr_status;
+ }
+ return GTM_RESULT_OK;
receive_failed:
send_failed:
@@ -1177,6 +1499,8 @@ node_get_local_addr(GTM_Conn *conn, char *buf, size_t buflen, int *rc)
* This number is modified at proxy level automatically.
*
* node_register() returns 0 on success, -1 on failure.
+ *
+ * is_backup indicates the message should be *_BKUP_* message
*/
int node_register(GTM_Conn *conn,
GTM_PGXCNodeType type,
@@ -1193,7 +1517,7 @@ int node_register(GTM_Conn *conn,
return -1;
}
- return node_register_internal(conn, type, host, port, node_name, datafolder, NODE_CONNECTED);
+ return node_register_worker(conn, type, host, port, node_name, datafolder, NODE_CONNECTED, false);
}
int node_register_internal(GTM_Conn *conn,
@@ -1204,6 +1528,47 @@ int node_register_internal(GTM_Conn *conn,
char *datafolder,
GTM_PGXCNodeStatus status)
{
+ return node_register_worker(conn, type, host, port, node_name, datafolder, status, false);
+}
+
+int bkup_node_register(GTM_Conn *conn,
+ GTM_PGXCNodeType type,
+ GTM_PGXCNodePort port,
+ char *node_name,
+ char *datafolder)
+{
+ char host[1024];
+ int rc;
+
+ node_get_local_addr(conn, host, sizeof(host), &rc);
+ if (rc != 0)
+ {
+ return -1;
+ }
+
+ return node_register_worker(conn, type, host, port, node_name, datafolder, NODE_CONNECTED, true);
+}
+
+int bkup_node_register_internal(GTM_Conn *conn,
+ GTM_PGXCNodeType type,
+ const char *host,
+ GTM_PGXCNodePort port,
+ char *node_name,
+ char *datafolder,
+ GTM_PGXCNodeStatus status)
+{
+ return node_register_worker(conn, type, host, port, node_name, datafolder, status, true);
+}
+
+static int node_register_worker(GTM_Conn *conn,
+ GTM_PGXCNodeType type,
+ const char *host,
+ GTM_PGXCNodePort port,
+ char *node_name,
+ char *datafolder,
+ GTM_PGXCNodeStatus status,
+ bool is_backup)
+{
GTM_Result *res = NULL;
time_t finish_time;
char proxy_name[] = "";
@@ -1218,7 +1583,7 @@ int node_register_internal(GTM_Conn *conn,
*/
if (gtmpqPutMsgStart('C', true, conn) ||
/* Message Type */
- gtmpqPutInt(MSG_NODE_REGISTER, sizeof (GTM_MessageType), conn) ||
+ gtmpqPutInt(is_backup? MSG_BKUP_NODE_REGISTER : MSG_NODE_REGISTER, sizeof (GTM_MessageType), conn) ||
/* Node Type to Register */
gtmpqPutnchar((char *)&type, sizeof(GTM_PGXCNodeType), conn) ||
/* Node name length */
@@ -1258,26 +1623,30 @@ int node_register_internal(GTM_Conn *conn,
goto send_failed;
}
- finish_time = time(NULL) + CLIENT_GTM_TIMEOUT;
- if (gtmpqWaitTimed(true, false, conn, finish_time) ||
- gtmpqReadData(conn) < 0)
+ if (!is_backup)
{
- goto receive_failed;
- }
+ finish_time = time(NULL) + CLIENT_GTM_TIMEOUT;
+ if (gtmpqWaitTimed(true, false, conn, finish_time) ||
+ gtmpqReadData(conn) < 0)
+ {
+ goto receive_failed;
+ }
- if ((res = GTMPQgetResult(conn)) == NULL)
- {
- goto receive_failed;
- }
+ if ((res = GTMPQgetResult(conn)) == NULL)
+ {
+ goto receive_failed;
+ }
- /* Check on node type and node name */
- if (res->gr_status == GTM_RESULT_OK)
- {
- Assert(res->gr_resdata.grd_node.type == type);
- Assert((strcmp(res->gr_resdata.grd_node.node_name,node_name) == 0));
- }
+ /* Check on node type and node name */
+ if (res->gr_status == GTM_RESULT_OK)
+ {
+ Assert(res->gr_resdata.grd_node.type == type);
+ Assert((strcmp(res->gr_resdata.grd_node.node_name,node_name) == 0));
+ }
- return res->gr_status;
+ return res->gr_status;
+ }
+ return GTM_RESULT_OK;
receive_failed:
send_failed:
@@ -1288,11 +1657,21 @@ send_failed:
int node_unregister(GTM_Conn *conn, GTM_PGXCNodeType type, const char * node_name)
{
+ return node_unregister_worker(conn, type, node_name, false);
+}
+
+int bkup_node_unregister(GTM_Conn *conn, GTM_PGXCNodeType type, const char * node_name)
+{
+ return node_unregister_worker(conn, type, node_name, true);
+}
+
+static int node_unregister_worker(GTM_Conn *conn, GTM_PGXCNodeType type, const char * node_name, bool is_backup)
+{
GTM_Result *res = NULL;
time_t finish_time;
if (gtmpqPutMsgStart('C', true, conn) ||
- gtmpqPutInt(MSG_NODE_UNREGISTER, sizeof (GTM_MessageType), conn) ||
+ gtmpqPutInt(is_backup ? MSG_BKUP_NODE_UNREGISTER : MSG_NODE_UNREGISTER, sizeof (GTM_MessageType), conn) ||
gtmpqPutnchar((char *)&type, sizeof(GTM_PGXCNodeType), conn) ||
/* Node name length */
gtmpqPutInt(strlen(node_name), sizeof (GTM_StrLen), conn) ||
@@ -1308,22 +1687,26 @@ int node_unregister(GTM_Conn *conn, GTM_PGXCNodeType type, const char * node_nam
if (gtmpqFlush(conn))
goto send_failed;
- finish_time = time(NULL) + CLIENT_GTM_TIMEOUT;
- if (gtmpqWaitTimed(true, false, conn, finish_time) ||
- gtmpqReadData(conn) < 0)
- goto receive_failed;
+ if (!is_backup)
+ {
+ finish_time = time(NULL) + CLIENT_GTM_TIMEOUT;
+ if (gtmpqWaitTimed(true, false, conn, finish_time) ||
+ gtmpqReadData(conn) < 0)
+ goto receive_failed;
- if ((res = GTMPQgetResult(conn)) == NULL)
- goto receive_failed;
+ if ((res = GTMPQgetResult(conn)) == NULL)
+ goto receive_failed;
- /* Check on node type and node name */
- if (res->gr_status == GTM_RESULT_OK)
- {
- Assert(res->gr_resdata.grd_node.type == type);
- Assert( (strcmp(res->gr_resdata.grd_node.node_name, node_name) == 0) );
- }
+ /* Check on node type and node name */
+ if (res->gr_status == GTM_RESULT_OK)
+ {
+ Assert(res->gr_resdata.grd_node.type == type);
+ Assert( (strcmp(res->gr_resdata.grd_node.node_name, node_name) == 0) );
+ }
- return res->gr_status;
+ return res->gr_status;
+ }
+ return GTM_RESULT_OK;
receive_failed:
send_failed:
@@ -1332,6 +1715,7 @@ send_failed:
return -1;
}
+
void
GTM_FreeResult(GTM_Result *result, GTM_PGXCNodeType remote_type)
{
@@ -1432,6 +1816,85 @@ send_failed:
return -1;
}
+
+int
+bkup_begin_transaction_multi(GTM_Conn *conn, int txn_count,
+ GTM_TransactionHandle *txn, GlobalTransactionId start_gxid, GTM_IsolationLevel *isolevel,
+ bool *read_only, GTMProxy_ConnID *txn_connid)
+{
+ int ii;
+ GlobalTransactionId gxid = start_gxid;
+
+ /* Start the message. */
+ if (gtmpqPutMsgStart('C', true, conn)) /* FIXME: no proxy header */
+ goto send_failed;
+
+ if (gtmpqPutInt(MSG_BKUP_TXN_BEGIN_GETGXID_MULTI, sizeof (GTM_MessageType), conn) ||
+ gtmpqPutInt(txn_count, sizeof(int), conn))
+ goto send_failed;
+
+ for (ii = 0; ii < txn_count; ii++, gxid++)
+ {
+ if (gxid == InvalidGlobalTransactionId)
+ gxid = FirstNormalGlobalTransactionId;
+ if (gtmpqPutInt(txn[ii], sizeof(GTM_TransactionHandle), conn) ||
+ gtmpqPutInt(gxid, sizeof(GlobalTransactionId), conn) ||
+ gtmpqPutInt(isolevel[ii], sizeof(int), conn) ||
+ gtmpqPutc(read_only[ii], conn) ||
+ gtmpqPutInt(txn_connid[ii], sizeof(int), conn))
+ goto send_failed;
+ }
+
+ /* Finish the message. */
+ if (gtmpqPutMsgEnd(conn))
+ goto send_failed;
+
+ /* Flush to ensure backend gets it. */
+ if (gtmpqFlush(conn))
+ goto send_failed;
+
+ return 0;
+
+send_failed:
+ return -1;
+
+}
+
+int
+bkup_commit_transaction_multi(GTM_Conn *conn, int txn_count, GTM_TransactionHandle *txn)
+{
+ int ii;
+
+ if (gtmpqPutMsgStart('C', true, conn)) /* FIXME: no proxy header */
+ goto send_failed;
+
+ if (gtmpqPutInt(MSG_BKUP_TXN_COMMIT_MULTI, sizeof (GTM_MessageType), conn) ||
+ gtmpqPutInt(txn_count, sizeof(int), conn))
+ goto send_failed;
+
+ for (ii = 0; ii < txn_count; ii++)
+ {
+ if (gtmpqPutc(false, conn) ||
+ gtmpqPutnchar((char *)&txn[ii],
+ sizeof (GTM_TransactionHandle), conn))
+ goto send_failed;
+ }
+
+ /* Finish the message. */
+ if (gtmpqPutMsgEnd(conn))
+ goto send_failed;
+
+ /* Flush to ensure backend gets it. */
+ if (gtmpqFlush(conn))
+ goto send_failed;
+
+ return GTM_RESULT_OK;
+
+send_failed:
+ return -1;
+}
+
+
int
commit_transaction_multi(GTM_Conn *conn, int txn_count, GlobalTransactionId *gxid,
int *txn_count_out, int *status_out)
@@ -1489,7 +1952,23 @@ send_failed:
int
abort_transaction_multi(GTM_Conn *conn, int txn_count, GlobalTransactionId *gxid,
- int *txn_count_out, int *status_out)
+ int *txn_count_out, int *status_out)
+{
+ return abort_transaction_multi_internal(conn, txn_count, gxid, txn_count_out, status_out, false);
+}
+
+int
+bkup_abort_transaction_multi(GTM_Conn *conn, int txn_count, GlobalTransactionId *gxid)
+{
+ int txn_count_out;
+ int status_out[GTM_MAX_GLOBAL_TRANSACTIONS];
+
+ return abort_transaction_multi_internal(conn, txn_count, gxid, &txn_count_out, status_out, true);
+}
+
+static int
+abort_transaction_multi_internal(GTM_Conn *conn, int txn_count, GlobalTransactionId *gxid,
+ int *txn_count_out, int *status_out, bool is_backup)
{
GTM_Result *res = NULL;
time_t finish_time;
@@ -1499,7 +1978,7 @@ abort_transaction_multi(GTM_Conn *conn, int txn_count, GlobalTransactionId *gxid
if (gtmpqPutMsgStart('C', true, conn)) /* FIXME: no proxy header */
goto send_failed;
- if (gtmpqPutInt(MSG_TXN_ROLLBACK_MULTI, sizeof (GTM_MessageType), conn) ||
+ if (gtmpqPutInt(is_backup ? MSG_BKUP_TXN_ROLLBACK_MULTI : MSG_TXN_ROLLBACK_MULTI, sizeof (GTM_MessageType), conn) ||
gtmpqPutInt(txn_count, sizeof(int), conn))
goto send_failed;
@@ -1519,21 +1998,25 @@ abort_transaction_multi(GTM_Conn *conn, int txn_count, GlobalTransactionId *gxid
if (gtmpqFlush(conn))
goto send_failed;
- finish_time = time(NULL) + CLIENT_GTM_TIMEOUT;
- if (gtmpqWaitTimed(true, false, conn, finish_time) ||
- gtmpqReadData(conn) < 0)
- goto receive_failed;
+ if (!is_backup)
+ {
+ finish_time = time(NULL) + CLIENT_GTM_TIMEOUT;
+ if (gtmpqWaitTimed(true, false, conn, finish_time) ||
+ gtmpqReadData(conn) < 0)
+ goto receive_failed;
- if ((res = GTMPQgetResult(conn)) == NULL)
- goto receive_failed;
+ if ((res = GTMPQgetResult(conn)) == NULL)
+ goto receive_failed;
- if (res->gr_status == GTM_RESULT_OK)
- {
- memcpy(txn_count_out, &res->gr_resdata.grd_txn_get_multi.txn_count, sizeof(int));
- memcpy(status_out, &res->gr_resdata.grd_txn_rc_multi.status, sizeof(int) * (*txn_count_out));
- }
+ if (res->gr_status == GTM_RESULT_OK)
+ {
+ memcpy(txn_count_out, &res->gr_resdata.grd_txn_get_multi.txn_count, sizeof(int));
+ memcpy(status_out, &res->gr_resdata.grd_txn_rc_multi.status, sizeof(int) * (*txn_count_out));
+ }
- return res->gr_status;
+ return res->gr_status;
+ }
+ return GTM_RESULT_OK;
receive_failed:
send_failed:
@@ -1638,4 +2121,43 @@ send_failed:
conn->result->gr_status = GTM_RESULT_COMM_ERROR;
return -1;
}
-
+
+/*
+ * Sync with standby
+ */
+int
+gtm_sync_standby(GTM_Conn *conn)
+{
+ GTM_Result *res = NULL;
+ time_t finish_time;
+
+ elog(DEBUG3, "Synchronizing with standby");
+
+ if (gtmpqPutMsgStart('C', true, conn))
+ goto send_failed;
+
+ if (gtmpqPutInt(MSG_SYNC_STANDBY, sizeof(GTM_MessageType), conn))
+ goto send_failed;
+
+ if (gtmpqPutMsgEnd(conn))
+ goto send_failed;
+
+ if (gtmpqFlush(conn))
+ goto send_failed;
+
+ finish_time = time(NULL) + CLIENT_GTM_TIMEOUT;
+ if (gtmpqWaitTimed(true, false, conn, finish_time) ||
+ gtmpqReadData(conn) < 0)
+ goto receive_failed;
+
+ if ((res = GTMPQgetResult(conn)) == NULL)
+ goto receive_failed;
+
+ return res->gr_status;
+
+receive_failed:
+send_failed:
+ conn->result = makeEmptyResultIfIsNull(conn->result);
+ conn->result->gr_status = GTM_RESULT_COMM_ERROR;
+ return -1;
+}
diff --git a/src/gtm/client/pqexpbuffer.c b/src/gtm/client/pqexpbuffer.c
index 2b852d6580..856b765509 100644
--- a/src/gtm/client/pqexpbuffer.c
+++ b/src/gtm/client/pqexpbuffer.c
@@ -139,7 +139,7 @@ resetGTMPQExpBuffer(PQExpBuffer str)
{
if (str)
{
- if (str->data != oom_buffer)
+ if (str->data && str->data != oom_buffer)
{
str->len = 0;
str->data[0] = '\0';
diff --git a/src/gtm/common/elog.c b/src/gtm/common/elog.c
index 9098b0ade1..db47481d1a 100644
--- a/src/gtm/common/elog.c
+++ b/src/gtm/common/elog.c
@@ -853,6 +853,15 @@ send_message_to_server_log(ErrorData *edata)
/*
* Write error report to client
+ *
+ * At present, this function is not used within GTM. Because this flushes
+ * message back to the client, GTM should consider to flush backup to the
+ * standby. However, we cannot simply refer to isGTM because this module
+ * can be included in coordinator backends. If this can really be called
+ * from any GTM module, we need a solution to determine that the Port is
+ * in GTM or not, without direct reference to isGTM.
+ *
+ * K.Suzuki, Jan, 2012
*/
static void
send_message_to_frontend(Port *myport, ErrorData *edata)
diff --git a/src/gtm/common/gtm_utils.c b/src/gtm/common/gtm_utils.c
index bce9bcb8a2..b1293c2770 100644
--- a/src/gtm/common/gtm_utils.c
+++ b/src/gtm/common/gtm_utils.c
@@ -16,6 +16,186 @@
#include "gtm/gtm_utils.h"
#include "gtm/elog.h"
#include "gtm/gtm.h"
+#include "gtm/gtm_msg.h"
+
+struct enum_name
+{
+ int type;
+ char *name;
+};
+
+/*
+ * Advise:
+ * Following table can be formatted using gtm_msg.h definitions.
+ */
+static struct enum_name message_name_tab[] =
+{
+ {MSG_TYPE_INVALID, "MSG_TYPE_INVALID"},
+ {MSG_SYNC_STANDBY, "MSG_SYNC_STANDBY"},
+ {MSG_NODE_REGISTER, "MSG_NODE_REGISTER"},
+ {MSG_BKUP_NODE_REGISTER, "MSG_BKUP_NODE_REGISTER"},
+ {MSG_NODE_UNREGISTER, "MSG_NODE_UNREGISTER"},
+ {MSG_BKUP_NODE_UNREGISTER, "MSG_BKUP_NODE_UNREGISTER"},
+ {MSG_NODE_LIST, "MSG_NODE_LIST"},
+ {MSG_NODE_BEGIN_REPLICATION_INIT, "MSG_NODE_BEGIN_REPLICATION_INIT"},
+ {MSG_NODE_END_REPLICATION_INIT, "MSG_NODE_END_REPLICATION_INIT"},
+ {MSG_BEGIN_BACKUP, "MSG_BEGIN_BACKUP"},
+ {MSG_END_BACKUP, "MSG_END_BACKUP"},
+ {MSG_TXN_BEGIN, "MSG_TXN_BEGIN"},
+ {MSG_BKUP_TXN_BEGIN, "MSG_BKUP_TXN_BEGIN"},
+ {MSG_TXN_BEGIN_GETGXID, "MSG_TXN_BEGIN_GETGXID"},
+ {MSG_BKUP_TXN_BEGIN_GETGXID, "MSG_BKUP_TXN_BEGIN_GETGXID"},
+ {MSG_TXN_BEGIN_GETGXID_MULTI, "MSG_TXN_BEGIN_GETGXID_MULTI"},
+ {MSG_BKUP_TXN_BEGIN_GETGXID_MULTI, "MSG_BKUP_TXN_BEGIN_GETGXID_MULTI"},
+ {MSG_TXN_START_PREPARED, "MSG_TXN_START_PREPARED"},
+ {MSG_BKUP_TXN_START_PREPARED, "MSG_BKUP_TXN_START_PREPARED"},
+ {MSG_TXN_COMMIT, "MSG_TXN_COMMIT"},
+ {MSG_BKUP_TXN_COMMIT, "MSG_BKUP_TXN_COMMIT"},
+ {MSG_TXN_COMMIT_MULTI, "MSG_TXN_COMMIT_MULTI"},
+ {MSG_BKUP_TXN_COMMIT_MULTI, "MSG_BKUP_TXN_COMMIT_MULTI"},
+ {MSG_TXN_COMMIT_PREPARED, "MSG_TXN_COMMIT_PREPARED"},
+ {MSG_BKUP_TXN_COMMIT_PREPARED, "MSG_BKUP_TXN_COMMIT_PREPARED"},
+ {MSG_TXN_PREPARE, "MSG_TXN_PREPARE"},
+ {MSG_BKUP_TXN_PREPARE, "MSG_BKUP_TXN_PREPARE"},
+ {MSG_TXN_ROLLBACK, "MSG_TXN_ROLLBACK"},
+ {MSG_BKUP_TXN_ROLLBACK, "MSG_BKUP_TXN_ROLLBACK"},
+ {MSG_TXN_ROLLBACK_MULTI, "MSG_TXN_ROLLBACK_MULTI"},
+ {MSG_BKUP_TXN_ROLLBACK_MULTI, "MSG_BKUP_TXN_ROLLBACK_MULTI"},
+ {MSG_TXN_GET_GID_DATA, "MSG_TXN_GET_GID_DATA"},
+ {MSG_TXN_GET_GXID, "MSG_TXN_GET_GXID"},
+ {MSG_BKUP_TXN_GET_GXID, "MSG_BKUP_TXN_GET_GXID"},
+ {MSG_TXN_GET_NEXT_GXID, "MSG_TXN_GET_NEXT_GXID"},
+ {MSG_TXN_GXID_LIST, "MSG_TXN_GXID_LIST"},
+ {MSG_SNAPSHOT_GET, "MSG_SNAPSHOT_GET"},
+ {MSG_SNAPSHOT_GET_MULTI, "MSG_SNAPSHOT_GET_MULTI"},
+ {MSG_SNAPSHOT_GXID_GET, "MSG_SNAPSHOT_GXID_GET"},
+ {MSG_SEQUENCE_INIT, "MSG_SEQUENCE_INIT"},
+ {MSG_BKUP_SEQUENCE_INIT, "MSG_BKUP_SEQUENCE_INIT"},
+ {MSG_SEQUENCE_GET_CURRENT, "MSG_SEQUENCE_GET_CURRENT"},
+ {MSG_SEQUENCE_GET_NEXT, "MSG_SEQUENCE_GET_NEXT"},
+ {MSG_BKUP_SEQUENCE_GET_NEXT, "MSG_BKUP_SEQUENCE_GET_NEXT"},
+ {MSG_SEQUENCE_GET_LAST, "MSG_SEQUENCE_GET_LAST"},
+ {MSG_SEQUENCE_SET_VAL, "MSG_SEQUENCE_SET_VAL"},
+ {MSG_BKUP_SEQUENCE_SET_VAL, "MSG_BKUP_SEQUENCE_SET_VAL"},
+ {MSG_SEQUENCE_RESET, "MSG_SEQUENCE_RESET"},
+ {MSG_BKUP_SEQUENCE_RESET, "MSG_BKUP_SEQUENCE_RESET"},
+ {MSG_SEQUENCE_CLOSE, "MSG_SEQUENCE_CLOSE"},
+ {MSG_BKUP_SEQUENCE_CLOSE, "MSG_BKUP_SEQUENCE_CLOSE"},
+ {MSG_SEQUENCE_RENAME, "MSG_SEQUENCE_RENAME"},
+ {MSG_BKUP_SEQUENCE_RENAME, "MSG_BKUP_SEQUENCE_RENAME"},
+ {MSG_BKUP_SEQUENCE_RENAME, "MSG_BKUP_SEQUENCE_RENAME"},
+ {MSG_SEQUENCE_ALTER, "MSG_SEQUENCE_ALTER"},
+ {MSG_BKUP_SEQUENCE_ALTER, "MSG_BKUP_SEQUENCE_ALTER"},
+ {MSG_SEQUENCE_LIST, "MSG_SEQUENCE_LIST"},
+ {MSG_TXN_GET_STATUS, "MSG_TXN_GET_STATUS"},
+ {MSG_TXN_GET_ALL_PREPARED, "MSG_TXN_GET_ALL_PREPARED"},
+ {MSG_TXN_BEGIN_GETGXID_AUTOVACUUM, "MSG_TXN_BEGIN_GETGXID_AUTOVACUUM"},
+ {MSG_BKUP_TXN_BEGIN_GETGXID_AUTOVACUUM, "MSG_BKUP_TXN_BEGIN_GETGXID_AUTOVACUUM"},
+ {MSG_DATA_FLUSH, "MSG_DATA_FLUSH"},
+ {MSG_BACKEND_DISCONNECT, "MSG_BACKEND_DISCONNECT"},
+ {MSG_TYPE_COUNT, "MSG_TYPE_COUNT"},
+ {-1, NULL}
+};
+
+static struct enum_name result_name_tab[] =
+{
+ {SYNC_STANDBY_RESULT, "SYNC_STANDBY_RESULT"},
+ {NODE_REGISTER_RESULT, "NODE_REGISTER_RESULT"},
+ {NODE_UNREGISTER_RESULT, "NODE_UNREGISTER_RESULT"},
+ {NODE_LIST_RESULT, "NODE_LIST_RESULT"},
+ {NODE_BEGIN_REPLICATION_INIT_RESULT, "NODE_BEGIN_REPLICATION_INIT_RESULT"},
+ {NODE_END_REPLICATION_INIT_RESULT, "NODE_END_REPLICATION_INIT_RESULT"},
+ {BEGIN_BACKUP_RESULT, "BEGIN_BACKUP_RESULT"},
+ {END_BACKUP_RESULT, "END_BACKUP_RESULT"},
+ {TXN_BEGIN_RESULT, "TXN_BEGIN_RESULT"},
+ {TXN_BEGIN_GETGXID_RESULT, "TXN_BEGIN_GETGXID_RESULT"},
+ {TXN_BEGIN_GETGXID_MULTI_RESULT, "TXN_BEGIN_GETGXID_MULTI_RESULT"},
+ {TXN_PREPARE_RESULT, "TXN_PREPARE_RESULT"},
+ {TXN_START_PREPARED_RESULT, "TXN_START_PREPARED_RESULT"},
+ {TXN_COMMIT_PREPARED_RESULT, "TXN_COMMIT_PREPARED_RESULT"},
+ {TXN_COMMIT_RESULT, "TXN_COMMIT_RESULT"},
+ {TXN_COMMIT_MULTI_RESULT, "TXN_COMMIT_MULTI_RESULT"},
+ {TXN_ROLLBACK_RESULT, "TXN_ROLLBACK_RESULT"},
+ {TXN_ROLLBACK_MULTI_RESULT, "TXN_ROLLBACK_MULTI_RESULT"},
+ {TXN_GET_GID_DATA_RESULT, "TXN_GET_GID_DATA_RESULT"},
+ {TXN_GET_GXID_RESULT, "TXN_GET_GXID_RESULT"},
+ {TXN_GET_NEXT_GXID_RESULT, "TXN_GET_NEXT_GXID_RESULT"},
+ {TXN_GXID_LIST_RESULT, "TXN_GXID_LIST_RESULT"},
+ {SNAPSHOT_GET_RESULT, "SNAPSHOT_GET_RESULT"},
+ {SNAPSHOT_GET_MULTI_RESULT, "SNAPSHOT_GET_MULTI_RESULT"},
+ {SNAPSHOT_GXID_GET_RESULT, "SNAPSHOT_GXID_GET_RESULT"},
+ {SEQUENCE_INIT_RESULT, "SEQUENCE_INIT_RESULT"},
+ {SEQUENCE_GET_CURRENT_RESULT, "SEQUENCE_GET_CURRENT_RESULT"},
+ {SEQUENCE_GET_NEXT_RESULT, "SEQUENCE_GET_NEXT_RESULT"},
+ {SEQUENCE_GET_LAST_RESULT, "SEQUENCE_GET_LAST_RESULT"},
+ {SEQUENCE_SET_VAL_RESULT, "SEQUENCE_SET_VAL_RESULT"},
+ {SEQUENCE_RESET_RESULT, "SEQUENCE_RESET_RESULT"},
+ {SEQUENCE_CLOSE_RESULT, "SEQUENCE_CLOSE_RESULT"},
+ {SEQUENCE_RENAME_RESULT, "SEQUENCE_RENAME_RESULT"},
+ {SEQUENCE_ALTER_RESULT, "SEQUENCE_ALTER_RESULT"},
+ {SEQUENCE_LIST_RESULT, "SEQUENCE_LIST_RESULT"},
+ {TXN_GET_STATUS_RESULT, "TXN_GET_STATUS_RESULT"},
+ {TXN_GET_ALL_PREPARED_RESULT, "TXN_GET_ALL_PREPARED_RESULT"},
+ {TXN_BEGIN_GETGXID_AUTOVACUUM_RESULT, "TXN_BEGIN_GETGXID_AUTOVACUUM_RESULT"},
+ {RESULT_TYPE_COUNT, "RESULT_TYPE_COUNT"},
+ {-1, NULL}
+};
+
+static char **message_name = NULL;
+static int message_max;
+static char **result_name = NULL;
+static int result_max;
+
+void gtm_util_init_nametabs(void)
+{
+ int ii;
+
+ if (message_name)
+ free(message_name);
+ if (result_name)
+ free(result_name);
+ for (ii = 0, message_max = 0; message_name_tab[ii].type >= 0; ii++)
+ {
+ if (message_max < message_name_tab[ii].type)
+ message_max = message_name_tab[ii].type;
+ }
+ message_name = (char **)malloc(sizeof(char *) * (message_max + 1));
+ memset(message_name, sizeof(char *) * (message_max + 1), 0);
+ for (ii = 0; message_name_tab[ii].type >= 0; ii++)
+ {
+ message_name[message_name_tab[ii].type] = message_name_tab[ii].name;
+ }
+
+ for (ii = 0, result_max = 0; result_name_tab[ii].type >= 0; ii++)
+ {
+ if (result_max < result_name_tab[ii].type)
+ result_max = result_name_tab[ii].type;
+ }
+ result_name = (char **)malloc(sizeof(char *) * (result_max + 1));
+ memset(result_name, sizeof(char *) * (result_max + 1), 0);
+ for (ii = 0; result_name_tab[ii].type >= 0; ii++)
+ {
+ result_name[result_name_tab[ii].type] = result_name_tab[ii].name;
+ }
+}
+
+char *gtm_util_message_name(GTM_MessageType type)
+{
+ if (message_name == NULL)
+ gtm_util_init_nametabs();
+ if (type > message_max)
+ return "UNKNOWN_MESSAGE";
+ return message_name[type];
+}
+
+char *gtm_util_result_name(GTM_ResultType type)
+{
+ if (result_name == NULL)
+ gtm_util_init_nametabs();
+ if (type > result_max)
+ return "UNKNOWN_RESULT";
+ return result_name[type];
+}
/*
* gtm_report_failure() is an utility function to report fatal failure
@@ -37,3 +217,4 @@ gtm_report_failure(GTM_Conn *failed_conn)
return;
}
#endif
+
diff --git a/src/gtm/libpq/pqcomm.c b/src/gtm/libpq/pqcomm.c
index d2a6e9b6b5..048434ded4 100644
--- a/src/gtm/libpq/pqcomm.c
+++ b/src/gtm/libpq/pqcomm.c
@@ -97,9 +97,9 @@
/* Where the Unix socket file is */
static char sock_path[MAXGTMPATH];
-static int tcp_keepalives_idle;
-static int tcp_keepalives_interval;
-static int tcp_keepalives_count;
+extern int tcp_keepalives_idle;
+extern int tcp_keepalives_interval;
+extern int tcp_keepalives_count;
/*
diff --git a/src/gtm/main/gtm.conf.sample b/src/gtm/main/gtm.conf.sample
index d0e3fdb629..47a0ce443f 100644
--- a/src/gtm/main/gtm.conf.sample
+++ b/src/gtm/main/gtm.conf.sample
@@ -56,3 +56,4 @@
# Valid value: DEBUG, DEBUG5, DEBUG4, DEBUG3,
# DEBUG2, DEBUG1, INFO, NOTICE, WARNING,
# ERROR, LOG, FATAL, PANIC
+#synchronous_backup = off # If backup to standby is synchronous
diff --git a/src/gtm/main/gtm_opt.c b/src/gtm/main/gtm_opt.c
index c84dbe661f..9548081ba9 100644
--- a/src/gtm/main/gtm_opt.c
+++ b/src/gtm/main/gtm_opt.c
@@ -41,6 +41,7 @@ const char *config_filename = CONFIG_FILENAME;
extern char *NodeName;
extern char *ListenAddresses;
+extern bool Backup_synchronously;
extern int GTMPortNumber;
extern char *active_addr;
extern int active_port;
@@ -48,9 +49,9 @@ extern int GTM_StandbyMode;
extern char *error_reporter;
extern char *status_reader;
extern int log_min_messages;
-extern int keepalives_idle;
-extern int keepalives_count;
-extern int keepalives_interval;
+extern int tcp_keepalives_idle;
+extern int tcp_keepalives_count;
+extern int tcp_keepalives_interval;
extern char *GTMDataDir;
@@ -122,6 +123,15 @@ Config_Type_Names();
struct config_bool ConfigureNamesBool[] =
{
+ {
+ {GTM_OPTNAME_SYNCHRONOUS_BACKUP, GTMC_STARTUP,
+ gettext_noop("Specifies if backup to GTM-Standby is taken in synchronous manner."),
+ gettext_noop("Default value is off."),
+ 0
+ },
+ &Backup_synchronously,
+ false, false, NULL
+ },
/* End-of-list marker */
{
{NULL, 0, NULL, NULL, 0}, NULL, false, false, NULL
@@ -157,7 +167,7 @@ struct config_int ConfigureNamesInt[] =
gettext_noop("This option is effective only when it runs as GTM-Standby."),
GTMOPT_UNIT_TIME
},
- &keepalives_idle,
+ &tcp_keepalives_idle,
0, 0, INT_MAX,
0, NULL
},
@@ -167,7 +177,7 @@ struct config_int ConfigureNamesInt[] =
gettext_noop("This option is effective only when it runs as GTM-Standby."),
GTMOPT_UNIT_TIME
},
- &keepalives_interval,
+ &tcp_keepalives_interval,
0, 0, INT_MAX,
0, NULL
},
@@ -177,7 +187,7 @@ struct config_int ConfigureNamesInt[] =
gettext_noop("This option is effective only when it runs as GTM-Standby."),
0
},
- &keepalives_count,
+ &tcp_keepalives_count,
0, 0, INT_MAX,
0, NULL
},
diff --git a/src/gtm/main/gtm_seq.c b/src/gtm/main/gtm_seq.c
index 05beef11e7..5f154c6949 100644
--- a/src/gtm/main/gtm_seq.c
+++ b/src/gtm/main/gtm_seq.c
@@ -27,6 +27,8 @@
#include "gtm/libpq-int.h"
#include "gtm/pqformat.h"
+extern bool Backup_synchronously;
+
typedef struct GTM_SeqInfoHashBucket
{
gtm_List *shb_list;
@@ -831,10 +833,12 @@ GTM_InitSeqManager(void)
}
/*
- * Process MSG_SEQUENCE_INIT message
+ * Process MSG_SEQUENCE_INIT/MSG_BKUP_SEQUENCE_INIT message
+ *
+ * is_backup indicates the message is MSG_BKUP_SEQUENCE_INIT
*/
void
-ProcessSequenceInitCommand(Port *myport, StringInfo message)
+ProcessSequenceInitCommand(Port *myport, StringInfo message, bool is_backup)
{
GTM_SequenceKeyData seqkey;
GTM_Sequence increment, minval, maxval, startval;
@@ -882,55 +886,69 @@ ProcessSequenceInitCommand(Port *myport, StringInfo message)
pq_getmsgend(message);
- /*
- * Send a SUCCESS message back to the client
- */
- pq_beginmessage(&buf, 'S');
- pq_sendint(&buf, SEQUENCE_INIT_RESULT, 4);
- if (myport->remote_type == GTM_NODE_GTM_PROXY)
- {
- GTM_ProxyMsgHeader proxyhdr;
- proxyhdr.ph_conid = myport->conn_id;
- pq_sendbytes(&buf, (char *)&proxyhdr, sizeof (GTM_ProxyMsgHeader));
- }
- pq_sendint(&buf, seqkey.gsk_keylen, 4);
- pq_sendbytes(&buf, seqkey.gsk_key, seqkey.gsk_keylen);
- pq_endmessage(myport, &buf);
-
- if (myport->remote_type != GTM_NODE_GTM_PROXY)
- pq_flush(myport);
-
- if (GetMyThreadInfo->thr_conn->standby)
+ if (!is_backup)
{
- int rc;
- GTM_Conn *oldconn = GetMyThreadInfo->thr_conn->standby;
- int count = 0;
-
- elog(LOG, "calling open_sequence() for standby GTM %p.", GetMyThreadInfo->thr_conn->standby);
-
- retry:
- rc = open_sequence(GetMyThreadInfo->thr_conn->standby,
- &seqkey,
- increment,
- minval,
- maxval,
- startval,
- cycle);
+ /* Backup first */
+ if (GetMyThreadInfo->thr_conn->standby)
+ {
+ int rc;
+ GTM_Conn *oldconn = GetMyThreadInfo->thr_conn->standby;
+ int count = 0;
+
+ elog(LOG, "calling open_sequence() for standby GTM %p.", GetMyThreadInfo->thr_conn->standby);
+
+ retry:
+ rc = bkup_open_sequence(GetMyThreadInfo->thr_conn->standby,
+ &seqkey,
+ increment,
+ minval,
+ maxval,
+ startval,
+ cycle);
- if (gtm_standby_check_communication_error(&count, oldconn))
- goto retry;
+ if (gtm_standby_check_communication_error(&count, oldconn))
+ goto retry;
- elog(LOG, "open_sequence() returns rc %d.", rc);
- }
+ /* Sync */
+ if (Backup_synchronously && (myport->remote_type != GTM_NODE_GTM_PROXY))
+ gtm_sync_standby(GetMyThreadInfo->thr_conn->standby);
+ elog(LOG, "open_sequence() returns rc %d.", rc);
+ }
+ /*
+ * Send a SUCCESS message back to the client
+ */
+ pq_beginmessage(&buf, 'S');
+ pq_sendint(&buf, SEQUENCE_INIT_RESULT, 4);
+ if (myport->remote_type == GTM_NODE_GTM_PROXY)
+ {
+ GTM_ProxyMsgHeader proxyhdr;
+ proxyhdr.ph_conid = myport->conn_id;
+ pq_sendbytes(&buf, (char *)&proxyhdr, sizeof (GTM_ProxyMsgHeader));
+ }
+ pq_sendint(&buf, seqkey.gsk_keylen, 4);
+ pq_sendbytes(&buf, seqkey.gsk_key, seqkey.gsk_keylen);
+ pq_endmessage(myport, &buf);
+
+ if (myport->remote_type != GTM_NODE_GTM_PROXY)
+ {
+ /* Flush standby first */
+ if (GetMyThreadInfo->thr_conn->standby)
+ gtmpqFlush(GetMyThreadInfo->thr_conn->standby);
+ pq_flush(myport);
+ }
+ }
/* FIXME: need to check errors */
}
+
/*
- * Process MSG_SEQUENCE_ALTER message
+ * Process MSG_SEQUENCE_ALTER/MSG_BKUP_SEQUENCE_ALTER message
+ *
+ * is_backup indicates the message is MSG_BKUP_SEQUENCE_ALTER
*/
void
-ProcessSequenceAlterCommand(Port *myport, StringInfo message)
+ProcessSequenceAlterCommand(Port *myport, StringInfo message, bool is_backup)
{
GTM_SequenceKeyData seqkey;
GTM_Sequence increment, minval, maxval, startval, lastval;
@@ -980,47 +998,58 @@ ProcessSequenceAlterCommand(Port *myport, StringInfo message)
pq_getmsgend(message);
- pq_beginmessage(&buf, 'S');
- pq_sendint(&buf, SEQUENCE_ALTER_RESULT, 4);
- if (myport->remote_type == GTM_NODE_GTM_PROXY)
- {
- GTM_ProxyMsgHeader proxyhdr;
- proxyhdr.ph_conid = myport->conn_id;
- pq_sendbytes(&buf, (char *)&proxyhdr, sizeof (GTM_ProxyMsgHeader));
- }
- pq_sendint(&buf, seqkey.gsk_keylen, 4);
- pq_sendbytes(&buf, seqkey.gsk_key, seqkey.gsk_keylen);
- pq_endmessage(myport, &buf);
-
- if (myport->remote_type != GTM_NODE_GTM_PROXY)
- pq_flush(myport);
-
- if ( GetMyThreadInfo->thr_conn->standby )
+ if (!is_backup)
{
- int rc;
- GTM_Conn *oldconn = GetMyThreadInfo->thr_conn->standby;
- int count = 0;
+ /* Backup first */
+ if ( GetMyThreadInfo->thr_conn->standby )
+ {
+ int rc;
+ GTM_Conn *oldconn = GetMyThreadInfo->thr_conn->standby;
+ int count = 0;
- elog(LOG, "calling alter_sequence() for standby GTM %p.", GetMyThreadInfo->thr_conn->standby);
+ elog(LOG, "calling alter_sequence() for standby GTM %p.", GetMyThreadInfo->thr_conn->standby);
- retry:
- rc = alter_sequence(GetMyThreadInfo->thr_conn->standby,
- &seqkey,
- increment,
- minval,
- maxval,
- startval,
- lastval,
- cycle,
- is_restart);
+ retry:
+ rc = bkup_alter_sequence(GetMyThreadInfo->thr_conn->standby,
+ &seqkey,
+ increment,
+ minval,
+ maxval,
+ startval,
+ lastval,
+ cycle,
+ is_restart);
- if (gtm_standby_check_communication_error(&count, oldconn))
- goto retry;
+ if (gtm_standby_check_communication_error(&count, oldconn))
+ goto retry;
+
+ /* Sync */
+ if (Backup_synchronously && (myport->remote_type != GTM_NODE_GTM_PROXY))
+ gtm_sync_standby(GetMyThreadInfo->thr_conn->standby);
+
+ elog(LOG, "alter_sequence() returns rc %d.", rc);
+ }
+ pq_beginmessage(&buf, 'S');
+ pq_sendint(&buf, SEQUENCE_ALTER_RESULT, 4);
+ if (myport->remote_type == GTM_NODE_GTM_PROXY)
+ {
+ GTM_ProxyMsgHeader proxyhdr;
+ proxyhdr.ph_conid = myport->conn_id;
+ pq_sendbytes(&buf, (char *)&proxyhdr, sizeof (GTM_ProxyMsgHeader));
+ }
+ pq_sendint(&buf, seqkey.gsk_keylen, 4);
+ pq_sendbytes(&buf, seqkey.gsk_key, seqkey.gsk_keylen);
+ pq_endmessage(myport, &buf);
- elog(LOG, "alter_sequence() returns rc %d.", rc);
- }
+ if (myport->remote_type != GTM_NODE_GTM_PROXY)
+ {
+ if (GetMyThreadInfo->thr_conn->standby)
+ gtmpqFlush(GetMyThreadInfo->thr_conn->standby);
+ pq_flush(myport);
+ }
- /* FIXME: need to check errors */
+ /* FIXME: need to check errors */
+ }
}
@@ -1113,6 +1142,7 @@ ProcessSequenceListCommand(Port *myport, StringInfo message)
elog(LOG, "ProcessSequenceListCommand() done.");
if (myport->remote_type != GTM_NODE_GTM_PROXY)
+ /* Don't flush to the backup because this does not change the internal status */
pq_flush(myport);
}
@@ -1152,8 +1182,14 @@ ProcessSequenceGetCurrentCommand(Port *myport, StringInfo message)
pq_endmessage(myport, &buf);
if (myport->remote_type != GTM_NODE_GTM_PROXY)
+ /* Don't flush to the standby because this does not change the status */
pq_flush(myport);
+ /*
+ * I don't think backup is needed here. It does not change internal state.
+ * 27th Dec., 2011, K.Suzuki
+ */
+#if 0
if (GetMyThreadInfo->thr_conn->standby)
{
GTM_Sequence loc_seq;
@@ -1170,15 +1206,18 @@ retry:
elog(LOG, "get_current() returns GTM_Sequence %ld.", loc_seq);
}
+#endif
/* FIXME: need to check errors */
}
/*
- * Process MSG_SEQUENCE_GET_NEXT message
+ * Process MSG_SEQUENCE_GET_NEXT/MSG_BKUP_SEQUENCE_GET_NEXT message
+ *
+ * is_backup indicates the message is MSG_BKUP_SEQUENCE_GET_NEXT
*/
void
-ProcessSequenceGetNextCommand(Port *myport, StringInfo message)
+ProcessSequenceGetNextCommand(Port *myport, StringInfo message, bool is_backup)
{
GTM_SequenceKeyData seqkey;
StringInfoData buf;
@@ -1195,46 +1234,62 @@ ProcessSequenceGetNextCommand(Port *myport, StringInfo message)
elog(LOG, "Getting next value %ld for sequence %s", seqval, seqkey.gsk_key);
- pq_beginmessage(&buf, 'S');
- pq_sendint(&buf, SEQUENCE_GET_NEXT_RESULT, 4);
- if (myport->remote_type == GTM_NODE_GTM_PROXY)
+ if (!is_backup)
{
- GTM_ProxyMsgHeader proxyhdr;
- proxyhdr.ph_conid = myport->conn_id;
- pq_sendbytes(&buf, (char *)&proxyhdr, sizeof (GTM_ProxyMsgHeader));
- }
- pq_sendint(&buf, seqkey.gsk_keylen, 4);
- pq_sendbytes(&buf, seqkey.gsk_key, seqkey.gsk_keylen);
- pq_sendbytes(&buf, (char *)&seqval, sizeof (GTM_Sequence));
- pq_endmessage(myport, &buf);
+ /* Backup first */
+ if (GetMyThreadInfo->thr_conn->standby)
+ {
+ GTM_Sequence loc_seq;
+ GTM_Conn *oldconn = GetMyThreadInfo->thr_conn->standby;
+ int count = 0;
- if (myport->remote_type != GTM_NODE_GTM_PROXY)
- pq_flush(myport);
+ elog(LOG, "calling get_next() for standby GTM %p.", GetMyThreadInfo->thr_conn->standby);
- if (GetMyThreadInfo->thr_conn->standby)
- {
- GTM_Sequence loc_seq;
- GTM_Conn *oldconn = GetMyThreadInfo->thr_conn->standby;
- int count = 0;
+ retry:
+ loc_seq = bkup_get_next(GetMyThreadInfo->thr_conn->standby, &seqkey);
+
+ if (gtm_standby_check_communication_error(&count, oldconn))
+ goto retry;
- elog(LOG, "calling get_next() for standby GTM %p.", GetMyThreadInfo->thr_conn->standby);
+ /* Sync */
+ if (Backup_synchronously &&(myport->remote_type != GTM_NODE_GTM_PROXY))
+ gtm_sync_standby(GetMyThreadInfo->thr_conn->standby);
- retry:
- loc_seq = get_next(GetMyThreadInfo->thr_conn->standby, &seqkey);
-
- if (gtm_standby_check_communication_error(&count, oldconn))
- goto retry;
+ elog(LOG, "get_next() returns GTM_Sequence %ld.", loc_seq);
+ }
+ /* Respond to the client */
+ pq_beginmessage(&buf, 'S');
+ pq_sendint(&buf, SEQUENCE_GET_NEXT_RESULT, 4);
+ if (myport->remote_type == GTM_NODE_GTM_PROXY)
+ {
+ GTM_ProxyMsgHeader proxyhdr;
+ proxyhdr.ph_conid = myport->conn_id;
+ pq_sendbytes(&buf, (char *)&proxyhdr, sizeof (GTM_ProxyMsgHeader));
+ }
+ pq_sendint(&buf, seqkey.gsk_keylen, 4);
+ pq_sendbytes(&buf, seqkey.gsk_key, seqkey.gsk_keylen);
+ pq_sendbytes(&buf, (char *)&seqval, sizeof (GTM_Sequence));
+ pq_endmessage(myport, &buf);
- elog(LOG, "get_next() returns GTM_Sequence %ld.", loc_seq);
+ if (myport->remote_type != GTM_NODE_GTM_PROXY)
+ {
+ /* Flush to the standby first */
+ if (GetMyThreadInfo->thr_conn->standby)
+ gtmpqFlush(GetMyThreadInfo->thr_conn->standby);
+ pq_flush(myport);
+ }
+
+ /* FIXME: need to check errors */
}
- /* FIXME: need to check errors */
}
/*
- * Process MSG_SEQUENCE_SET_VAL message
+ * Process MSG_SEQUENCE_SET_VAL/MSG_BKUP_SEQUENCE_SET_VAL message
+ *
+ * is_backup indicates the message is MSG_BKUP_SEQUENCE_SET_VAL
*/
void
-ProcessSequenceSetValCommand(Port *myport, StringInfo message)
+ProcessSequenceSetValCommand(Port *myport, StringInfo message, bool is_backup)
{
GTM_SequenceKeyData seqkey;
GTM_Sequence nextval;
@@ -1273,48 +1328,64 @@ ProcessSequenceSetValCommand(Port *myport, StringInfo message)
pq_getmsgend(message);
- pq_beginmessage(&buf, 'S');
- pq_sendint(&buf, SEQUENCE_SET_VAL_RESULT, 4);
- if (myport->remote_type == GTM_NODE_GTM_PROXY)
+ if (!is_backup)
{
- GTM_ProxyMsgHeader proxyhdr;
- proxyhdr.ph_conid = myport->conn_id;
- pq_sendbytes(&buf, (char *)&proxyhdr, sizeof (GTM_ProxyMsgHeader));
- }
- pq_sendint(&buf, seqkey.gsk_keylen, 4);
- pq_sendbytes(&buf, seqkey.gsk_key, seqkey.gsk_keylen);
- pq_endmessage(myport, &buf);
-
- if (myport->remote_type != GTM_NODE_GTM_PROXY)
- pq_flush(myport);
-
- if (GetMyThreadInfo->thr_conn->standby)
- {
- int rc;
- GTM_Conn *oldconn = GetMyThreadInfo->thr_conn->standby;
- int count = 0;
+ /* Backup first */
+ if (GetMyThreadInfo->thr_conn->standby)
+ {
+ int rc;
+ GTM_Conn *oldconn = GetMyThreadInfo->thr_conn->standby;
+ int count = 0;
- elog(LOG, "calling set_val() for standby GTM %p.", GetMyThreadInfo->thr_conn->standby);
+ elog(LOG, "calling set_val() for standby GTM %p.", GetMyThreadInfo->thr_conn->standby);
-retry:
- rc = set_val(GetMyThreadInfo->thr_conn->standby,
- &seqkey,
- nextval,
- iscalled);
+ retry:
+ rc = bkup_set_val(GetMyThreadInfo->thr_conn->standby,
+ &seqkey,
+ nextval,
+ iscalled);
- if (gtm_standby_check_communication_error(&count, oldconn))
- goto retry;
+ if (gtm_standby_check_communication_error(&count, oldconn))
+ goto retry;
- elog(LOG, "set_val() returns rc %d.", rc);
- }
+ /* Sync */
+ if (Backup_synchronously && (myport->remote_type != GTM_NODE_GTM_PROXY))
+ gtm_sync_standby(GetMyThreadInfo->thr_conn->standby);
+
+ elog(LOG, "set_val() returns rc %d.", rc);
+ }
+ /* Respond to the client */
+ pq_beginmessage(&buf, 'S');
+ pq_sendint(&buf, SEQUENCE_SET_VAL_RESULT, 4);
+ if (myport->remote_type == GTM_NODE_GTM_PROXY)
+ {
+ GTM_ProxyMsgHeader proxyhdr;
+ proxyhdr.ph_conid = myport->conn_id;
+ pq_sendbytes(&buf, (char *)&proxyhdr, sizeof (GTM_ProxyMsgHeader));
+ }
+ pq_sendint(&buf, seqkey.gsk_keylen, 4);
+ pq_sendbytes(&buf, seqkey.gsk_key, seqkey.gsk_keylen);
+ pq_endmessage(myport, &buf);
+
+ if (myport->remote_type != GTM_NODE_GTM_PROXY)
+ {
+ /* Flush the standby first */
+ if (GetMyThreadInfo->thr_conn->standby)
+ gtmpqFlush(GetMyThreadInfo->thr_conn->standby);
+ pq_flush(myport);
+ }
+
+ }
/* FIXME: need to check errors */
}
/*
- * Process MSG_SEQUENCE_RESET message
+ * Process MSG_SEQUENCE_RESET/MSG_BKUP_SEQUENCE_RESET message
+ *
+ * is_backup indicates the cmessage is MSG_BKUP_SEQUENCE_RESULT
*/
void
-ProcessSequenceResetCommand(Port *myport, StringInfo message)
+ProcessSequenceResetCommand(Port *myport, StringInfo message, bool is_backup)
{
GTM_SequenceKeyData seqkey;
StringInfoData buf;
@@ -1330,45 +1401,61 @@ ProcessSequenceResetCommand(Port *myport, StringInfo message)
(errcode,
errmsg("Can not reset the sequence")));
- pq_beginmessage(&buf, 'S');
- pq_sendint(&buf, SEQUENCE_RESET_RESULT, 4);
- if (myport->remote_type == GTM_NODE_GTM_PROXY)
+ if (!is_backup)
{
- GTM_ProxyMsgHeader proxyhdr;
- proxyhdr.ph_conid = myport->conn_id;
- pq_sendbytes(&buf, (char *)&proxyhdr, sizeof (GTM_ProxyMsgHeader));
- }
- pq_sendint(&buf, seqkey.gsk_keylen, 4);
- pq_sendbytes(&buf, seqkey.gsk_key, seqkey.gsk_keylen);
- pq_endmessage(myport, &buf);
+ /* Backup first */
+ if (GetMyThreadInfo->thr_conn->standby)
+ {
+ int rc;
+ GTM_Conn *oldconn = GetMyThreadInfo->thr_conn->standby;
+ int count = 0;
- if (myport->remote_type != GTM_NODE_GTM_PROXY)
- pq_flush(myport);
+ elog(LOG, "calling reset_sequence() for standby GTM %p.", GetMyThreadInfo->thr_conn->standby);
+
+ retry:
+ rc = bkup_reset_sequence(GetMyThreadInfo->thr_conn->standby, &seqkey);
- if (GetMyThreadInfo->thr_conn->standby)
- {
- int rc;
- GTM_Conn *oldconn = GetMyThreadInfo->thr_conn->standby;
- int count = 0;
+ if (gtm_standby_check_communication_error(&count, oldconn))
+ goto retry;
- elog(LOG, "calling reset_sequence() for standby GTM %p.", GetMyThreadInfo->thr_conn->standby);
-
-retry:
- rc = reset_sequence(GetMyThreadInfo->thr_conn->standby, &seqkey);
+ /* Sync */
+ if (Backup_synchronously && (myport->remote_type != GTM_NODE_GTM_PROXY))
+ gtm_sync_standby(GetMyThreadInfo->thr_conn->standby);
- if (gtm_standby_check_communication_error(&count, oldconn))
- goto retry;
+ elog(LOG, "reset_sequence() returns rc %d.", rc);
+ }
+ /* Respond to the client */
+ pq_beginmessage(&buf, 'S');
+ pq_sendint(&buf, SEQUENCE_RESET_RESULT, 4);
+ if (myport->remote_type == GTM_NODE_GTM_PROXY)
+ {
+ GTM_ProxyMsgHeader proxyhdr;
+ proxyhdr.ph_conid = myport->conn_id;
+ pq_sendbytes(&buf, (char *)&proxyhdr, sizeof (GTM_ProxyMsgHeader));
+ }
+ pq_sendint(&buf, seqkey.gsk_keylen, 4);
+ pq_sendbytes(&buf, seqkey.gsk_key, seqkey.gsk_keylen);
+ pq_endmessage(myport, &buf);
+
+ if (myport->remote_type != GTM_NODE_GTM_PROXY)
+ {
+ /* Flush the standby first */
+ if (GetMyThreadInfo->thr_conn->standby)
+ gtmpqFlush(GetMyThreadInfo->thr_conn->standby);
+ pq_flush(myport);
+ }
- elog(LOG, "reset_sequence() returns rc %d.", rc);
}
/* FIXME: need to check errors */
}
/*
- * Process MSG_SEQUENCE_CLOSE message
+ * Process MSG_SEQUENCE_CLOSE/MSG_BKUP_SEQUENCE_CLOSE message
+ *
+ * is_backup indicates the message is MSG_BKUP_SEQUENCE_CLOSE
*/
void
-ProcessSequenceCloseCommand(Port *myport, StringInfo message)
+ProcessSequenceCloseCommand(Port *myport, StringInfo message, bool is_backup)
{
GTM_SequenceKeyData seqkey;
StringInfoData buf;
@@ -1386,45 +1473,61 @@ ProcessSequenceCloseCommand(Port *myport, StringInfo message)
(errcode,
errmsg("Can not close the sequence")));
- pq_beginmessage(&buf, 'S');
- pq_sendint(&buf, SEQUENCE_CLOSE_RESULT, 4);
- if (myport->remote_type == GTM_NODE_GTM_PROXY)
+ if (!is_backup)
{
- GTM_ProxyMsgHeader proxyhdr;
- proxyhdr.ph_conid = myport->conn_id;
- pq_sendbytes(&buf, (char *)&proxyhdr, sizeof (GTM_ProxyMsgHeader));
- }
- pq_sendint(&buf, seqkey.gsk_keylen, 4);
- pq_sendbytes(&buf, seqkey.gsk_key, seqkey.gsk_keylen);
- pq_endmessage(myport, &buf);
+ /* Backup first */
+ if (GetMyThreadInfo->thr_conn->standby)
+ {
+ int rc;
+ GTM_Conn *oldconn = GetMyThreadInfo->thr_conn->standby;
+ int count = 0;
- if (myport->remote_type != GTM_NODE_GTM_PROXY)
- pq_flush(myport);
+ elog(LOG, "calling close_sequence() for standby GTM %p.", GetMyThreadInfo->thr_conn->standby);
- if (GetMyThreadInfo->thr_conn->standby)
- {
- int rc;
- GTM_Conn *oldconn = GetMyThreadInfo->thr_conn->standby;
- int count = 0;
+ retry:
+ rc = bkup_close_sequence(GetMyThreadInfo->thr_conn->standby, &seqkey);
- elog(LOG, "calling close_sequence() for standby GTM %p.", GetMyThreadInfo->thr_conn->standby);
+ if (gtm_standby_check_communication_error(&count, oldconn))
+ goto retry;
-retry:
- rc = close_sequence(GetMyThreadInfo->thr_conn->standby, &seqkey);
+ /* Sync */
+ if (Backup_synchronously && (myport->remote_type != GTM_NODE_GTM_PROXY))
+ gtm_sync_standby(GetMyThreadInfo->thr_conn->standby);
- if (gtm_standby_check_communication_error(&count, oldconn))
- goto retry;
+ elog(LOG, "close_sequence() returns rc %d.", rc);
+ }
+ /* Respond to the client */
+ pq_beginmessage(&buf, 'S');
+ pq_sendint(&buf, SEQUENCE_CLOSE_RESULT, 4);
+ if (myport->remote_type == GTM_NODE_GTM_PROXY)
+ {
+ GTM_ProxyMsgHeader proxyhdr;
+ proxyhdr.ph_conid = myport->conn_id;
+ pq_sendbytes(&buf, (char *)&proxyhdr, sizeof (GTM_ProxyMsgHeader));
+ }
+ pq_sendint(&buf, seqkey.gsk_keylen, 4);
+ pq_sendbytes(&buf, seqkey.gsk_key, seqkey.gsk_keylen);
+ pq_endmessage(myport, &buf);
+
+ if (myport->remote_type != GTM_NODE_GTM_PROXY)
+ {
+ /* Flush the standby first */
+ if (GetMyThreadInfo->thr_conn->standby)
+ gtmpqFlush(GetMyThreadInfo->thr_conn->standby);
+ pq_flush(myport);
+ }
- elog(LOG, "close_sequence() returns rc %d.", rc);
+ /* FIXME: need to check errors */
}
- /* FIXME: need to check errors */
}
/*
- * Process MSG_SEQUENCE_RENAME message
+ * Process MSG_SEQUENCE_RENAME/MSG_BKUP_SEQUENCE_RENAME message
+ *
+ * is_backup indicates the message is MSG_BKUP_SEQUENCE_RENAME
*/
void
-ProcessSequenceRenameCommand(Port *myport, StringInfo message)
+ProcessSequenceRenameCommand(Port *myport, StringInfo message, bool is_backup)
{
GTM_SequenceKeyData seqkey, newseqkey;
StringInfoData buf;
@@ -1457,39 +1560,51 @@ ProcessSequenceRenameCommand(Port *myport, StringInfo message)
pq_getmsgend(message);
- /* Send a SUCCESS message back to the client */
- pq_beginmessage(&buf, 'S');
- pq_sendint(&buf, SEQUENCE_RENAME_RESULT, 4);
- if (myport->remote_type == GTM_NODE_GTM_PROXY)
+ if (!is_backup)
{
- GTM_ProxyMsgHeader proxyhdr;
- proxyhdr.ph_conid = myport->conn_id;
- pq_sendbytes(&buf, (char *)&proxyhdr, sizeof (GTM_ProxyMsgHeader));
- }
- pq_sendint(&buf, newseqkey.gsk_keylen, 4);
- pq_sendbytes(&buf, newseqkey.gsk_key, newseqkey.gsk_keylen);
- pq_endmessage(myport, &buf);
+ /* Backup first */
+ if (GetMyThreadInfo->thr_conn->standby)
+ {
+ int rc;
+ GTM_Conn *oldconn = GetMyThreadInfo->thr_conn->standby;
+ int count = 0;
- if (myport->remote_type != GTM_NODE_GTM_PROXY)
- pq_flush(myport);
+ elog(LOG, "calling rename_sequence() for standby GTM %p.", GetMyThreadInfo->thr_conn->standby);
- if (GetMyThreadInfo->thr_conn->standby)
- {
- int rc;
- GTM_Conn *oldconn = GetMyThreadInfo->thr_conn->standby;
- int count = 0;
+ retry:
+ rc = bkup_rename_sequence(GetMyThreadInfo->thr_conn->standby, &seqkey, &newseqkey);
- elog(LOG, "calling rename_sequence() for standby GTM %p.", GetMyThreadInfo->thr_conn->standby);
+ if (gtm_standby_check_communication_error(&count, oldconn))
+ goto retry;
-retry:
- rc = rename_sequence(GetMyThreadInfo->thr_conn->standby, &seqkey, &newseqkey);
+ /* Sync */
+ if (Backup_synchronously && (myport->remote_type != GTM_NODE_GTM_PROXY))
+ gtm_sync_standby(GetMyThreadInfo->thr_conn->standby);
- if (gtm_standby_check_communication_error(&count, oldconn))
- goto retry;
+ elog(LOG, "rename_sequence() returns rc %d.", rc);
+ }
+ /* Send a SUCCESS message back to the client */
+ pq_beginmessage(&buf, 'S');
+ pq_sendint(&buf, SEQUENCE_RENAME_RESULT, 4);
+ if (myport->remote_type == GTM_NODE_GTM_PROXY)
+ {
+ GTM_ProxyMsgHeader proxyhdr;
+ proxyhdr.ph_conid = myport->conn_id;
+ pq_sendbytes(&buf, (char *)&proxyhdr, sizeof (GTM_ProxyMsgHeader));
+ }
+ pq_sendint(&buf, newseqkey.gsk_keylen, 4);
+ pq_sendbytes(&buf, newseqkey.gsk_key, newseqkey.gsk_keylen);
+ pq_endmessage(myport, &buf);
- elog(LOG, "rename_sequence() returns rc %d.", rc);
- }
+ if (myport->remote_type != GTM_NODE_GTM_PROXY)
+ {
+ /* Flush the standby first */
+ if (GetMyThreadInfo->thr_conn->standby)
+ gtmpqFlush(GetMyThreadInfo->thr_conn->standby);
+ pq_flush(myport);
+ }
+ }
/* FIXME: need to check errors */
}
diff --git a/src/gtm/main/gtm_snap.c b/src/gtm/main/gtm_snap.c
index 5d94479933..c5db23d645 100644
--- a/src/gtm/main/gtm_snap.c
+++ b/src/gtm/main/gtm_snap.c
@@ -443,6 +443,10 @@ ProcessGetSnapshotCommandMulti(Port *myport, StringInfo message)
if (myport->remote_type != GTM_NODE_GTM_PROXY)
pq_flush(myport);
+#if 0
+ /* Do not need this portion because this command does not change
+ * internal status.
+ */
if (GetMyThreadInfo->thr_conn->standby)
{
int _rc;
@@ -468,6 +472,7 @@ retry:
elog(LOG, "snapshot_get_multi() rc=%d done.", _rc);
}
+#endif
return;
}
diff --git a/src/gtm/main/gtm_standby.c b/src/gtm/main/gtm_standby.c
index f065d2ec19..b5a08deb55 100644
--- a/src/gtm/main/gtm_standby.c
+++ b/src/gtm/main/gtm_standby.c
@@ -261,7 +261,7 @@ gtm_standby_register_self(const char *node_name, int port, const char *datadir)
standbyDataDir= (char *)datadir;
rc = node_register_internal(GTM_ActiveConn, GTM_NODE_GTM, standbyHostName, standbyPortNumber,
- standbyNodeName, standbyDataDir, NODE_DISCONNECTED);
+ standbyNodeName, standbyDataDir, NODE_DISCONNECTED);
if (rc < 0)
{
elog(LOG, "Failed to register a standby-GTM status.");
@@ -293,7 +293,7 @@ gtm_standby_activate_self(void)
}
rc = node_register_internal(GTM_ActiveConn, GTM_NODE_GTM, standbyHostName, standbyPortNumber,
- standbyNodeName, standbyDataDir, NODE_CONNECTED);
+ standbyNodeName, standbyDataDir, NODE_CONNECTED);
if (rc < 0)
{
@@ -460,7 +460,13 @@ gtm_standby_reconnect_to_standby(GTM_Conn *old_conn, int retry_max)
bool
gtm_standby_check_communication_error(int *retry_count, GTM_Conn *oldconn)
{
- if (GetMyThreadInfo->thr_conn->standby->result->gr_status == GTM_RESULT_COMM_ERROR)
+ GTM_ThreadInfo *my_threadInfo = GetMyThreadInfo;
+
+ /*
+ * This function may be called without result from standby.
+ */
+ if (my_threadInfo->thr_conn->standby->result
+ && my_threadInfo->thr_conn->standby->result->gr_status == GTM_RESULT_COMM_ERROR)
{
if (*retry_count == 0)
{
diff --git a/src/gtm/main/gtm_txn.c b/src/gtm/main/gtm_txn.c
index a5cd72b3ab..1e1a674fc9 100644
--- a/src/gtm/main/gtm_txn.c
+++ b/src/gtm/main/gtm_txn.c
@@ -28,10 +28,17 @@
#include "gtm/libpq-int.h"
#include "gtm/pqformat.h"
+extern bool Backup_synchronously;
+
/* Local functions */
static XidStatus GlobalTransactionIdGetStatus(GlobalTransactionId transactionId);
static bool GTM_SetDoVacuum(GTM_TransactionHandle handle);
-
+static void init_GTM_TransactionInfo(GTM_TransactionInfo *gtm_txninfo,
+ char *coord_name,
+ GTM_TransactionHandle txn,
+ GTM_IsolationLevel isolevel,
+ GTMProxy_ConnID connid,
+ bool readonly);
GTM_Transactions GTMTransactions;
void
@@ -676,23 +683,8 @@ GTM_BeginTransactionMulti(char *coord_name,
}
}
+ init_GTM_TransactionInfo(gtm_txninfo[kk], coord_name, ii, isolevel[kk], connid[kk], readonly[kk]);
- gtm_txninfo[kk]->gti_gxid = InvalidGlobalTransactionId;
- gtm_txninfo[kk]->gti_xmin = InvalidGlobalTransactionId;
- gtm_txninfo[kk]->gti_state = GTM_TXN_STARTING;
- gtm_txninfo[kk]->gti_coordname = pstrdup(coord_name);
-
- gtm_txninfo[kk]->gti_isolevel = isolevel[kk];
- gtm_txninfo[kk]->gti_readonly = readonly[kk];
- gtm_txninfo[kk]->gti_backend_id = connid[kk];
- gtm_txninfo[kk]->gti_in_use = true;
-
- gtm_txninfo[kk]->nodestring = NULL;
- gtm_txninfo[kk]->gti_gid = NULL;
-
- gtm_txninfo[kk]->gti_handle = ii;
- gtm_txninfo[kk]->gti_vacuum = false;
- gtm_txninfo[kk]->gti_thread_id = pthread_self();
GTMTransactions.gt_lastslot = ii;
txns[kk] = ii;
@@ -726,6 +718,79 @@ GTM_BeginTransaction(char *coord_name,
return txn;
}
+static void
+init_GTM_TransactionInfo(GTM_TransactionInfo *gtm_txninfo,
+ char *coord_name,
+ GTM_TransactionHandle txn,
+ GTM_IsolationLevel isolevel,
+ GTMProxy_ConnID connid,
+ bool readonly)
+{
+ gtm_txninfo->gti_gxid = InvalidGlobalTransactionId;
+ gtm_txninfo->gti_xmin = InvalidGlobalTransactionId;
+ gtm_txninfo->gti_state = GTM_TXN_STARTING;
+ gtm_txninfo->gti_coordname = pstrdup(coord_name);
+
+ gtm_txninfo->gti_isolevel = isolevel;
+ gtm_txninfo->gti_readonly = readonly;
+ gtm_txninfo->gti_backend_id = connid;
+ gtm_txninfo->gti_in_use = true;
+
+ gtm_txninfo->nodestring = NULL;
+ gtm_txninfo->gti_gid = NULL;
+
+ gtm_txninfo->gti_handle = txn;
+ gtm_txninfo->gti_vacuum = false;
+ gtm_txninfo->gti_thread_id = pthread_self();
+}
+
+
+void
+GTM_BkupBeginTransactionMulti(char *coord_name,
+ GTM_TransactionHandle *txn,
+ GTM_IsolationLevel *isolevel,
+ bool *readonly,
+ GTMProxy_ConnID *connid,
+ int txn_count)
+{
+ GTM_TransactionInfo *gtm_txninfo;
+ MemoryContext oldContext;
+ int kk;
+
+ gtm_txninfo = NULL;
+
+ oldContext = MemoryContextSwitchTo(TopMostMemoryContext);
+ GTM_RWLockAcquire(&GTMTransactions.gt_TransArrayLock, GTM_LOCKMODE_WRITE);
+
+ for (kk = 0; kk < txn_count; kk++)
+ {
+ gtm_txninfo = &GTMTransactions.gt_transactions_array[txn[kk]];
+ if (gtm_txninfo->gti_in_use)
+ {
+ GTM_RWLockRelease(&GTMTransactions.gt_TransArrayLock);
+ elog(ERROR, "GTM_TransactionInfo already in use. Cannot assign the transaction: handle (%d).",
+ txn[kk]);
+ return;
+ }
+ init_GTM_TransactionInfo(gtm_txninfo, coord_name, txn[kk], isolevel[kk], connid[kk], readonly[kk]);
+ GTMTransactions.gt_lastslot = txn[kk];
+ GTMTransactions.gt_open_transactions = gtm_lappend(GTMTransactions.gt_open_transactions, gtm_txninfo);
+ }
+
+ GTM_RWLockRelease(&GTMTransactions.gt_TransArrayLock);
+ MemoryContextSwitchTo(oldContext);
+}
+
+void
+GTM_BkupBeginTransaction(char *coord_name,
+ GTM_TransactionHandle txn,
+ GTM_IsolationLevel isolevel,
+ bool readonly)
+{
+ GTMProxy_ConnID connid = -1;
+
+ GTM_BkupBeginTransactionMulti(coord_name, &txn, &isolevel, &readonly, &connid, 1);
+}
/*
* Same as GTM_RollbackTransaction, but takes GXID as input
*/
@@ -996,6 +1061,15 @@ ProcessBeginTransactionCommand(Port *myport, StringInfo message)
MemoryContextSwitchTo(oldContext);
+ /* Backup first */
+ if (GetMyThreadInfo->thr_conn->standby)
+ {
+ bkup_begin_transaction(GetMyThreadInfo->thr_conn->standby, txn, txn_isolation_level, txn_read_only, timestamp);
+ /* Synch. with standby */
+ if (Backup_synchronously && (myport->remote_type != GTM_NODE_GTM_PROXY))
+ gtm_sync_standby(GetMyThreadInfo->thr_conn->standby);
+ }
+
/* GXID has been received, now it's time to get a GTM timestamp */
timestamp = GTM_TimestampGetCurrent();
@@ -1012,11 +1086,41 @@ ProcessBeginTransactionCommand(Port *myport, StringInfo message)
pq_endmessage(myport, &buf);
if (myport->remote_type != GTM_NODE_GTM_PROXY)
+ {
+ /* Flush standby first */
+ if (GetMyThreadInfo->thr_conn->standby)
+ gtmpqFlush(GetMyThreadInfo->thr_conn->standby);
pq_flush(myport);
+ }
return;
}
/*
+ * Process MSG_BKUP_TXN_BEGIN message
+ */
+void
+ProcessBkupBeginTransactionCommand(Port *myport, StringInfo message)
+{
+ GTM_TransactionHandle txn;
+ GTM_IsolationLevel txn_isolation_level;
+ bool txn_read_only;
+ GTM_Timestamp timestamp;
+ MemoryContext oldContext;
+
+ txn = pq_getmsgint(message, sizeof(GTM_TransactionHandle));
+ txn_isolation_level = pq_getmsgint(message, sizeof(GTM_IsolationLevel));
+ txn_read_only = pq_getmsgbyte(message);
+ memcpy(&timestamp, pq_getmsgbytes(message, sizeof(GTM_Timestamp)), sizeof(GTM_Timestamp));
+ pq_getmsgend(message);
+
+ oldContext = MemoryContextSwitchTo(TopMemoryContext);
+
+ GTM_BkupBeginTransaction("", txn, txn_isolation_level, txn_read_only);
+
+ MemoryContextSwitchTo(oldContext);
+}
+
+/*
* Process MSG_TXN_BEGIN_GETGXID message
*/
void
@@ -1059,6 +1163,27 @@ ProcessBeginTransactionGetGXIDCommand(Port *myport, StringInfo message)
elog(LOG, "Sending transaction id %u", gxid);
+ /* Backup first */
+ if (GetMyThreadInfo->thr_conn->standby)
+ {
+ GTM_Conn *oldconn = GetMyThreadInfo->thr_conn->standby;
+ int count = 0;
+
+ elog(LOG, "calling begin_transaction() for standby GTM %p.", GetMyThreadInfo->thr_conn->standby);
+
+retry:
+ bkup_begin_transaction_gxid(GetMyThreadInfo->thr_conn->standby,
+ txn, gxid, txn_isolation_level, txn_read_only, timestamp);
+
+ if (gtm_standby_check_communication_error(&count, oldconn))
+ goto retry;
+
+ /* Sync */
+ if (Backup_synchronously && (myport->remote_type != GTM_NODE_GTM_PROXY))
+ gtm_sync_standby(GetMyThreadInfo->thr_conn->standby);
+
+ }
+ /* Respond to the client */
pq_beginmessage(&buf, 'S');
pq_sendint(&buf, TXN_BEGIN_GETGXID_RESULT, 4);
if (myport->remote_type == GTM_NODE_GTM_PROXY)
@@ -1072,27 +1197,112 @@ ProcessBeginTransactionGetGXIDCommand(Port *myport, StringInfo message)
pq_endmessage(myport, &buf);
if (myport->remote_type != GTM_NODE_GTM_PROXY)
+ {
+ /* Flush standby */
+ if (GetMyThreadInfo->thr_conn->standby)
+ gtmpqFlush(GetMyThreadInfo->thr_conn->standby);
pq_flush(myport);
+ }
- if (GetMyThreadInfo->thr_conn->standby)
- {
- GTM_Timestamp _ts;
- GlobalTransactionId _gxid;
- GTM_Conn *oldconn = GetMyThreadInfo->thr_conn->standby;
- int count = 0;
- elog(LOG, "calling begin_transaction() for standby GTM %p.", GetMyThreadInfo->thr_conn->standby);
+ return;
+}
-retry:
- _gxid = begin_transaction(GetMyThreadInfo->thr_conn->standby, txn_isolation_level, &_ts);
+static void
+GTM_BkupBeginTransactionGetGXIDMulti(char *coord_name,
+ GTM_TransactionHandle *txn,
+ GlobalTransactionId *gxid,
+ GTM_IsolationLevel *isolevel,
+ bool *readonly,
+ GTMProxy_ConnID *connid,
+ int txn_count)
+{
+ GTM_TransactionInfo *gtm_txninfo;
+ int ii;
+ MemoryContext oldContext;
- if (gtm_standby_check_communication_error(&count, oldconn))
- goto retry;
+ oldContext = MemoryContextSwitchTo(TopMemoryContext);
+ GTM_RWLockAcquire(&GTMTransactions.gt_XidGenLock, GTM_LOCKMODE_WRITE);
- elog(LOG, "begin_transaction() returns GXID %d.", _gxid);
+ for (ii = 0; ii < txn_count; ii++)
+ {
+ gtm_txninfo = &GTMTransactions.gt_transactions_array[txn[ii]];
+ if (gtm_txninfo->gti_in_use)
+ {
+ GTM_RWLockRelease(&GTMTransactions.gt_TransArrayLock);
+ elog(ERROR, "GTM_TransactionInfo already in use. Cannot assign the transaction: handle (%d).",
+ txn[ii]);
+ return;
+ }
+ init_GTM_TransactionInfo(gtm_txninfo, coord_name, txn[ii], isolevel[ii], connid[ii], readonly[ii]);
+ GTMTransactions.gt_lastslot = txn[ii];
+ gtm_txninfo->gti_gxid = gxid[ii];
+ /*
+ * Advance next gxid
+ */
+ if (GlobalTransactionIdPrecedes(GTMTransactions.gt_nextXid, gxid[ii]))
+ GTMTransactions.gt_nextXid = gxid[ii] + 1;
+ if (!GlobalTransactionIdIsValid(GTMTransactions.gt_nextXid)) /* Handle wrap around too */
+ GTMTransactions.gt_nextXid = FirstNormalGlobalTransactionId;
+ GTMTransactions.gt_open_transactions = gtm_lappend(GTMTransactions.gt_open_transactions, gtm_txninfo);
}
- return;
+
+ GTM_RWLockRelease(&GTMTransactions.gt_XidGenLock);
+ MemoryContextSwitchTo(oldContext);
+}
+
+static void
+GTM_BkupBeginTransactionGetGXID(char *coord_name,
+ GTM_TransactionHandle txn,
+ GlobalTransactionId gxid,
+ GTM_IsolationLevel isolevel,
+ bool readonly)
+{
+ GTMProxy_ConnID connid = -1;
+
+ GTM_BkupBeginTransactionGetGXIDMulti(coord_name, &txn, &gxid, &isolevel, &readonly, &connid, 1);
+}
+
+/*
+ * Process MSG_BKUP_TXN_BEGIN_GETGXID message
+ */
+void
+ProcessBkupBeginTransactionGetGXIDCommand(Port *myport, StringInfo message)
+{
+ GTM_TransactionHandle txn;
+ GlobalTransactionId gxid;
+ GTM_IsolationLevel txn_isolation_level;
+ bool txn_read_only;
+ GTM_Timestamp timestamp;
+
+ txn = pq_getmsgint(message, sizeof(GTM_TransactionHandle));
+ gxid = pq_getmsgint(message, sizeof(GlobalTransactionId));
+ txn_isolation_level = pq_getmsgint(message, sizeof(GTM_IsolationLevel));
+ txn_read_only = pq_getmsgbyte(message);
+ memcpy(&timestamp, pq_getmsgbytes(message, sizeof(GTM_Timestamp)), sizeof(GTM_Timestamp));
+ pq_getmsgend(message);
+
+ GTM_BkupBeginTransactionGetGXID("", txn, gxid, txn_isolation_level, txn_read_only);
+}
+
+/*
+ * Process MSG_BKUP_TXN_BEGIN_GETGXID_AUTOVACUUM message
+ */
+void
+ProcessBkupBeginTransactionGetGXIDAutovacuumCommand(Port *myport, StringInfo message)
+{
+ GTM_TransactionHandle txn;
+ GlobalTransactionId gxid;
+ GTM_IsolationLevel txn_isolation_level;
+
+ txn = pq_getmsgint(message, sizeof(GTM_TransactionHandle));
+ gxid = pq_getmsgint(message, sizeof(GlobalTransactionId));
+ txn_isolation_level = pq_getmsgint(message, sizeof(GTM_IsolationLevel));
+ pq_getmsgend(message);
+
+ GTM_BkupBeginTransactionGetGXID("", txn, gxid, txn_isolation_level, false);
+ GTM_SetDoVacuum(txn);
}
/*
@@ -1139,20 +1349,7 @@ ProcessBeginTransactionGetGXIDAutovacuumCommand(Port *myport, StringInfo message
elog(DEBUG3, "Sending transaction id %d", gxid);
- pq_beginmessage(&buf, 'S');
- pq_sendint(&buf, TXN_BEGIN_GETGXID_AUTOVACUUM_RESULT, 4);
- if (myport->remote_type == GTM_NODE_GTM_PROXY)
- {
- GTM_ProxyMsgHeader proxyhdr;
- proxyhdr.ph_conid = myport->conn_id;
- pq_sendbytes(&buf, (char *)&proxyhdr, sizeof (GTM_ProxyMsgHeader));
- }
- pq_sendbytes(&buf, (char *)&gxid, sizeof(gxid));
- pq_endmessage(myport, &buf);
-
- if (myport->remote_type != GTM_NODE_GTM_PROXY)
- pq_flush(myport);
-
+ /* Backup first */
if (GetMyThreadInfo->thr_conn->standby)
{
GlobalTransactionId _gxid;
@@ -1160,17 +1357,40 @@ ProcessBeginTransactionGetGXIDAutovacuumCommand(Port *myport, StringInfo message
int count = 0;
elog(LOG, "calling begin_transaction_autovacuum() for standby GTM %p.",
- GetMyThreadInfo->thr_conn->standby);
+ GetMyThreadInfo->thr_conn->standby);
retry:
- _gxid = begin_transaction_autovacuum(GetMyThreadInfo->thr_conn->standby,
- txn_isolation_level);
+ _gxid = bkup_begin_transaction_autovacuum(GetMyThreadInfo->thr_conn->standby,
+ txn, gxid, txn_isolation_level);
if (gtm_standby_check_communication_error(&count, oldconn))
goto retry;
+ /* Sync */
+ if (Backup_synchronously && (myport->remote_type != GTM_NODE_GTM_PROXY))
+ gtm_sync_standby(GetMyThreadInfo->thr_conn->standby);
+
elog(LOG, "begin_transaction_autovacuum() GXID=%d done.", _gxid);
}
+ /* Respond to the client */
+ pq_beginmessage(&buf, 'S');
+ pq_sendint(&buf, TXN_BEGIN_GETGXID_AUTOVACUUM_RESULT, 4);
+ if (myport->remote_type == GTM_NODE_GTM_PROXY)
+ {
+ GTM_ProxyMsgHeader proxyhdr;
+ proxyhdr.ph_conid = myport->conn_id;
+ pq_sendbytes(&buf, (char *)&proxyhdr, sizeof (GTM_ProxyMsgHeader));
+ }
+ pq_sendbytes(&buf, (char *)&gxid, sizeof(gxid));
+ pq_endmessage(myport, &buf);
+
+ if (myport->remote_type != GTM_NODE_GTM_PROXY)
+ {
+ /* Flush standby */
+ if (GetMyThreadInfo->thr_conn->standby)
+ gtmpqFlush(GetMyThreadInfo->thr_conn->standby);
+ pq_flush(myport);
+ }
return;
}
@@ -1186,7 +1406,7 @@ ProcessBeginTransactionGetGXIDCommandMulti(Port *myport, StringInfo message)
int txn_count;
StringInfoData buf;
GTM_TransactionHandle txn[GTM_MAX_GLOBAL_TRANSACTIONS];
- GlobalTransactionId gxid, end_gxid;
+ GlobalTransactionId start_gxid, end_gxid;
GTM_Timestamp timestamp;
GTMProxy_ConnID txn_connid[GTM_MAX_GLOBAL_TRANSACTIONS];
MemoryContext oldContext;
@@ -1219,8 +1439,8 @@ ProcessBeginTransactionGetGXIDCommandMulti(Port *myport, StringInfo message)
(EINVAL,
errmsg("Failed to start %d new transactions", txn_count)));
- gxid = GTM_GetGlobalTransactionIdMulti(txn, txn_count);
- if (gxid == InvalidGlobalTransactionId)
+ start_gxid = GTM_GetGlobalTransactionIdMulti(txn, txn_count);
+ if (start_gxid == InvalidGlobalTransactionId)
ereport(ERROR,
(EINVAL,
errmsg("Failed to get a new transaction id")));
@@ -1230,34 +1450,16 @@ ProcessBeginTransactionGetGXIDCommandMulti(Port *myport, StringInfo message)
/* GXID has been received, now it's time to get a GTM timestamp */
timestamp = GTM_TimestampGetCurrent();
- end_gxid = gxid + txn_count;
- if (end_gxid < gxid)
+ end_gxid = start_gxid + txn_count;
+ if (end_gxid < start_gxid)
end_gxid += FirstNormalGlobalTransactionId;
- elog(LOG, "Sending transaction ids from %u to %u", gxid, end_gxid);
-
- pq_beginmessage(&buf, 'S');
- pq_sendint(&buf, TXN_BEGIN_GETGXID_MULTI_RESULT, 4);
- if (myport->remote_type == GTM_NODE_GTM_PROXY)
- {
- GTM_ProxyMsgHeader proxyhdr;
- proxyhdr.ph_conid = myport->conn_id;
- pq_sendbytes(&buf, (char *)&proxyhdr, sizeof (GTM_ProxyMsgHeader));
- }
- pq_sendbytes(&buf, (char *)&txn_count, sizeof(txn_count));
- pq_sendbytes(&buf, (char *)&gxid, sizeof(gxid));
- pq_sendbytes(&buf, (char *)&(timestamp), sizeof (GTM_Timestamp));
- pq_endmessage(myport, &buf);
-
- if (myport->remote_type != GTM_NODE_GTM_PROXY)
- pq_flush(myport);
+ elog(LOG, "Sending transaction ids from %u to %u", start_gxid, end_gxid);
+ /* Backup first */
if (GetMyThreadInfo->thr_conn->standby)
{
int _rc;
- int txn_count_out;
- GlobalTransactionId gxid_out;
- GTM_Timestamp ts_out;
GTM_Conn *oldconn = GetMyThreadInfo->thr_conn->standby;
int count = 0;
@@ -1265,29 +1467,85 @@ ProcessBeginTransactionGetGXIDCommandMulti(Port *myport, StringInfo message)
GetMyThreadInfo->thr_conn->standby);
retry:
- _rc = begin_transaction_multi(GetMyThreadInfo->thr_conn->standby,
- txn_count,
- txn_isolation_level,
- txn_read_only,
- txn_connid,
- &txn_count_out,
- &gxid_out,
- &ts_out);
+ _rc = bkup_begin_transaction_multi(GetMyThreadInfo->thr_conn->standby,
+ txn_count,
+ txn,
+ start_gxid,
+ txn_isolation_level,
+ txn_read_only,
+ txn_connid);
if (gtm_standby_check_communication_error(&count, oldconn))
goto retry;
+ /* Sync */
+ if (Backup_synchronously && (myport->remote_type != GTM_NODE_GTM_PROXY))
+ gtm_sync_standby(GetMyThreadInfo->thr_conn->standby);
+
elog(LOG, "begin_transaction_multi() rc=%d done.", _rc);
}
+ /* Respond to the client */
+ pq_beginmessage(&buf, 'S');
+ pq_sendint(&buf, TXN_BEGIN_GETGXID_MULTI_RESULT, 4);
+ if (myport->remote_type == GTM_NODE_GTM_PROXY)
+ {
+ GTM_ProxyMsgHeader proxyhdr;
+ proxyhdr.ph_conid = myport->conn_id;
+ pq_sendbytes(&buf, (char *)&proxyhdr, sizeof (GTM_ProxyMsgHeader));
+ }
+ pq_sendbytes(&buf, (char *)&txn_count, sizeof(txn_count));
+ pq_sendbytes(&buf, (char *)&start_gxid, sizeof(start_gxid));
+ pq_sendbytes(&buf, (char *)&(timestamp), sizeof (GTM_Timestamp));
+ pq_endmessage(myport, &buf);
+
+ if (myport->remote_type != GTM_NODE_GTM_PROXY)
+ {
+ /* Flush standby */
+ if (GetMyThreadInfo->thr_conn->standby)
+ gtmpqFlush(GetMyThreadInfo->thr_conn->standby);
+ pq_flush(myport);
+ }
return;
}
/*
- * Process MSG_TXN_COMMIT message
+ * Process MSG_BKUP_BEGIN_TXN_GETGXID_MULTI message
+ */
+void
+ProcessBkupBeginTransactionGetGXIDCommandMulti(Port *myport, StringInfo message)
+{
+ int txn_count;
+ GTM_TransactionHandle txn[GTM_MAX_GLOBAL_TRANSACTIONS];
+ GlobalTransactionId gxid[GTM_MAX_GLOBAL_TRANSACTIONS];
+ GTM_IsolationLevel txn_isolation_level[GTM_MAX_GLOBAL_TRANSACTIONS];
+ bool txn_read_only[GTM_MAX_GLOBAL_TRANSACTIONS];
+ GTMProxy_ConnID txn_connid[GTM_MAX_GLOBAL_TRANSACTIONS];
+ int ii;
+
+ txn_count = pq_getmsgint(message, sizeof(int));
+ if (txn_count <= 0)
+ elog(PANIC, "Zero or less transaction count.");
+
+ for (ii = 0; ii < txn_count; ii++)
+ {
+ txn[ii] = pq_getmsgint(message, sizeof(GTM_TransactionHandle));
+ gxid[ii] = pq_getmsgint(message, sizeof(GlobalTransactionId));
+ txn_isolation_level[ii] = pq_getmsgint(message, sizeof(GTM_IsolationLevel));
+ txn_read_only[ii] = pq_getmsgbyte(message);
+ txn_connid[ii] = pq_getmsgint(message, sizeof(GTMProxy_ConnID));
+ }
+
+ GTM_BkupBeginTransactionGetGXIDMulti("", txn, gxid, txn_isolation_level, txn_read_only, txn_connid, txn_count);
+
+}
+/*
+ * Process MSG_TXN_COMMIT/MSG_BKUP_TXN_COMMIT message
+ *
+ * is_backup indicates the message is MSG_BKUP_TXN_COMMIT
*/
void
-ProcessCommitTransactionCommand(Port *myport, StringInfo message)
+ProcessCommitTransactionCommand(Port *myport, StringInfo message, bool is_backup)
{
StringInfoData buf;
GTM_TransactionHandle txn;
@@ -1331,48 +1589,65 @@ ProcessCommitTransactionCommand(Port *myport, StringInfo message)
MemoryContextSwitchTo(oldContext);
- pq_beginmessage(&buf, 'S');
- pq_sendint(&buf, TXN_COMMIT_RESULT, 4);
- if (myport->remote_type == GTM_NODE_GTM_PROXY)
+ if(!is_backup)
{
- GTM_ProxyMsgHeader proxyhdr;
- proxyhdr.ph_conid = myport->conn_id;
- pq_sendbytes(&buf, (char *)&proxyhdr, sizeof (GTM_ProxyMsgHeader));
- }
- pq_sendbytes(&buf, (char *)&gxid, sizeof(gxid));
- pq_sendint(&buf, status, sizeof(status));
- pq_endmessage(myport, &buf);
+ if (GetMyThreadInfo->thr_conn->standby)
+ {
+ /*
+ * Backup first
+ */
+ int _rc;
+ GTM_Conn *oldconn = GetMyThreadInfo->thr_conn->standby;
+ int count = 0;
- if (myport->remote_type != GTM_NODE_GTM_PROXY)
- pq_flush(myport);
+ elog(LOG, "calling commit_transaction() for standby GTM %p.", GetMyThreadInfo->thr_conn->standby);
- if (GetMyThreadInfo->thr_conn->standby)
- {
- int _rc;
- GTM_Conn *oldconn = GetMyThreadInfo->thr_conn->standby;
- int count = 0;
+ retry:
+ _rc = bkup_commit_transaction(GetMyThreadInfo->thr_conn->standby, gxid);
- elog(LOG, "calling commit_transaction() for standby GTM %p.", GetMyThreadInfo->thr_conn->standby);
+ if (gtm_standby_check_communication_error(&count, oldconn))
+ goto retry;
-retry:
- _rc = commit_transaction(GetMyThreadInfo->thr_conn->standby, gxid);
+ /* Sync */
+ if (Backup_synchronously && (myport->remote_type != GTM_NODE_GTM_PROXY))
+ gtm_sync_standby(GetMyThreadInfo->thr_conn->standby);
- if (gtm_standby_check_communication_error(&count, oldconn))
- goto retry;
+ elog(LOG, "commit_transaction() rc=%d done.", _rc);
+ }
- elog(LOG, "commit_transaction() rc=%d done.", _rc);
- }
+ pq_beginmessage(&buf, 'S');
+ pq_sendint(&buf, TXN_COMMIT_RESULT, 4);
+ if (myport->remote_type == GTM_NODE_GTM_PROXY)
+ {
+ GTM_ProxyMsgHeader proxyhdr;
+ proxyhdr.ph_conid = myport->conn_id;
+ pq_sendbytes(&buf, (char *)&proxyhdr, sizeof (GTM_ProxyMsgHeader));
+ }
+ pq_sendbytes(&buf, (char *)&gxid, sizeof(gxid));
+ pq_sendint(&buf, status, sizeof(status));
+ pq_endmessage(myport, &buf);
+
+ if (myport->remote_type != GTM_NODE_GTM_PROXY)
+ {
+ /* Flush standby */
+ if (GetMyThreadInfo->thr_conn->standby)
+ gtmpqFlush(GetMyThreadInfo->thr_conn->standby);
+ pq_flush(myport);
+ }
+ }
return;
}
/*
- * Process MSG_TXN_COMMIT_PREPARED_MSG
+ * Process MSG_TXN_COMMIT_PREPARED/MSG_BKUP_TXN_COMMIT_PREPARED message
* Commit a prepared transaction
* Here the GXID used for PREPARE and COMMIT PREPARED are both committed
+ *
+ * is_backup indicates the message is MSG_BKUP_TXN_COMMIT_PREPARED
*/
void
-ProcessCommitPreparedTransactionCommand(Port *myport, StringInfo message)
+ProcessCommitPreparedTransactionCommand(Port *myport, StringInfo message, bool is_backup)
{
StringInfoData buf;
int txn_count = 2; /* PREPARE and COMMIT PREPARED gxid's */
@@ -1422,40 +1697,53 @@ ProcessCommitPreparedTransactionCommand(Port *myport, StringInfo message)
MemoryContextSwitchTo(oldContext);
- pq_beginmessage(&buf, 'S');
- pq_sendint(&buf, TXN_COMMIT_PREPARED_RESULT, 4);
- if (myport->remote_type == GTM_NODE_GTM_PROXY)
+ if (!is_backup)
{
- GTM_ProxyMsgHeader proxyhdr;
- proxyhdr.ph_conid = myport->conn_id;
- pq_sendbytes(&buf, (char *)&proxyhdr, sizeof (GTM_ProxyMsgHeader));
- }
- pq_sendbytes(&buf, (char *)&gxid[0], sizeof(GlobalTransactionId));
- pq_sendint(&buf, status[0], 4);
- pq_endmessage(myport, &buf);
+ if (GetMyThreadInfo->thr_conn->standby)
+ {
+ /* Backup first */
+ int _rc;
+ GTM_Conn *oldconn = GetMyThreadInfo->thr_conn->standby;
+ int count = 0;
- if (myport->remote_type != GTM_NODE_GTM_PROXY)
- pq_flush(myport);
+ elog(LOG, "calling commit_prepared_transaction() for standby GTM %p.",
+ GetMyThreadInfo->thr_conn->standby);
- if (GetMyThreadInfo->thr_conn->standby)
- {
- int _rc;
- GTM_Conn *oldconn = GetMyThreadInfo->thr_conn->standby;
- int count = 0;
+ retry:
+ _rc = bkup_commit_prepared_transaction(GetMyThreadInfo->thr_conn->standby,
+ gxid[0], gxid[1] /* prepared GXID */);
- elog(LOG, "calling commit_prepared_transaction() for standby GTM %p.",
- GetMyThreadInfo->thr_conn->standby);
+ if (gtm_standby_check_communication_error(&count, oldconn))
+ goto retry;
-retry:
- _rc = commit_prepared_transaction(GetMyThreadInfo->thr_conn->standby,
- gxid[0], gxid[1] /* prepared GXID */);
+ /* Sync */
+ if (Backup_synchronously && (myport->remote_type != GTM_NODE_GTM_PROXY))
+ gtm_sync_standby(GetMyThreadInfo->thr_conn->standby);
- if (gtm_standby_check_communication_error(&count, oldconn))
- goto retry;
+ elog(LOG, "commit_prepared_transaction() rc=%d done.", _rc);
+ }
+ /* Respond to the client */
+ pq_beginmessage(&buf, 'S');
+ pq_sendint(&buf, TXN_COMMIT_PREPARED_RESULT, 4);
+ if (myport->remote_type == GTM_NODE_GTM_PROXY)
+ {
+ GTM_ProxyMsgHeader proxyhdr;
+ proxyhdr.ph_conid = myport->conn_id;
+ pq_sendbytes(&buf, (char *)&proxyhdr, sizeof (GTM_ProxyMsgHeader));
+ }
+ pq_sendbytes(&buf, (char *)&gxid[0], sizeof(GlobalTransactionId));
+ pq_sendint(&buf, status[0], 4);
+ pq_endmessage(myport, &buf);
- elog(LOG, "commit_prepared_transaction() rc=%d done.", _rc);
- }
+ if (myport->remote_type != GTM_NODE_GTM_PROXY)
+ {
+ /* Flush standby */
+ if (GetMyThreadInfo->thr_conn->standby)
+ gtmpqFlush(GetMyThreadInfo->thr_conn->standby);
+ pq_flush(myport);
+ }
+ }
return;
}
@@ -1549,9 +1837,12 @@ ProcessGetGIDDataTransactionCommand(Port *myport, StringInfo message)
/* End of message */
pq_endmessage(myport, &buf);
+ /* No backup to the standby because this does not change internal status */
if (myport->remote_type != GTM_NODE_GTM_PROXY)
pq_flush(myport);
+ /* I don't think the following backup is needed. K.Suzuki, 27th, Dec., 2011 */
+#if 0
if (GetMyThreadInfo->thr_conn->standby)
{
int _rc;
@@ -1574,6 +1865,7 @@ retry:
elog(LOG, "get_gid_data() rc=%d done.", _rc);
}
+#endif
return;
}
@@ -1630,6 +1922,7 @@ ProcessGXIDListCommand(Port *myport, StringInfo message)
pq_sendbytes(&buf, data, actlen); /* serialized GTM_Transactions */
pq_endmessage(myport, &buf);
+ /* No backup to the standby because this does not change internal state */
if (myport->remote_type != GTM_NODE_GTM_PROXY)
{
pq_flush(myport);
@@ -1644,10 +1937,12 @@ ProcessGXIDListCommand(Port *myport, StringInfo message)
/*
- * Process MSG_TXN_ROLLBACK message
+ * Process MSG_TXN_ROLLBACK/MSG_BKUP_TXN_ROLLBACK message
+ *
+ * is_backup indicates the message is MSG_BKUP_TXN_ROLLBACK
*/
void
-ProcessRollbackTransactionCommand(Port *myport, StringInfo message)
+ProcessRollbackTransactionCommand(Port *myport, StringInfo message, bool is_backup)
{
StringInfoData buf;
GTM_TransactionHandle txn;
@@ -1691,46 +1986,61 @@ ProcessRollbackTransactionCommand(Port *myport, StringInfo message)
MemoryContextSwitchTo(oldContext);
- pq_beginmessage(&buf, 'S');
- pq_sendint(&buf, TXN_ROLLBACK_RESULT, 4);
- if (myport->remote_type == GTM_NODE_GTM_PROXY)
+ if (!is_backup)
{
- GTM_ProxyMsgHeader proxyhdr;
- proxyhdr.ph_conid = myport->conn_id;
- pq_sendbytes(&buf, (char *)&proxyhdr, sizeof (GTM_ProxyMsgHeader));
- }
- pq_sendbytes(&buf, (char *)&gxid, sizeof(gxid));
- pq_sendint(&buf, status, sizeof(status));
- pq_endmessage(myport, &buf);
+ /* Backup first */
+ if (GetMyThreadInfo->thr_conn->standby)
+ {
+ GTM_Conn *oldconn = GetMyThreadInfo->thr_conn->standby;
+ int count = 0;
- if (myport->remote_type != GTM_NODE_GTM_PROXY)
- pq_flush(myport);
+ elog(LOG, "calling abort_transaction() for standby GTM %p.", GetMyThreadInfo->thr_conn->standby);
- if (GetMyThreadInfo->thr_conn->standby)
- {
- GTM_Conn *oldconn = GetMyThreadInfo->thr_conn->standby;
- int count = 0;
+ retry:
+ bkup_abort_transaction(GetMyThreadInfo->thr_conn->standby, gxid);
- elog(LOG, "calling abort_transaction() for standby GTM %p.", GetMyThreadInfo->thr_conn->standby);
+ if (gtm_standby_check_communication_error(&count, oldconn))
+ goto retry;
-retry:
- abort_transaction(GetMyThreadInfo->thr_conn->standby, gxid);
+ /* Sync */
+ if (Backup_synchronously && (myport->remote_type != GTM_NODE_GTM_PROXY))
+ gtm_sync_standby(GetMyThreadInfo->thr_conn->standby);
+
+ elog(LOG, "abort_transaction() GXID=%d done.", gxid);
+ }
+ /* Respond to the client */
+ pq_beginmessage(&buf, 'S');
+ pq_sendint(&buf, TXN_ROLLBACK_RESULT, 4);
+ if (myport->remote_type == GTM_NODE_GTM_PROXY)
+ {
+ GTM_ProxyMsgHeader proxyhdr;
+ proxyhdr.ph_conid = myport->conn_id;
+ pq_sendbytes(&buf, (char *)&proxyhdr, sizeof (GTM_ProxyMsgHeader));
+ }
+ pq_sendbytes(&buf, (char *)&gxid, sizeof(gxid));
+ pq_sendint(&buf, status, sizeof(status));
+ pq_endmessage(myport, &buf);
- if (gtm_standby_check_communication_error(&count, oldconn))
- goto retry;
+ if (myport->remote_type != GTM_NODE_GTM_PROXY)
+ {
+ /* Flush standby first */
+ if (GetMyThreadInfo->thr_conn->standby)
+ gtmpqFlush(GetMyThreadInfo->thr_conn->standby);
+ pq_flush(myport);
+ }
- elog(LOG, "abort_transaction() GXID=%d done.", gxid);
}
-
return;
}
/*
- * Process MSG_TXN_COMMIT_MULTI message
+ * Process MSG_TXN_COMMIT_MULTI/MSG_BKUP_TXN_COMMIT_MULTI message
+ *
+ * is_backup indicates the message is MSG_BKUP_TXN_COMMIT_MULTI
*/
void
-ProcessCommitTransactionCommandMulti(Port *myport, StringInfo message)
+ProcessCommitTransactionCommandMulti(Port *myport, StringInfo message, bool is_backup)
{
StringInfoData buf;
GTM_TransactionHandle txn[GTM_MAX_GLOBAL_TRANSACTIONS];
@@ -1780,50 +2090,60 @@ ProcessCommitTransactionCommandMulti(Port *myport, StringInfo message)
MemoryContextSwitchTo(oldContext);
- pq_beginmessage(&buf, 'S');
- pq_sendint(&buf, TXN_COMMIT_MULTI_RESULT, 4);
- if (myport->remote_type == GTM_NODE_GTM_PROXY)
- {
- GTM_ProxyMsgHeader proxyhdr;
- proxyhdr.ph_conid = myport->conn_id;
- pq_sendbytes(&buf, (char *)&proxyhdr, sizeof (GTM_ProxyMsgHeader));
- }
- pq_sendbytes(&buf, (char *)&txn_count, sizeof(txn_count));
- pq_sendbytes(&buf, (char *)status, sizeof(int) * txn_count);
- pq_endmessage(myport, &buf);
-
- if (myport->remote_type != GTM_NODE_GTM_PROXY)
- pq_flush(myport);
-
- if (GetMyThreadInfo->thr_conn->standby)
+ if (!is_backup)
{
- int _rc;
- int txn_count_out;
- int status_out[GTM_MAX_GLOBAL_TRANSACTIONS];
- GTM_Conn *oldconn = GetMyThreadInfo->thr_conn->standby;
- int count = 0;
+ if (GetMyThreadInfo->thr_conn->standby)
+ {
+ /* Backup first */
+ int _rc;
+ GTM_Conn *oldconn = GetMyThreadInfo->thr_conn->standby;
+ int count = 0;
- elog(LOG, "calling commit_transaction_multi() for standby GTM %p.",
- GetMyThreadInfo->thr_conn->standby);
+ elog(LOG, "calling commit_transaction_multi() for standby GTM %p.",
+ GetMyThreadInfo->thr_conn->standby);
-retry:
- _rc = commit_transaction_multi(GetMyThreadInfo->thr_conn->standby,
- txn_count, gxid, &txn_count_out, status_out);
+ retry:
+ _rc = bkup_commit_transaction_multi(GetMyThreadInfo->thr_conn->standby, txn_count, txn);
- if (gtm_standby_check_communication_error(&count, oldconn))
- goto retry;
+ if (gtm_standby_check_communication_error(&count, oldconn))
+ goto retry;
+ /* Sync */
+ if (Backup_synchronously && (myport->remote_type != GTM_NODE_GTM_PROXY))
+ gtm_sync_standby(GetMyThreadInfo->thr_conn->standby);
- elog(LOG, "commit_transaction_multi() rc=%d done.", _rc);
- }
+ elog(LOG, "commit_transaction_multi() rc=%d done.", _rc);
+ }
+ /* Respond to the client */
+ pq_beginmessage(&buf, 'S');
+ pq_sendint(&buf, TXN_COMMIT_MULTI_RESULT, 4);
+ if (myport->remote_type == GTM_NODE_GTM_PROXY)
+ {
+ GTM_ProxyMsgHeader proxyhdr;
+ proxyhdr.ph_conid = myport->conn_id;
+ pq_sendbytes(&buf, (char *)&proxyhdr, sizeof (GTM_ProxyMsgHeader));
+ }
+ pq_sendbytes(&buf, (char *)&txn_count, sizeof(txn_count));
+ pq_sendbytes(&buf, (char *)status, sizeof(int) * txn_count);
+ pq_endmessage(myport, &buf);
+ if (myport->remote_type != GTM_NODE_GTM_PROXY)
+ {
+ /* Flush the standby */
+ if (GetMyThreadInfo->thr_conn->standby)
+ gtmpqFlush(GetMyThreadInfo->thr_conn->standby);
+ pq_flush(myport);
+ }
+ }
return;
}
/*
- * Process MSG_TXN_ROLLBACK_MULTI message
+ * Process MSG_TXN_ROLLBACK_MULTI/MSG_BKUP_TXN_ROLLBACK_MULTI message
+ *
+ * is_backup indicates the message is MSG_BKUP_TXN_ROLLBACK_MULTI
*/
void
-ProcessRollbackTransactionCommandMulti(Port *myport, StringInfo message)
+ProcessRollbackTransactionCommandMulti(Port *myport, StringInfo message, bool is_backup)
{
StringInfoData buf;
GTM_TransactionHandle txn[GTM_MAX_GLOBAL_TRANSACTIONS];
@@ -1873,50 +2193,62 @@ ProcessRollbackTransactionCommandMulti(Port *myport, StringInfo message)
MemoryContextSwitchTo(oldContext);
- pq_beginmessage(&buf, 'S');
- pq_sendint(&buf, TXN_ROLLBACK_MULTI_RESULT, 4);
- if (myport->remote_type == GTM_NODE_GTM_PROXY)
+ if (!is_backup)
{
- GTM_ProxyMsgHeader proxyhdr;
- proxyhdr.ph_conid = myport->conn_id;
- pq_sendbytes(&buf, (char *)&proxyhdr, sizeof (GTM_ProxyMsgHeader));
- }
- pq_sendbytes(&buf, (char *)&txn_count, sizeof(txn_count));
- pq_sendbytes(&buf, (char *)status, sizeof(int) * txn_count);
- pq_endmessage(myport, &buf);
+ /* Backup first */
+ if (GetMyThreadInfo->thr_conn->standby)
+ {
+ int _rc;
+ GTM_Conn *oldconn = GetMyThreadInfo->thr_conn->standby;
+ int count = 0;
- if (myport->remote_type != GTM_NODE_GTM_PROXY)
- pq_flush(myport);
+ elog(LOG, "calling abort_transaction_multi() for standby GTM %p.",
+ GetMyThreadInfo->thr_conn->standby);
- if (GetMyThreadInfo->thr_conn->standby)
- {
- int _rc;
- int txn_count_out;
- int status_out[GTM_MAX_GLOBAL_TRANSACTIONS];
- GTM_Conn *oldconn = GetMyThreadInfo->thr_conn->standby;
- int count = 0;
+ retry:
+ _rc = bkup_abort_transaction_multi(GetMyThreadInfo->thr_conn->standby, txn_count, gxid);
+
+ if (gtm_standby_check_communication_error(&count, oldconn))
+ goto retry;
- elog(LOG, "calling abort_transaction_multi() for standby GTM %p.",
- GetMyThreadInfo->thr_conn->standby);
+ /* Sync */
+ if (Backup_synchronously &&(myport->remote_type != GTM_NODE_GTM_PROXY))
+ gtm_sync_standby(GetMyThreadInfo->thr_conn->standby);
-retry:
- _rc = abort_transaction_multi(GetMyThreadInfo->thr_conn->standby,
- txn_count, gxid, &txn_count_out, status_out);
-
- if (gtm_standby_check_communication_error(&count, oldconn))
- goto retry;
+ elog(LOG, "abort_transaction_multi() rc=%d done.", _rc);
+ }
+ /* Respond to the client */
+ pq_beginmessage(&buf, 'S');
+ pq_sendint(&buf, TXN_ROLLBACK_MULTI_RESULT, 4);
+ if (myport->remote_type == GTM_NODE_GTM_PROXY)
+ {
+ GTM_ProxyMsgHeader proxyhdr;
+ proxyhdr.ph_conid = myport->conn_id;
+ pq_sendbytes(&buf, (char *)&proxyhdr, sizeof (GTM_ProxyMsgHeader));
+ }
+ pq_sendbytes(&buf, (char *)&txn_count, sizeof(txn_count));
+ pq_sendbytes(&buf, (char *)status, sizeof(int) * txn_count);
+ pq_endmessage(myport, &buf);
- elog(LOG, "abort_transaction_multi() rc=%d done.", _rc);
- }
+ if (myport->remote_type != GTM_NODE_GTM_PROXY)
+ {
+ /* Flush the standby */
+ if (GetMyThreadInfo->thr_conn->standby)
+ gtmpqFlush(GetMyThreadInfo->thr_conn->standby);
+ pq_flush(myport);
+ }
+ }
return;
}
/*
- * Process MSG_TXN_START_PREPARED message
+ * Process MSG_TXN_START_PREPARED/MSG_BKUP_TXN_START_PREPARED message
+ *
+ * is_backup indicates if the message is MSG_BKUP_TXN_START_PREPARED.
*/
void
-ProcessStartPreparedTransactionCommand(Port *myport, StringInfo message)
+ProcessStartPreparedTransactionCommand(Port *myport, StringInfo message, bool is_backup)
{
StringInfoData buf;
GTM_TransactionHandle txn;
@@ -1973,48 +2305,64 @@ ProcessStartPreparedTransactionCommand(Port *myport, StringInfo message)
MemoryContextSwitchTo(oldContext);
- pq_beginmessage(&buf, 'S');
- pq_sendint(&buf, TXN_START_PREPARED_RESULT, 4);
- if (myport->remote_type == GTM_NODE_GTM_PROXY)
+ if (!is_backup)
{
- GTM_ProxyMsgHeader proxyhdr;
- proxyhdr.ph_conid = myport->conn_id;
- pq_sendbytes(&buf, (char *)&proxyhdr, sizeof (GTM_ProxyMsgHeader));
- }
- pq_sendbytes(&buf, (char *)&gxid, sizeof(GlobalTransactionId));
- pq_endmessage(myport, &buf);
+ /*
+ * Backup first
+ */
+ if (GetMyThreadInfo->thr_conn->standby)
+ {
+ int _rc;
+ GTM_Conn *oldconn = GetMyThreadInfo->thr_conn->standby;
+ int count = 0;
- if (myport->remote_type != GTM_NODE_GTM_PROXY)
- pq_flush(myport);
+ elog(LOG, "calling start_prepared_transaction() for standby GTM %p.",
+ GetMyThreadInfo->thr_conn->standby);
- if (GetMyThreadInfo->thr_conn->standby)
- {
- int _rc;
- GTM_Conn *oldconn = GetMyThreadInfo->thr_conn->standby;
- int count = 0;
+ retry:
+ _rc = backup_start_prepared_transaction(GetMyThreadInfo->thr_conn->standby,
+ gxid, gid,
+ nodestring);
- elog(LOG, "calling start_prepared_transaction() for standby GTM %p.",
- GetMyThreadInfo->thr_conn->standby);
+ if (gtm_standby_check_communication_error(&count, oldconn))
+ goto retry;
- retry:
- _rc = start_prepared_transaction(GetMyThreadInfo->thr_conn->standby,
- gxid, gid,
- nodestring);
+ /* Sync */
+ if (Backup_synchronously && (myport->remote_type != GTM_NODE_GTM_PROXY))
+ gtm_sync_standby(GetMyThreadInfo->thr_conn->standby);
- if (gtm_standby_check_communication_error(&count, oldconn))
- goto retry;
+ elog(LOG, "start_prepared_transaction() rc=%d done.", _rc);
+ }
+ pq_beginmessage(&buf, 'S');
+ pq_sendint(&buf, TXN_START_PREPARED_RESULT, 4);
+ if (myport->remote_type == GTM_NODE_GTM_PROXY)
+ {
+ GTM_ProxyMsgHeader proxyhdr;
+ proxyhdr.ph_conid = myport->conn_id;
+ pq_sendbytes(&buf, (char *)&proxyhdr, sizeof (GTM_ProxyMsgHeader));
+ }
+ pq_sendbytes(&buf, (char *)&gxid, sizeof(GlobalTransactionId));
+ pq_endmessage(myport, &buf);
- elog(LOG, "start_prepared_transaction() rc=%d done.", _rc);
- }
+ if (myport->remote_type != GTM_NODE_GTM_PROXY)
+ {
+ /* Flush the standby */
+ if (GetMyThreadInfo->thr_conn->standby)
+ gtmpqFlush(GetMyThreadInfo->thr_conn->standby);
+ pq_flush(myport);
+ }
+ }
return;
}
/*
- * Process MSG_TXN_PREPARE message
+ * Process MSG_TXN_PREPARE/MSG_BKUP_TXN_PREPARE message
+ *
+ * is_backup indicates the message is MSG_BKUP_TXN_PREPARE
*/
void
-ProcessPrepareTransactionCommand(Port *myport, StringInfo message)
+ProcessPrepareTransactionCommand(Port *myport, StringInfo message, bool is_backup)
{
StringInfoData buf;
GTM_TransactionHandle txn;
@@ -2058,42 +2406,60 @@ ProcessPrepareTransactionCommand(Port *myport, StringInfo message)
elog(LOG, "Preparing transaction id %u", gxid);
- pq_beginmessage(&buf, 'S');
- pq_sendint(&buf, TXN_PREPARE_RESULT, 4);
- if (myport->remote_type == GTM_NODE_GTM_PROXY)
+ if (!is_backup)
{
- GTM_ProxyMsgHeader proxyhdr;
- proxyhdr.ph_conid = myport->conn_id;
- pq_sendbytes(&buf, (char *)&proxyhdr, sizeof (GTM_ProxyMsgHeader));
- }
- pq_sendbytes(&buf, (char *)&gxid, sizeof(gxid));
- pq_endmessage(myport, &buf);
+ /* Backup first */
+ if (GetMyThreadInfo->thr_conn->standby)
+ {
+ GTM_Conn *oldconn = GetMyThreadInfo->thr_conn->standby;
+ int count = 0;
- if (myport->remote_type != GTM_NODE_GTM_PROXY)
- pq_flush(myport);
+ elog(LOG, "calling prepare_transaction() for standby GTM %p.", GetMyThreadInfo->thr_conn->standby);
- if (GetMyThreadInfo->thr_conn->standby)
- {
- GTM_Conn *oldconn = GetMyThreadInfo->thr_conn->standby;
- int count = 0;
+ retry:
+ bkup_prepare_transaction(GetMyThreadInfo->thr_conn->standby, gxid);
- elog(LOG, "calling prepare_transaction() for standby GTM %p.", GetMyThreadInfo->thr_conn->standby);
+ if (gtm_standby_check_communication_error(&count, oldconn))
+ goto retry;
- retry:
- prepare_transaction(GetMyThreadInfo->thr_conn->standby, gxid);
+ /* Sync */
+ if (Backup_synchronously && (myport->remote_type != GTM_NODE_GTM_PROXY))
+ gtm_sync_standby(GetMyThreadInfo->thr_conn->standby);
- if (gtm_standby_check_communication_error(&count, oldconn))
- goto retry;
+ elog(LOG, "prepare_transaction() GXID=%d done.", gxid);
+ }
+ /* Respond to the client */
+ pq_beginmessage(&buf, 'S');
+ pq_sendint(&buf, TXN_PREPARE_RESULT, 4);
+ if (myport->remote_type == GTM_NODE_GTM_PROXY)
+ {
+ GTM_ProxyMsgHeader proxyhdr;
+ proxyhdr.ph_conid = myport->conn_id;
+ pq_sendbytes(&buf, (char *)&proxyhdr, sizeof (GTM_ProxyMsgHeader));
+ }
+ pq_sendbytes(&buf, (char *)&gxid, sizeof(gxid));
+ pq_endmessage(myport, &buf);
- elog(LOG, "prepare_transaction() GXID=%d done.", gxid);
+ if (myport->remote_type != GTM_NODE_GTM_PROXY)
+ {
+ /* Flush the standby */
+ if (GetMyThreadInfo->thr_conn->standby)
+ gtmpqFlush(GetMyThreadInfo->thr_conn->standby);
+ pq_flush(myport);
+ }
}
-
return;
+
}
/*
* Process MSG_TXN_GET_GXID message
+ *
+ * Notice: we don't have corresponding functions in gtm_client.c which
+ * generates a command for this function.
+ *
+ * Because of this, GTM-standby extension is not included in this function.
*/
void
ProcessGetGXIDTransactionCommand(Port *myport, StringInfo message)
@@ -2150,6 +2516,8 @@ ProcessGetGXIDTransactionCommand(Port *myport, StringInfo message)
/*
* Process MSG_TXN_GET_NEXT_GXID message
+ *
+ * This does not need backup to the standby because no internal state changes.
*/
void
ProcessGetNextGXIDTransactionCommand(Port *myport, StringInfo message)
diff --git a/src/gtm/main/main.c b/src/gtm/main/main.c
index 7062c31f6a..997f42dd5d 100644
--- a/src/gtm/main/main.c
+++ b/src/gtm/main/main.c
@@ -45,6 +45,7 @@
#include "gtm/gtm_seq.h"
#include "gtm/gtm_msg.h"
#include "gtm/gtm_opt.h"
+#include "gtm/gtm_utils.h"
extern int optind;
extern char *optarg;
@@ -62,15 +63,24 @@ int GTMPortNumber;
char GTMControlFile[GTM_MAX_PATH];
char *GTMDataDir;
char *NodeName;
+bool Backup_synchronously = false;
char *active_addr;
int active_port;
-int keepalives_idle;
-int keepalives_interval;
-int keepalives_count;
+int tcp_keepalives_idle;
+int tcp_keepalives_interval;
+int tcp_keepalives_count;
char *error_reporter;
char *status_reader;
bool isStartUp;
+/* If this is GTM or not */
+/*
+ * Used to determine if given Port is in GTM or in GT_Proxy.
+ * If it is in GTM, we should consider to flush GTM_Conn before
+ * writing anything to Port.
+ */
+bool isGTM = true;
+
GTM_ThreadID TopMostThreadID;
/* The socket(s) we're listening to. */
@@ -104,6 +114,7 @@ static void ChangeToDataDir(void);
static void checkDataDir(void);
static void DeleteLockFile(const char *filename);
static void PromoteToActive(void);
+static void ProcessSyncStandbyCommand(Port *myport, GTM_MessageType mtype, StringInfo message);
/*
* One-time initialization. It's called immediately after the main process
@@ -200,7 +211,7 @@ BaseInit()
static void
GTM_SigleHandler(int signal)
{
- fprintf(stderr, "Received signal %d", signal);
+ fprintf(stderr, "Received signal %d\n", signal);
switch (signal)
{
@@ -538,7 +549,7 @@ main(int argc, char *argv[])
elog(LOG, "Startup connection established with active-GTM.");
}
- elog(DEBUG3, "Starting GTM server at (%s:%d) -- control file %s", ListenAddresses, GTMPortNumber, GTMControlFile);
+ elog(LOG, "Starting GTM server at (%s:%d) -- control file %s", ListenAddresses, GTMPortNumber, GTMControlFile);
/*
* Read the last GXID and start from there
@@ -864,6 +875,7 @@ ServerLoop(void)
GTM_Conn *standby = NULL;
standby = gtm_standby_connect_to_standby();
+
if (GTMAddConnection(port, standby) != STATUS_OK)
{
@@ -1118,6 +1130,14 @@ GTM_ThreadMain(void *argp)
* Flush all the outgoing data on the wire. Consume the message
* type field for sanity
*/
+ /* Sync with standby first */
+ if (thrinfo->thr_conn->standby)
+ {
+ if (Backup_synchronously)
+ gtm_sync_standby(thrinfo->thr_conn->standby);
+ else
+ gtmpqFlush(thrinfo->thr_conn->standby);
+ }
pq_getmsgint(&input_message, sizeof (GTM_MessageType));
pq_getmsgend(&input_message);
pq_flush(thrinfo->thr_conn->con_port);
@@ -1161,10 +1181,23 @@ ProcessCommand(Port *myport, StringInfo input_message)
myport->conn_id = proxyhdr.ph_conid;
mtype = pq_getmsgint(input_message, sizeof (GTM_MessageType));
+ /*
+ * The next line will have some overhead. Better to be in
+ * compile option.
+ */
+#ifdef GTM_DEBUG
+ elog(DEBUG3, "mtype = %s (%d).", gtm_util_message_name(mtype), (int)mtype);
+#endif
+
switch (mtype)
{
+ case MSG_SYNC_STANDBY:
+ ProcessSyncStandbyCommand(myport, mtype, input_message);
+ break;
case MSG_NODE_REGISTER:
+ case MSG_BKUP_NODE_REGISTER:
case MSG_NODE_UNREGISTER:
+ case MSG_BKUP_NODE_UNREGISTER:
case MSG_NODE_LIST:
ProcessPGXCNodeCommand(myport, mtype, input_message);
break;
@@ -1176,17 +1209,29 @@ ProcessCommand(Port *myport, StringInfo input_message)
case MSG_NODE_BEGIN_REPLICATION_INIT:
case MSG_NODE_END_REPLICATION_INIT:
case MSG_TXN_BEGIN:
+ case MSG_BKUP_TXN_BEGIN:
case MSG_TXN_BEGIN_GETGXID:
+ case MSG_BKUP_TXN_BEGIN_GETGXID:
case MSG_TXN_BEGIN_GETGXID_AUTOVACUUM:
+ case MSG_BKUP_TXN_BEGIN_GETGXID_AUTOVACUUM:
case MSG_TXN_PREPARE:
+ case MSG_BKUP_TXN_PREPARE:
case MSG_TXN_START_PREPARED:
+ case MSG_BKUP_TXN_START_PREPARED:
case MSG_TXN_COMMIT:
+ case MSG_BKUP_TXN_COMMIT:
case MSG_TXN_COMMIT_PREPARED:
+ case MSG_BKUP_TXN_COMMIT_PREPARED:
case MSG_TXN_ROLLBACK:
+ case MSG_BKUP_TXN_ROLLBACK:
case MSG_TXN_GET_GXID:
+ case MSG_BKUP_TXN_GET_GXID:
case MSG_TXN_BEGIN_GETGXID_MULTI:
+ case MSG_BKUP_TXN_BEGIN_GETGXID_MULTI:
case MSG_TXN_COMMIT_MULTI:
+ case MSG_BKUP_TXN_COMMIT_MULTI:
case MSG_TXN_ROLLBACK_MULTI:
+ case MSG_BKUP_TXN_ROLLBACK_MULTI:
case MSG_TXN_GET_GID_DATA:
case MSG_TXN_GET_NEXT_GXID:
case MSG_TXN_GXID_LIST:
@@ -1200,14 +1245,21 @@ ProcessCommand(Port *myport, StringInfo input_message)
break;
case MSG_SEQUENCE_INIT:
+ case MSG_BKUP_SEQUENCE_INIT:
case MSG_SEQUENCE_GET_CURRENT:
case MSG_SEQUENCE_GET_NEXT:
+ case MSG_BKUP_SEQUENCE_GET_NEXT:
case MSG_SEQUENCE_GET_LAST:
case MSG_SEQUENCE_SET_VAL:
+ case MSG_BKUP_SEQUENCE_SET_VAL:
case MSG_SEQUENCE_RESET:
+ case MSG_BKUP_SEQUENCE_RESET:
case MSG_SEQUENCE_CLOSE:
+ case MSG_BKUP_SEQUENCE_CLOSE:
case MSG_SEQUENCE_RENAME:
+ case MSG_BKUP_SEQUENCE_RENAME:
case MSG_SEQUENCE_ALTER:
+ case MSG_BKUP_SEQUENCE_ALTER:
case MSG_SEQUENCE_LIST:
ProcessSequenceCommand(myport, mtype, input_message);
break;
@@ -1247,7 +1299,7 @@ GTMAddConnection(Port *port, GTM_Conn *standby)
errmsg("Out of memory")));
return STATUS_ERROR;
}
-
+
elog(DEBUG3, "Started new connection");
conninfo->con_port = port;
@@ -1338,17 +1390,46 @@ ReadCommand(Port *myport, StringInfo inBuf)
return qtype;
}
+/*
+ * Process MSG_SYNC_STANDBY message
+ */
+static void
+ProcessSyncStandbyCommand(Port *myport, GTM_MessageType mtype, StringInfo message)
+{
+ StringInfoData buf;
+
+ pq_getmsgend(message);
+
+ pq_beginmessage(&buf, 'S');
+ pq_sendint(&buf, SYNC_STANDBY_RESULT, 4);
+ pq_endmessage(myport, &buf);
+ /* Sync standby first */
+ if (GetMyThreadInfo->thr_conn->standby)
+ gtm_sync_standby(GetMyThreadInfo->thr_conn->standby);
+ pq_flush(myport);
+}
+
+
+
static void
ProcessPGXCNodeCommand(Port *myport, GTM_MessageType mtype, StringInfo message)
{
switch (mtype)
{
case MSG_NODE_REGISTER:
- ProcessPGXCNodeRegister(myport, message);
+ ProcessPGXCNodeRegister(myport, message, false);
+ break;
+
+ case MSG_BKUP_NODE_REGISTER:
+ ProcessPGXCNodeRegister(myport, message, true);
break;
case MSG_NODE_UNREGISTER:
- ProcessPGXCNodeUnregister(myport, message);
+ ProcessPGXCNodeUnregister(myport, message, false);
+ break;
+
+ case MSG_BKUP_NODE_UNREGISTER:
+ ProcessPGXCNodeUnregister(myport, message, true);
break;
case MSG_NODE_LIST:
@@ -1379,47 +1460,96 @@ ProcessTransactionCommand(Port *myport, GTM_MessageType mtype, StringInfo messag
ProcessBeginTransactionCommand(myport, message);
break;
+ case MSG_BKUP_TXN_BEGIN:
+ ProcessBkupBeginTransactionCommand(myport, message);
+ break;
+
case MSG_TXN_BEGIN_GETGXID:
ProcessBeginTransactionGetGXIDCommand(myport, message);
break;
+ case MSG_BKUP_TXN_BEGIN_GETGXID:
+ ProcessBkupBeginTransactionGetGXIDCommand(myport, message);
+
case MSG_TXN_BEGIN_GETGXID_AUTOVACUUM:
ProcessBeginTransactionGetGXIDAutovacuumCommand(myport, message);
break;
+ case MSG_BKUP_TXN_BEGIN_GETGXID_AUTOVACUUM:
+ ProcessBkupBeginTransactionGetGXIDAutovacuumCommand(myport, message);
+ break;
+
case MSG_TXN_BEGIN_GETGXID_MULTI:
ProcessBeginTransactionGetGXIDCommandMulti(myport, message);
break;
+ case MSG_BKUP_TXN_BEGIN_GETGXID_MULTI:
+ ProcessBkupBeginTransactionGetGXIDCommandMulti(myport, message);
+ break;
+
case MSG_TXN_START_PREPARED:
- ProcessStartPreparedTransactionCommand(myport, message);
+ ProcessStartPreparedTransactionCommand(myport, message, false);
+ break;
+
+ case MSG_BKUP_TXN_START_PREPARED:
+ ProcessStartPreparedTransactionCommand(myport, message, true);
break;
case MSG_TXN_PREPARE:
- ProcessPrepareTransactionCommand(myport, message);
+ ProcessPrepareTransactionCommand(myport, message, false);
+ break;
+
+ case MSG_BKUP_TXN_PREPARE:
+ ProcessPrepareTransactionCommand(myport, message, true);
break;
case MSG_TXN_COMMIT:
- ProcessCommitTransactionCommand(myport, message);
+ ProcessCommitTransactionCommand(myport, message, false);
+ break;
+
+ case MSG_BKUP_TXN_COMMIT:
+ ProcessCommitTransactionCommand(myport, message, true);
break;
case MSG_TXN_COMMIT_PREPARED:
- ProcessCommitPreparedTransactionCommand(myport, message);
+ ProcessCommitPreparedTransactionCommand(myport, message, false);
+ break;
+
+ case MSG_BKUP_TXN_COMMIT_PREPARED:
+ ProcessCommitPreparedTransactionCommand(myport, message, true);
break;
case MSG_TXN_ROLLBACK:
- ProcessRollbackTransactionCommand(myport, message);
+ ProcessRollbackTransactionCommand(myport, message, false);
+ break;
+
+ case MSG_BKUP_TXN_ROLLBACK:
+ ProcessRollbackTransactionCommand(myport, message, true);
break;
case MSG_TXN_COMMIT_MULTI:
- ProcessCommitTransactionCommandMulti(myport, message);
+ ProcessCommitTransactionCommandMulti(myport, message, false);
+ break;
+
+ case MSG_BKUP_TXN_COMMIT_MULTI:
+ ProcessCommitTransactionCommandMulti(myport, message, true);
break;
case MSG_TXN_ROLLBACK_MULTI:
- ProcessRollbackTransactionCommandMulti(myport, message);
+ ProcessRollbackTransactionCommandMulti(myport, message, false);
+ break;
+
+ case MSG_BKUP_TXN_ROLLBACK_MULTI:
+ ProcessRollbackTransactionCommandMulti(myport, message, true);
break;
case MSG_TXN_GET_GXID:
+ /*
+ * Notice: we don't have corresponding functions in gtm_client.c
+ *
+ * Because this function is not used, GTM-standby extension is not
+ * included in this function.
+ */
ProcessGetGXIDTransactionCommand(myport, message);
break;
@@ -1469,11 +1599,19 @@ ProcessSequenceCommand(Port *myport, GTM_MessageType mtype, StringInfo message)
switch (mtype)
{
case MSG_SEQUENCE_INIT:
- ProcessSequenceInitCommand(myport, message);
+ ProcessSequenceInitCommand(myport, message, false);
+ break;
+
+ case MSG_BKUP_SEQUENCE_INIT:
+ ProcessSequenceInitCommand(myport, message, true);
break;
case MSG_SEQUENCE_ALTER:
- ProcessSequenceAlterCommand(myport, message);
+ ProcessSequenceAlterCommand(myport, message, false);
+ break;
+
+ case MSG_BKUP_SEQUENCE_ALTER:
+ ProcessSequenceAlterCommand(myport, message, true);
break;
case MSG_SEQUENCE_GET_CURRENT:
@@ -1481,23 +1619,43 @@ ProcessSequenceCommand(Port *myport, GTM_MessageType mtype, StringInfo message)
break;
case MSG_SEQUENCE_GET_NEXT:
- ProcessSequenceGetNextCommand(myport, message);
+ ProcessSequenceGetNextCommand(myport, message, false);
+ break;
+
+ case MSG_BKUP_SEQUENCE_GET_NEXT:
+ ProcessSequenceGetNextCommand(myport, message, true);
break;
case MSG_SEQUENCE_SET_VAL:
- ProcessSequenceSetValCommand(myport, message);
+ ProcessSequenceSetValCommand(myport, message, false);
+ break;
+
+ case MSG_BKUP_SEQUENCE_SET_VAL:
+ ProcessSequenceSetValCommand(myport, message, true);
break;
case MSG_SEQUENCE_RESET:
- ProcessSequenceResetCommand(myport, message);
+ ProcessSequenceResetCommand(myport, message, false);
+ break;
+
+ case MSG_BKUP_SEQUENCE_RESET:
+ ProcessSequenceResetCommand(myport, message, true);
break;
case MSG_SEQUENCE_CLOSE:
- ProcessSequenceCloseCommand(myport, message);
+ ProcessSequenceCloseCommand(myport, message, false);
+ break;
+
+ case MSG_BKUP_SEQUENCE_CLOSE:
+ ProcessSequenceCloseCommand(myport, message, true);
break;
case MSG_SEQUENCE_RENAME:
- ProcessSequenceRenameCommand(myport, message);
+ ProcessSequenceRenameCommand(myport, message, false);
+ break;
+
+ case MSG_BKUP_SEQUENCE_RENAME:
+ ProcessSequenceRenameCommand(myport, message, true);
break;
case MSG_SEQUENCE_LIST:
diff --git a/src/gtm/proxy/gtm_proxy_opt.c b/src/gtm/proxy/gtm_proxy_opt.c
index 6fdd470b2f..20c4c492dd 100644
--- a/src/gtm/proxy/gtm_proxy_opt.c
+++ b/src/gtm/proxy/gtm_proxy_opt.c
@@ -44,9 +44,9 @@ extern int GTMPortNumber;
extern char *error_reporter;
extern char *status_reader;
extern int log_min_messages;
-extern int keepalives_idle;
-extern int keepalives_count;
-extern int keepalives_interval;
+extern int tcp_keepalives_idle;
+extern int tcp_keepalives_count;
+extern int tcp_keepalives_interval;
extern char *GTMServerHost;
extern int GTMProxyPortNumber;
extern bool IsGTMConnectRetryRequired;
@@ -54,9 +54,11 @@ extern int GTMConnectRetryIdle;
extern int GTMConnectRetryCount;
extern int GTMConnectRetryInterval;
extern int GTMServerPortNumber;
+/*
extern int GTMServerKeepalivesIdle;
extern int GTMServerKeepalivesInterval;
extern int GTMServerKeepalivesCount;
+*/
extern int GTMErrorWaitIdle;
extern int GTMErrorWaitInterval;
extern int GTMErrorWaitCount;
@@ -222,7 +224,7 @@ struct config_int ConfigureNamesInt[] =
NULL,
GTMOPT_UNIT_TIME
},
- &GTMServerKeepalivesIdle,
+ &tcp_keepalives_idle,
0, 0, INT_MAX,
0, NULL
},
@@ -233,7 +235,7 @@ struct config_int ConfigureNamesInt[] =
NULL,
GTMOPT_UNIT_TIME
},
- &GTMServerKeepalivesInterval,
+ &tcp_keepalives_interval,
0, 0, INT_MAX,
0, NULL
},
@@ -244,7 +246,7 @@ struct config_int ConfigureNamesInt[] =
NULL,
0
},
- &GTMServerKeepalivesCount,
+ &tcp_keepalives_count,
0, 0, INT_MAX,
0, NULL
},
diff --git a/src/gtm/proxy/proxy_main.c b/src/gtm/proxy/proxy_main.c
index 8317d3dc61..67228397c0 100644
--- a/src/gtm/proxy/proxy_main.c
+++ b/src/gtm/proxy/proxy_main.c
@@ -55,7 +55,13 @@ extern char *optarg;
#define GTM_PROXY_DEFAULT_WORKERS 2
#define GTM_PID_FILE "gtm_proxy.pid"
#define GTM_LOG_FILE "gtm_proxy.log"
-#define PROXY_CLIENT_TIMEOUT 20
+#ifndef PROXY_CLIENT_TIMEOUT
+#ifdef GTM_DEBUG
+#define PROXY_CLIENT_TIMEOUT 3600
+#else
+#define PROXY_CLIENT_TIMEOUT 20
+#endif
+#endif
static char *progname = "gtm_proxy";
char *ListenAddresses;
@@ -81,9 +87,9 @@ int GTMConnectRetryInterval = 0;
/*
* Keepalives setup for the connection with GTM server
*/
-int GTMServerKeepalivesIdle = 0;
-int GTMServerKeepalivesInterval = 0;
-int GTMServerKeepalivesCount = 0;
+int tcp_keepalives_idle = 0;
+int tcp_keepalives_interval = 0;
+int tcp_keepalives_count = 0;
char *GTMProxyNodeName = NULL;
GTM_ThreadID TopMostThreadID;
@@ -106,6 +112,14 @@ GTM_RWLock ReconnectControlLock;
jmp_buf mainThreadSIGUSR1_buf;
int SIGUSR1Accepted = FALSE;
+/* If this is GTM or not */
+/*
+ * Used to determine if given Port is in GTM or in GT_Proxy.
+ * If it is in GTM, we should consider to flush GTM_Conn before
+ * writing anything to Port.
+ */
+bool isGTM = false;
+
/* The socket(s) we're listening to. */
#define MAXLISTEN 64
static int ListenSocket[MAXLISTEN];
@@ -345,9 +359,9 @@ GTMProxy_ReadReconnectInfo(void)
return(-1);
}
fclose(optarg_file);
-#ifdef GTM_SBY_DEBUG
+
elog(LOG, "reconnect option = \"%s\"\n", optstr);
-#endif
+
next_token = optstr;
while ((option = read_token(next_token, &next_token)))
{
@@ -389,7 +403,7 @@ GTMProxy_SigleHandler(int signal)
{
int ii;
- elog(LOG, "Received signal %d", signal);
+ elog(LOG, "Received signal %d\n", signal);
switch (signal)
{
@@ -408,14 +422,12 @@ GTMProxy_SigleHandler(int signal)
* The mask is set to block signals. They're blocked until all the
* threads reconnect to the new GTM.
*/
-#ifdef GTM_SBY_DEBUG
elog(LOG, "Accepted SIGUSR1\n");
-#endif
if (MyThreadID != TopMostThreadID)
{
-#ifdef GTM_SBY_DEBUG
+
elog(LOG, "Not on main thread, proxy the signal to the main thread.");
-#endif
+
pthread_kill(TopMostThreadID, SIGUSR1);
return;
}
@@ -423,9 +435,9 @@ GTMProxy_SigleHandler(int signal)
* Then this is the main thread.
*/
PG_SETMASK(&BlockSig);
-#ifdef GTM_SBY_DEBUG
+
elog(LOG, "I'm the main thread. Accepted SIGUSR1.");
-#endif
+
/*
* Set Reconnect Info
*/
@@ -485,15 +497,15 @@ GTMProxy_SigleHandler(int signal)
case SIGUSR2: /* Reconnect from the main thread */
/* Main thread has nothing to do twith this signal and should not receive this. */
PG_SETMASK(&BlockSig);
-#ifdef GTM_SBY_DEBUG
+
elog(LOG, "Detected SIGUSR2, thread:%ld", MyThreadID);
-#endif
+
if (MyThreadID == TopMostThreadID)
{
/* This should not be reached. Just in case. */
-#ifdef GTM_SBY_DEBUG
+
elog(LOG, "SIGUSR2 received by the main thread. Ignoring.");
-#endif
+
PG_SETMASK(&UnBlockSig);
return;
}
@@ -792,7 +804,7 @@ main(int argc, char *argv[])
*/
BaseInit();
- elog(DEBUG3, "Starting GTM proxy at (%s:%d)", ListenAddresses, GTMProxyPortNumber);
+ elog(LOG, "Starting GTM proxy at (%s:%d)", ListenAddresses, GTMProxyPortNumber);
/* Recover Data of Registered nodes. */
Recovery_RestoreRegisterInfo();
@@ -3258,10 +3270,16 @@ RegisterProxy(bool is_reconnect)
finish_time = time(NULL) + PROXY_CLIENT_TIMEOUT;
if (gtmpqWaitTimed(true, false, master_conn, finish_time) ||
gtmpqReadData(master_conn) < 0)
+ {
+ elog(ERROR, "Cannot read data.");
goto failed;
+ }
if ((res = GTMPQgetResult(master_conn)) == NULL)
+ {
+ elog(ERROR, "Cannot get result.");
goto failed;
+ }
if (res->gr_status == GTM_RESULT_OK)
{
diff --git a/src/gtm/recovery/register_gtm.c b/src/gtm/recovery/register_gtm.c
index 7f9b1543e4..9ca0e20a6d 100644
--- a/src/gtm/recovery/register_gtm.c
+++ b/src/gtm/recovery/register_gtm.c
@@ -33,12 +33,15 @@
#include "gtm/gtm_ip.h"
static void finishStandbyConn(GTM_ThreadInfo *thrinfo);
+extern bool Backup_synchronously;
/*
- * Process MSG_NODE_REGISTER
+ * Process MSG_NODE_REGISTER/MSG_BKUP_NODE_REGISTER message.
+ *
+ * is_backup indicates the message is MSG_BKUP_NODE_REGISTER.
*/
void
-ProcessPGXCNodeRegister(Port *myport, StringInfo message)
+ProcessPGXCNodeRegister(Port *myport, StringInfo message, bool is_backup)
{
GTM_PGXCNodeType type;
GTM_PGXCNodePort port;
@@ -172,64 +175,79 @@ ProcessPGXCNodeRegister(Port *myport, StringInfo message)
pq_getmsgend(message);
- /*
- * Send a SUCCESS message back to the client
- */
- pq_beginmessage(&buf, 'S');
- pq_sendint(&buf, NODE_REGISTER_RESULT, 4);
- if (myport->remote_type == GTM_NODE_GTM_PROXY)
+ if (!is_backup)
{
- GTM_ProxyMsgHeader proxyhdr;
- proxyhdr.ph_conid = myport->conn_id;
- pq_sendbytes(&buf, (char *)&proxyhdr, sizeof (GTM_ProxyMsgHeader));
- }
- pq_sendbytes(&buf, (char *)&type, sizeof(GTM_PGXCNodeType));
- /* Node name length */
- pq_sendint(&buf, strlen(node_name), 4);
- /* Node name (var-len) */
- pq_sendbytes(&buf, node_name, strlen(node_name));
- pq_endmessage(myport, &buf);
-
- if (myport->remote_type != GTM_NODE_GTM_PROXY)
- pq_flush(myport);
-
- if (GetMyThreadInfo->thr_conn->standby)
- {
- int _rc;
- GTM_Conn *oldconn = GetMyThreadInfo->thr_conn->standby;
- int count = 0;
- GTM_PGXCNodeInfo *standbynode;
-
- elog(LOG, "calling node_register_internal() for standby GTM %p.",
- GetMyThreadInfo->thr_conn->standby);
-
-retry:
- _rc = node_register_internal(GetMyThreadInfo->thr_conn->standby,
- type,
- ipaddress,
- port,
- node_name,
- datafolder,
- status);
-
- if (gtm_standby_check_communication_error(&count, oldconn))
- goto retry;
-
- /* Now check if there're other standby registered. */
- standbynode = find_standby_node_info();
- if (!standbynode)
- GTMThreads->gt_standby_ready = false;
-
- elog(LOG, "node_register_internal() returns rc %d.", _rc);
+ /*
+ * Backup first
+ */
+ if (GetMyThreadInfo->thr_conn->standby)
+ {
+ int _rc;
+ GTM_Conn *oldconn = GetMyThreadInfo->thr_conn->standby;
+ int count = 0;
+ GTM_PGXCNodeInfo *standbynode;
+
+ elog(LOG, "calling node_register_internal() for standby GTM %p.",
+ GetMyThreadInfo->thr_conn->standby);
+
+ retry:
+ _rc = bkup_node_register_internal(GetMyThreadInfo->thr_conn->standby,
+ type,
+ ipaddress,
+ port,
+ node_name,
+ datafolder,
+ status);
+
+ elog(LOG, "node_register_internal() returns rc %d.", _rc);
+
+ if (gtm_standby_check_communication_error(&count, oldconn))
+ goto retry;
+
+ /* Now check if there're other standby registered. */
+ standbynode = find_standby_node_info();
+ if (!standbynode)
+ GTMThreads->gt_standby_ready = false;
+
+ if (Backup_synchronously && (myport->remote_type != GTM_NODE_GTM_PROXY))
+ gtm_sync_standby(GetMyThreadInfo->thr_conn->standby);
+
+ }
+ /*
+ * Then, send a SUCCESS message back to the client
+ */
+ pq_beginmessage(&buf, 'S');
+ pq_sendint(&buf, NODE_REGISTER_RESULT, 4);
+ if (myport->remote_type == GTM_NODE_GTM_PROXY)
+ {
+ GTM_ProxyMsgHeader proxyhdr;
+ proxyhdr.ph_conid = myport->conn_id;
+ pq_sendbytes(&buf, (char *)&proxyhdr, sizeof (GTM_ProxyMsgHeader));
+ }
+ pq_sendbytes(&buf, (char *)&type, sizeof(GTM_PGXCNodeType));
+ /* Node name length */
+ pq_sendint(&buf, strlen(node_name), 4);
+ /* Node name (var-len) */
+ pq_sendbytes(&buf, node_name, strlen(node_name));
+ pq_endmessage(myport, &buf);
+
+ if (myport->remote_type != GTM_NODE_GTM_PROXY)
+ {
+ if (GetMyThreadInfo->thr_conn->standby)
+ gtmpqFlush(GetMyThreadInfo->thr_conn->standby);
+ pq_flush(myport);
+ }
}
}
/*
- * Process MSG_NODE_UNREGISTER
+ * Process MSG_NODE_UNREGISTER/MSG_BKUP_NODE_UNREGISTER
+ *
+ * is_backup indiccates MSG_BKUP_NODE_UNREGISTER
*/
void
-ProcessPGXCNodeUnregister(Port *myport, StringInfo message)
+ProcessPGXCNodeUnregister(Port *myport, StringInfo message, bool is_backup)
{
GTM_PGXCNodeType type;
MemoryContext oldContext;
@@ -268,47 +286,61 @@ ProcessPGXCNodeUnregister(Port *myport, StringInfo message)
pq_getmsgend(message);
- /*
- * Send a SUCCESS message back to the client
- */
- pq_beginmessage(&buf, 'S');
- pq_sendint(&buf, NODE_UNREGISTER_RESULT, 4);
- if (myport->remote_type == GTM_NODE_GTM_PROXY)
- {
- GTM_ProxyMsgHeader proxyhdr;
- proxyhdr.ph_conid = myport->conn_id;
- pq_sendbytes(&buf, (char *)&proxyhdr, sizeof (GTM_ProxyMsgHeader));
- }
- pq_sendbytes(&buf, (char *)&type, sizeof(GTM_PGXCNodeType));
- /* Node name length */
- pq_sendint(&buf, strlen(node_name), 4);
- /* Node name (var-len) */
- pq_sendbytes(&buf, node_name, strlen(node_name));
- pq_endmessage(myport, &buf);
-
- if (myport->remote_type != GTM_NODE_GTM_PROXY)
- pq_flush(myport);
-
- if (GetMyThreadInfo->thr_conn->standby)
+ if (!is_backup)
{
- int _rc;
- GTM_Conn *oldconn = GetMyThreadInfo->thr_conn->standby;
- int count = 0;
-
- elog(LOG, "calling node_unregister() for standby GTM %p.",
- GetMyThreadInfo->thr_conn->standby);
+ /*
+ * Backup first
+ */
+ if (GetMyThreadInfo->thr_conn->standby)
+ {
+ int _rc;
+ GTM_Conn *oldconn = GetMyThreadInfo->thr_conn->standby;
+ int count = 0;
+
+ elog(LOG, "calling node_unregister() for standby GTM %p.",
+ GetMyThreadInfo->thr_conn->standby);
-retry:
- _rc = node_unregister(GetMyThreadInfo->thr_conn->standby,
- type,
- node_name);
+ retry:
+ _rc = bkup_node_unregister(GetMyThreadInfo->thr_conn->standby,
+ type,
+ node_name);
- if (gtm_standby_check_communication_error(&count, oldconn))
- goto retry;
+ if (gtm_standby_check_communication_error(&count, oldconn))
+ goto retry;
+
+ if (Backup_synchronously && (myport->remote_type != GTM_NODE_GTM_PROXY))
+ gtm_sync_standby(GetMyThreadInfo->thr_conn->standby);
- elog(LOG, "node_unregister() returns rc %d.", _rc);
+ elog(LOG, "node_unregister() returns rc %d.", _rc);
+ }
+ /*
+ * Send a SUCCESS message back to the client
+ */
+ pq_beginmessage(&buf, 'S');
+ pq_sendint(&buf, NODE_UNREGISTER_RESULT, 4);
+ if (myport->remote_type == GTM_NODE_GTM_PROXY)
+ {
+ GTM_ProxyMsgHeader proxyhdr;
+ proxyhdr.ph_conid = myport->conn_id;
+ pq_sendbytes(&buf, (char *)&proxyhdr, sizeof (GTM_ProxyMsgHeader));
+ }
+ pq_sendbytes(&buf, (char *)&type, sizeof(GTM_PGXCNodeType));
+ /* Node name length */
+ pq_sendint(&buf, strlen(node_name), 4);
+ /* Node name (var-len) */
+ pq_sendbytes(&buf, node_name, strlen(node_name));
+
+ pq_endmessage(myport, &buf);
+
+ /* Flush standby before flush to the client */
+ if (myport->remote_type != GTM_NODE_GTM_PROXY)
+ {
+ if (GetMyThreadInfo->thr_conn->standby)
+ gtmpqFlush(GetMyThreadInfo->thr_conn->standby);
+ pq_flush(myport);
+ }
}
}
diff --git a/src/gtm/recovery/replication.c b/src/gtm/recovery/replication.c
index c8005652d2..ae351ff26a 100644
--- a/src/gtm/recovery/replication.c
+++ b/src/gtm/recovery/replication.c
@@ -67,6 +67,10 @@ ProcessBeginReplicationInitialSyncRequest(Port *myport, StringInfo message)
}
pq_endmessage(myport, &buf);
+ /*
+ * Beause this command comes from the standby, we don't have to flush
+ * messages to the standby here.
+ */
if (myport->remote_type != GTM_NODE_GTM_PROXY)
pq_flush(myport);
@@ -112,6 +116,10 @@ ProcessEndReplicationInitialSyncRequest(Port *myport, StringInfo message)
}
pq_endmessage(myport, &buf);
+ /*
+ * Beause this command comes from the standby, we don't have to flush
+ * messages to the standby here.
+ */
if (myport->remote_type != GTM_NODE_GTM_PROXY)
pq_flush(myport);
diff --git a/src/include/gtm/gtm_client.h b/src/include/gtm/gtm_client.h
index 971277816e..f619b37acc 100644
--- a/src/include/gtm/gtm_client.h
+++ b/src/include/gtm/gtm_client.h
@@ -166,13 +166,26 @@ size_t get_sequence_list(GTM_Conn *, GTM_SeqInfo **, size_t);
* Transaction Management API
*/
GlobalTransactionId begin_transaction(GTM_Conn *conn, GTM_IsolationLevel isolevel, GTM_Timestamp *timestamp);
+int bkup_begin_transaction(GTM_Conn *conn, GTM_TransactionHandle txn, GTM_IsolationLevel isolevel,
+ bool read_only, GTM_Timestamp timestamp);
+int bkup_begin_transaction_gxid(GTM_Conn *conn, GTM_TransactionHandle txn, GlobalTransactionId gxid,
+ GTM_IsolationLevel isolevel, bool read_only, GTM_Timestamp timestamp);
+
GlobalTransactionId begin_transaction_autovacuum(GTM_Conn *conn, GTM_IsolationLevel isolevel);
+int bkup_begin_transaction_autovacuum(GTM_Conn *conn, GTM_TransactionHandle txn, GlobalTransactionId gxid,
+ GTM_IsolationLevel isolevel);
int commit_transaction(GTM_Conn *conn, GlobalTransactionId gxid);
+int bkup_commit_transaction(GTM_Conn *conn, GlobalTransactionId gxid);
int commit_prepared_transaction(GTM_Conn *conn, GlobalTransactionId gxid, GlobalTransactionId prepared_gxid);
+int bkup_commit_prepared_transaction(GTM_Conn *conn, GlobalTransactionId gxid, GlobalTransactionId prepared_gxid);
int abort_transaction(GTM_Conn *conn, GlobalTransactionId gxid);
+int bkup_abort_transaction(GTM_Conn *conn, GlobalTransactionId gxid);
int start_prepared_transaction(GTM_Conn *conn, GlobalTransactionId gxid, char *gid,
char *nodestring);
+int backup_start_prepared_transaction(GTM_Conn *conn, GTM_TransactionHandle txn, char *gid,
+ char *nodestring);
int prepare_transaction(GTM_Conn *conn, GlobalTransactionId gxid);
+int bkup_prepare_transaction(GTM_Conn *conn, GlobalTransactionId gxid);
int get_gid_data(GTM_Conn *conn, GTM_IsolationLevel isolevel, char *gid,
GlobalTransactionId *gxid, GlobalTransactionId *prepared_gxid,
char **nodestring);
@@ -185,12 +198,20 @@ begin_transaction_multi(GTM_Conn *conn, int txn_count, GTM_IsolationLevel *txn_i
bool *txn_read_only, GTMProxy_ConnID *txn_connid,
int *txn_count_out, GlobalTransactionId *gxid_out, GTM_Timestamp *ts_out);
int
+bkup_begin_transaction_multi(GTM_Conn *conn, int txn_count,
+ GTM_TransactionHandle *txn, GlobalTransactionId start_gxid, GTM_IsolationLevel *isolevel,
+ bool *read_only, GTMProxy_ConnID *txn_connid);
+int
commit_transaction_multi(GTM_Conn *conn, int txn_count, GlobalTransactionId *gxid,
- int *txn_count_out, int *status_out);
+ int *txn_count_out, int *status_out);
+int
+bkup_commit_transaction_multi(GTM_Conn *conn, int txn_count, GTM_TransactionHandle *txn);
int
abort_transaction_multi(GTM_Conn *conn, int txn_count, GlobalTransactionId *gxid,
int *txn_count_out, int *status_out);
int
+bkup_abort_transaction_multi(GTM_Conn *conn, int txn_count, GlobalTransactionId *gxid);
+int
snapshot_get_multi(GTM_Conn *conn, int txn_count, GlobalTransactionId *gxid,
int *txn_count_out, int *status_out,
GlobalTransactionId *xmin_out, GlobalTransactionId *xmax_out,
@@ -206,14 +227,24 @@ GTM_SnapshotData *get_snapshot(GTM_Conn *conn, GlobalTransactionId gxid,
* Node Registering management API
*/
int node_register(GTM_Conn *conn,
- GTM_PGXCNodeType type,
- GTM_PGXCNodePort port,
- char *node_name,
- char *datafolder);
-int node_register_internal(GTM_Conn *conn, GTM_PGXCNodeType type, const char *host,
- GTM_PGXCNodePort port, char *node_name, char *datafolder,
- GTM_PGXCNodeStatus status);
+ GTM_PGXCNodeType type,
+ GTM_PGXCNodePort port,
+ char *node_name,
+ char *datafolder);
+int bkup_node_register(GTM_Conn *conn,
+ GTM_PGXCNodeType type,
+ GTM_PGXCNodePort port,
+ char *node_name,
+ char *datafolder);
+int node_register(GTM_Conn *conn, GTM_PGXCNodeType type, GTM_PGXCNodePort port, char *node_name, char *datafolder);
+int node_register_internal(GTM_Conn *conn, GTM_PGXCNodeType type, const char *host, GTM_PGXCNodePort port, char *node_name,
+ char *datafolder, GTM_PGXCNodeStatus status);
+int bkup_node_register(GTM_Conn *conn, GTM_PGXCNodeType type, GTM_PGXCNodePort port, char *node_name, char *datafolder);
+int bkup_node_register_internal(GTM_Conn *conn, GTM_PGXCNodeType type, const char *host, GTM_PGXCNodePort port,
+ char *node_name, char *datafolder, GTM_PGXCNodeStatus status);
+
int node_unregister(GTM_Conn *conn, GTM_PGXCNodeType type, const char *node_name);
+int bkup_node_unregister(GTM_Conn *conn, GTM_PGXCNodeType type, const char * node_name);
int backend_disconnect(GTM_Conn *conn, bool is_postmaster, GTM_PGXCNodeType type, char *node_name);
char *node_get_local_addr(GTM_Conn *conn, char *buf, size_t buflen, int *rc);
@@ -223,20 +254,32 @@ char *node_get_local_addr(GTM_Conn *conn, char *buf, size_t buflen, int *rc);
int open_sequence(GTM_Conn *conn, GTM_SequenceKey key, GTM_Sequence increment,
GTM_Sequence minval, GTM_Sequence maxval,
GTM_Sequence startval, bool cycle);
+int bkup_open_sequence(GTM_Conn *conn, GTM_SequenceKey key, GTM_Sequence increment,
+ GTM_Sequence minval, GTM_Sequence maxval,
+ GTM_Sequence startval, bool cycle);
int alter_sequence(GTM_Conn *conn, GTM_SequenceKey key, GTM_Sequence increment,
GTM_Sequence minval, GTM_Sequence maxval,
GTM_Sequence startval, GTM_Sequence lastval, bool cycle, bool is_restart);
+int bkup_alter_sequence(GTM_Conn *conn, GTM_SequenceKey key, GTM_Sequence increment,
+ GTM_Sequence minval, GTM_Sequence maxval,
+ GTM_Sequence startval, GTM_Sequence lastval, bool cycle, bool is_restart);
int close_sequence(GTM_Conn *conn, GTM_SequenceKey key);
+int bkup_close_sequence(GTM_Conn *conn, GTM_SequenceKey key);
int rename_sequence(GTM_Conn *conn, GTM_SequenceKey key, GTM_SequenceKey newkey);
+int bkup_rename_sequence(GTM_Conn *conn, GTM_SequenceKey key, GTM_SequenceKey newkey);
GTM_Sequence get_current(GTM_Conn *conn, GTM_SequenceKey key);
GTM_Sequence get_next(GTM_Conn *conn, GTM_SequenceKey key);
+GTM_Sequence bkup_get_next(GTM_Conn *conn, GTM_SequenceKey key);
int set_val(GTM_Conn *conn, GTM_SequenceKey key, GTM_Sequence nextval, bool is_called);
+int bkup_set_val(GTM_Conn *conn, GTM_SequenceKey key, GTM_Sequence nextval, bool is_called);
int reset_sequence(GTM_Conn *conn, GTM_SequenceKey key);
+int bkup_reset_sequence(GTM_Conn *conn, GTM_SequenceKey key);
/*
* GTM-Standby
*/
int set_begin_end_backup(GTM_Conn *conn, bool begin);
+int gtm_sync_standby(GTM_Conn *conn);
#endif
diff --git a/src/include/gtm/gtm_msg.h b/src/include/gtm/gtm_msg.h
index c7c1259eba..b22a709b0f 100644
--- a/src/include/gtm/gtm_msg.h
+++ b/src/include/gtm/gtm_msg.h
@@ -14,47 +14,74 @@
#ifndef GTM_MSG_H
#define GTM_MSG_H
+/*
+ * The following enum symbols are also used in message_name_tab structure
+ * in gtm_utils.c. Modification of the following enum should reflect
+ * changes to message_name_tab structure as well.
+ */
typedef enum GTM_MessageType
{
MSG_TYPE_INVALID,
- MSG_NODE_REGISTER, /* Register a PGXC Node with GTM */
- MSG_NODE_UNREGISTER, /* Unregister a PGXC Node with GTM */
- MSG_NODE_LIST,
+ MSG_SYNC_STANDBY, /* Message to sync woth GTM-Standby */
+ MSG_NODE_REGISTER, /* Register a PGXC Node with GTM */
+ MSG_BKUP_NODE_REGISTER, /* Backup of MSG_NODE_REGISTER */
+ MSG_NODE_UNREGISTER, /* Unregister a PGXC Node with GTM */
+ MSG_BKUP_NODE_UNREGISTER, /* Backup of MSG_NODE_UNREGISTER */
+ MSG_NODE_LIST, /* Get node list */
MSG_NODE_BEGIN_REPLICATION_INIT,
MSG_NODE_END_REPLICATION_INIT,
- MSG_BEGIN_BACKUP, /* Start backup by Standby */
- MSG_END_BACKUP, /* End backup preparation by Standby */
- MSG_TXN_BEGIN, /* Start a new transaction */
- MSG_TXN_BEGIN_GETGXID, /* Start a new transaction and get GXID */
- MSG_TXN_BEGIN_GETGXID_MULTI, /* Start multiple new transactions and get GXIDs */
- MSG_TXN_START_PREPARED, /* Begins to prepare a transation for commit */
+ MSG_BEGIN_BACKUP, /* Start backup by Standby */
+ MSG_END_BACKUP, /* End backup preparation by Standby */
+ MSG_TXN_BEGIN, /* Start a new transaction */
+ MSG_BKUP_TXN_BEGIN, /* Backup of MSG_TXN_BEGIN */
+ MSG_TXN_BEGIN_GETGXID, /* Start a new transaction and get GXID */
+ MSG_BKUP_TXN_BEGIN_GETGXID, /* Backup of MSG_TXN_BEGIN_GETGXID */
+ MSG_TXN_BEGIN_GETGXID_MULTI, /* Start multiple new transactions and get GXIDs */
+ MSG_BKUP_TXN_BEGIN_GETGXID_MULTI, /* Backup of MSG_TXN_BEGIN_GETGXID_MULTI */
+ MSG_TXN_START_PREPARED, /* Begins to prepare a transation for commit */
+ MSG_BKUP_TXN_START_PREPARED, /* Backup of MSG_TXN_START_PREPARED */
MSG_TXN_COMMIT, /* Commit a running or prepared transaction */
- MSG_TXN_COMMIT_MULTI, /* Commit multiple running or prepared transactions */
- MSG_TXN_COMMIT_PREPARED, /* Commit a prepared transaction */
+ MSG_BKUP_TXN_COMMIT, /* Backup of MSG_TXN_COMMIT */
+ MSG_TXN_COMMIT_MULTI, /* Commit multiple running or prepared transactions */
+ MSG_BKUP_TXN_COMMIT_MULTI, /* Bacukp of MSG_TXN_COMMIT_MULTI */
+ MSG_TXN_COMMIT_PREPARED, /* Commit a prepared transaction */
+ MSG_BKUP_TXN_COMMIT_PREPARED, /* Backup of MSG_TXN_COMMIT_PREPARED */
MSG_TXN_PREPARE, /* Finish preparing a transaction */
+ MSG_BKUP_TXN_PREPARE, /* Backup of MSG_TXN_PREPARE */
MSG_TXN_ROLLBACK, /* Rollback a transaction */
- MSG_TXN_ROLLBACK_MULTI, /* Rollback multiple transactions */
+ MSG_BKUP_TXN_ROLLBACK, /* Backup of MSG_TXN_ROLLBACK */
+ MSG_TXN_ROLLBACK_MULTI, /* Rollback multiple transactions */
+ MSG_BKUP_TXN_ROLLBACK_MULTI, /* Backup of MSG_TXN_ROLLBACK_MULTI */
MSG_TXN_GET_GID_DATA, /* Get info associated with a GID, and get a GXID */
MSG_TXN_GET_GXID, /* Get a GXID for a transaction */
- MSG_TXN_GET_NEXT_GXID, /* Get next GXID */
+ MSG_BKUP_TXN_GET_GXID,
+ MSG_TXN_GET_NEXT_GXID, /* Get next GXID */
MSG_TXN_GXID_LIST,
MSG_SNAPSHOT_GET, /* Get a global snapshot */
MSG_SNAPSHOT_GET_MULTI, /* Get multiple global snapshots */
MSG_SNAPSHOT_GXID_GET, /* Get GXID and snapshot together */
MSG_SEQUENCE_INIT, /* Initialize a new global sequence */
+ MSG_BKUP_SEQUENCE_INIT, /* Backup of MSG_SEQUENCE_INIT */
MSG_SEQUENCE_GET_CURRENT,/* Get the current value of sequence */
- MSG_SEQUENCE_GET_NEXT, /* Get the next sequence value of sequence */
+ MSG_SEQUENCE_GET_NEXT, /* Get the next sequence value of sequence */
+ MSG_BKUP_SEQUENCE_GET_NEXT, /* Backup of MSG_SEQUENCE_GET_NEXT */
MSG_SEQUENCE_GET_LAST, /* Get the last sequence value of sequence */
- MSG_SEQUENCE_SET_VAL, /* Set values for sequence */
- MSG_SEQUENCE_RESET, /* Reset the sequence */
- MSG_SEQUENCE_CLOSE, /* Close a previously inited sequence */
- MSG_SEQUENCE_RENAME, /* Rename a sequence */
- MSG_SEQUENCE_ALTER, /* Alter a sequence */
+ MSG_SEQUENCE_SET_VAL, /* Set values for sequence */
+ MSG_BKUP_SEQUENCE_SET_VAL, /* Backup of MSG_SEQUENCE_SET_VAL */
+ MSG_SEQUENCE_RESET, /* Reset the sequence */
+ MSG_BKUP_SEQUENCE_RESET, /* Backup of MSG_SEQUENCE_RESET */
+ MSG_SEQUENCE_CLOSE, /* Close a previously inited sequence */
+ MSG_BKUP_SEQUENCE_CLOSE, /* Backup of MSG_SEQUENCE_CLOSE */
+ MSG_SEQUENCE_RENAME, /* Rename a sequence */
+ MSG_BKUP_SEQUENCE_RENAME, /* Backup of MSG_SEQUENCE_RENAME */
+ MSG_SEQUENCE_ALTER, /* Alter a sequence */
+ MSG_BKUP_SEQUENCE_ALTER, /* Backup of MSG_SEQUENCE_ALTER */
MSG_SEQUENCE_LIST, /* Get a list of sequences */
MSG_TXN_GET_STATUS, /* Get status of a given transaction */
MSG_TXN_GET_ALL_PREPARED, /* Get information about all outstanding
* prepared transactions */
- MSG_TXN_BEGIN_GETGXID_AUTOVACUUM, /* Start a new transaction and get GXID for autovacuum */
+ MSG_TXN_BEGIN_GETGXID_AUTOVACUUM, /* Start a new transaction and get GXID for autovacuum */
+ MSG_BKUP_TXN_BEGIN_GETGXID_AUTOVACUUM, /* Backup of MSG_TXN_BEGIN_GETGXID_AUTOVACUUM */
MSG_DATA_FLUSH, /* flush pending data */
MSG_BACKEND_DISCONNECT, /* tell GTM that the backend diconnected from the proxy */
@@ -64,8 +91,13 @@ typedef enum GTM_MessageType
MSG_TYPE_COUNT /* A dummmy entry just to count the message types */
} GTM_MessageType;
+/*
+ * Symbols in the following enum are usd in result_name_tab defined in gtm_utils.c.
+ * Modifictaion to the following enum should be reflected to result_name_tab as well.
+ */
typedef enum GTM_ResultType
{
+ SYNC_STANDBY_RESULT,
NODE_REGISTER_RESULT,
NODE_UNREGISTER_RESULT,
NODE_LIST_RESULT,
@@ -103,6 +135,7 @@ typedef enum GTM_ResultType
TXN_GET_STATUS_RESULT,
TXN_GET_ALL_PREPARED_RESULT,
TXN_BEGIN_GETGXID_AUTOVACUUM_RESULT,
+ RESULT_TYPE_COUNT
} GTM_ResultType;
/*
diff --git a/src/include/gtm/gtm_opt.h b/src/include/gtm/gtm_opt.h
index fd8a3101d8..b475d34e1d 100644
--- a/src/include/gtm/gtm_opt.h
+++ b/src/include/gtm/gtm_opt.h
@@ -351,6 +351,7 @@ const char *const config_type_names[] =\
#define GTM_OPTNAME_PORT "port"
#define GTM_OPTNAME_STARTUP "startup"
#define GTM_OPTNAME_STATUS_READER "status_reader"
+#define GTM_OPTNAME_SYNCHRONOUS_BACKUP "synchronous_backup"
#define GTM_OPTNAME_WORKER_THREADS "worker_threads"
diff --git a/src/include/gtm/gtm_seq.h b/src/include/gtm/gtm_seq.h
index 92174eecc5..77478eab47 100644
--- a/src/include/gtm/gtm_seq.h
+++ b/src/include/gtm/gtm_seq.h
@@ -76,14 +76,14 @@ int GTM_SeqSetVal(GTM_SequenceKey seqkey, GTM_Sequence nextval, bool iscalled);
int GTM_SeqReset(GTM_SequenceKey seqkey);
-void ProcessSequenceInitCommand(Port *myport, StringInfo message);
+void ProcessSequenceInitCommand(Port *myport, StringInfo message, bool is_backup);
void ProcessSequenceGetCurrentCommand(Port *myport, StringInfo message);
-void ProcessSequenceGetNextCommand(Port *myport, StringInfo message);
-void ProcessSequenceSetValCommand(Port *myport, StringInfo message);
-void ProcessSequenceResetCommand(Port *myport, StringInfo message);
-void ProcessSequenceCloseCommand(Port *myport, StringInfo message);
-void ProcessSequenceRenameCommand(Port *myport, StringInfo message);
-void ProcessSequenceAlterCommand(Port *myport, StringInfo message);
+void ProcessSequenceGetNextCommand(Port *myport, StringInfo message, bool is_backup);
+void ProcessSequenceSetValCommand(Port *myport, StringInfo message, bool is_backup);
+void ProcessSequenceResetCommand(Port *myport, StringInfo message, bool is_backup);
+void ProcessSequenceCloseCommand(Port *myport, StringInfo message, bool is_backup);
+void ProcessSequenceRenameCommand(Port *myport, StringInfo message, bool is_backup);
+void ProcessSequenceAlterCommand(Port *myport, StringInfo message, bool is_backup);
void ProcessSequenceListCommand(Port *myport, StringInfo message);
diff --git a/src/include/gtm/gtm_txn.h b/src/include/gtm/gtm_txn.h
index 6cd5181a96..dae4cc143f 100644
--- a/src/include/gtm/gtm_txn.h
+++ b/src/include/gtm/gtm_txn.h
@@ -220,25 +220,42 @@ GTM_Snapshot GTM_GetTransactionSnapshot(GTM_TransactionHandle handle[],
void GTM_FreeCachedTransInfo(void);
void ProcessBeginTransactionCommand(Port *myport, StringInfo message);
+void ProcessBkupBeginTransactionCommand(Port *myport, StringInfo message);
+void GTM_BkupBeginTransactionMulti(char *coord_name,
+ GTM_TransactionHandle *txn,
+ GTM_IsolationLevel *isolevel,
+ bool *readonly,
+ GTMProxy_ConnID *connid,
+ int txn_count);
+
void ProcessBeginTransactionCommandMulti(Port *myport, StringInfo message);
void ProcessBeginTransactionGetGXIDCommand(Port *myport, StringInfo message);
-void ProcessCommitTransactionCommand(Port *myport, StringInfo message);
-void ProcessCommitPreparedTransactionCommand(Port *myport, StringInfo message);
-void ProcessRollbackTransactionCommand(Port *myport, StringInfo message);
-void ProcessStartPreparedTransactionCommand(Port *myport, StringInfo message);
-void ProcessPrepareTransactionCommand(Port *myport, StringInfo message);
+void ProcessCommitTransactionCommand(Port *myport, StringInfo message, bool is_backup);
+void ProcessCommitPreparedTransactionCommand(Port *myport, StringInfo message, bool is_backup);
+void ProcessRollbackTransactionCommand(Port *myport, StringInfo message, bool is_backup);
+void ProcessStartPreparedTransactionCommand(Port *myport, StringInfo message, bool is_backup);
+void ProcessPrepareTransactionCommand(Port *myport, StringInfo message, bool is_backup);
void ProcessGetGIDDataTransactionCommand(Port *myport, StringInfo message);
void ProcessGetGXIDTransactionCommand(Port *myport, StringInfo message);
void ProcessGXIDListCommand(Port *myport, StringInfo message);
void ProcessGetNextGXIDTransactionCommand(Port *myport, StringInfo message);
void ProcessBeginTransactionGetGXIDAutovacuumCommand(Port *myport, StringInfo message);
+void ProcessBkupBeginTransactionGetGXIDAutovacuumCommand(Port *myport, StringInfo message);
+
void ProcessBeginTransactionGetGXIDCommandMulti(Port *myport, StringInfo message);
-void ProcessCommitTransactionCommandMulti(Port *myport, StringInfo message);
-void ProcessRollbackTransactionCommandMulti(Port *myport, StringInfo message) ;
+void ProcessCommitTransactionCommandMulti(Port *myport, StringInfo message, bool is_backup);
+void ProcessRollbackTransactionCommandMulti(Port *myport, StringInfo message, bool is_backup) ;
void GTM_SaveTxnInfo(int ctlfd);
void GTM_RestoreTxnInfo(int ctlfd, GlobalTransactionId next_gxid);
+void GTM_BkupBeginTransaction(char *coord_name,
+ GTM_TransactionHandle txn,
+ GTM_IsolationLevel isolevel,
+ bool readonly);
+void ProcessBkupBeginTransactionGetGXIDCommand(Port *myport, StringInfo message);
+void ProcessBkupBeginTransactionGetGXIDCommandMulti(Port *myport, StringInfo message);
+
/*
* In gtm_snap.c
diff --git a/src/include/gtm/gtm_utils.h b/src/include/gtm/gtm_utils.h
index 245daa9114..bea9fb38fa 100644
--- a/src/include/gtm/gtm_utils.h
+++ b/src/include/gtm/gtm_utils.h
@@ -15,6 +15,7 @@
#define GTM_UTILS_H
#include "gtm/libpq-int.h"
+#include "gtm/gtm_msg.h"
#if 0
/*
@@ -25,4 +26,8 @@
void gtm_report_failure(GTM_Conn *);
#endif
+void gtm_util_init_nametabs(void);
+char *gtm_util_message_name(GTM_MessageType type);
+char *gtm_util_result_name(GTM_ResultType type);
+
#endif /* GTM_UTIL_H */
diff --git a/src/include/gtm/register.h b/src/include/gtm/register.h
index 15674ec59a..e8aa8c1bf0 100644
--- a/src/include/gtm/register.h
+++ b/src/include/gtm/register.h
@@ -79,8 +79,8 @@ void Recovery_SaveRegisterInfo(void);
void Recovery_PGXCNodeDisconnect(Port *myport);
void Recovery_SaveRegisterFileName(char *dir);
-void ProcessPGXCNodeRegister(Port *myport, StringInfo message);
-void ProcessPGXCNodeUnregister(Port *myport, StringInfo message);
+void ProcessPGXCNodeRegister(Port *myport, StringInfo message, bool is_backup);
+void ProcessPGXCNodeUnregister(Port *myport, StringInfo message, bool is_backup);
void ProcessPGXCNodeBackendDisconnect(Port *myport, StringInfo message);
void ProcessPGXCNodeList(Port *myport, StringInfo message);