summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/backend/access/transam/gtm.c6
-rw-r--r--src/backend/access/transam/varsup.c22
-rw-r--r--src/backend/access/transam/xact.c117
-rw-r--r--src/backend/pgxc/pool/datanode.c42
-rw-r--r--src/backend/pgxc/pool/execRemote.c58
-rw-r--r--src/backend/tcop/postgres.c18
-rw-r--r--src/backend/utils/adt/timestamp.c4
-rw-r--r--src/gtm/client/fe-protocol.c16
-rw-r--r--src/gtm/client/gtm_client.c9
-rw-r--r--src/gtm/main/Makefile2
-rw-r--r--src/gtm/main/gtm_time.c41
-rw-r--r--src/gtm/main/gtm_txn.c12
-rw-r--r--src/gtm/proxy/proxy_main.c5
-rw-r--r--src/include/access/gtm.h2
-rw-r--r--src/include/access/transam.h8
-rw-r--r--src/include/access/xact.h4
-rw-r--r--src/include/gtm/gtm_c.h6
-rw-r--r--src/include/gtm/gtm_client.h13
-rw-r--r--src/include/gtm/gtm_time.h37
-rw-r--r--src/include/pgxc/datanode.h2
-rw-r--r--src/include/utils/timestamp.h13
21 files changed, 410 insertions, 27 deletions
diff --git a/src/backend/access/transam/gtm.c b/src/backend/access/transam/gtm.c
index f9499c9e86..c7f35478d9 100644
--- a/src/backend/access/transam/gtm.c
+++ b/src/backend/access/transam/gtm.c
@@ -67,14 +67,14 @@ CloseGTM(void)
}
GlobalTransactionId
-BeginTranGTM(void)
+BeginTranGTM(GTM_Timestamp *timestamp)
{
GlobalTransactionId xid = InvalidGlobalTransactionId;
CheckConnection();
// TODO Isolation level
if (conn)
- xid = begin_transaction(conn, GTM_ISOLATION_RC);
+ xid = begin_transaction(conn, GTM_ISOLATION_RC, timestamp);
/* If something went wrong (timeout), try and reset GTM connection
* and retry. This is safe at the beginning of a transaction.
@@ -84,7 +84,7 @@ BeginTranGTM(void)
CloseGTM();
InitGTM();
if (conn)
- xid = begin_transaction(conn, GTM_ISOLATION_RC);
+ xid = begin_transaction(conn, GTM_ISOLATION_RC, timestamp);
}
return xid;
}
diff --git a/src/backend/access/transam/varsup.c b/src/backend/access/transam/varsup.c
index 4de1080544..8145fac713 100644
--- a/src/backend/access/transam/varsup.c
+++ b/src/backend/access/transam/varsup.c
@@ -75,11 +75,17 @@ GetForceXidFromGTM(void)
* The new XID is also stored into MyProc before returning.
*/
TransactionId
+#ifdef PGXC
+GetNewTransactionId(bool isSubXact, bool *timestamp_received, GTM_Timestamp *timestamp)
+#else
GetNewTransactionId(bool isSubXact)
+#endif
{
TransactionId xid;
-#ifdef PGXC
+#ifdef PGXC
bool increment_xid = true;
+
+ *timestamp_received = false;
#endif
/*
@@ -102,8 +108,10 @@ GetNewTransactionId(bool isSubXact)
* This will help with GTM connection issues- we will not
* block all other processes.
*/
- xid = (TransactionId) BeginTranGTM();
+ xid = (TransactionId) BeginTranGTM(timestamp);
+ *timestamp_received = true;
}
+
#endif
LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
@@ -144,18 +152,20 @@ GetNewTransactionId(bool isSubXact)
* exclude it from other snapshots.
*/
next_xid = (TransactionId) BeginTranAutovacuumGTM();
- } else {
+ }
+ else
+ {
elog (DEBUG1, "Getting XID for autovacuum worker (analyze)");
/* try and get gxid directly from GTM */
- next_xid = (TransactionId) BeginTranGTM();
+ next_xid = (TransactionId) BeginTranGTM(NULL);
}
} else if (GetForceXidFromGTM())
{
elog (DEBUG1, "Force get XID from GTM");
/* try and get gxid directly from GTM */
- next_xid = (TransactionId) BeginTranGTM();
+ next_xid = (TransactionId) BeginTranGTM(NULL);
}
-
+
if (TransactionIdIsValid(next_xid))
{
xid = next_xid;
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 4db1e2c864..6c5ee4a400 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -208,6 +208,19 @@ static TimestampTz stmtStartTimestamp;
static TimestampTz xactStopTimestamp;
/*
+ * PGXC receives from GTM a timestamp value at the same time as a GXID
+ * This one is set as GTMxactStartTimestamp and is a return value of now(), current_transaction().
+ * GTMxactStartTimestamp is also sent to each node with gxid and snapshot and delta is calculated locally.
+ * GTMdeltaTimestamp is used to calculate current_statement as its value can change
+ * during a transaction. Delta can have a different value through the nodes of the cluster
+ * but its uniqueness in the cluster is maintained thanks to the global value GTMxactStartTimestamp.
+ */
+#ifdef PGXC
+static TimestampTz GTMxactStartTimestamp = 0;
+static TimestampTz GTMdeltaTimestamp = 0;
+#endif
+
+/*
* GID to be used for preparing the current transaction. This is also
* global to a whole transaction, so we don't keep it in the state stack.
*/
@@ -315,12 +328,28 @@ GetCurrentGlobalTransactionId(void)
*
* This will return the GXID of the specified transaction,
* getting one from the GTM if it's not yet set.
+ * It also returns a timestamp value if a GXID has been taken from GTM
*/
static GlobalTransactionId
GetGlobalTransactionId(TransactionState s)
{
+ GTM_Timestamp gtm_timestamp;
+ bool received_tp;
+
+ /*
+ * Here we receive timestamp at the same time as gxid.
+ */
if (!GlobalTransactionIdIsValid(s->globalTransactionId))
- s->globalTransactionId = (GlobalTransactionId) GetNewTransactionId(s->parent != NULL);
+ s->globalTransactionId = (GlobalTransactionId) GetNewTransactionId(s->parent != NULL,
+ &received_tp,
+ &gtm_timestamp);
+
+ /* Set a timestamp value if and only if it has been received from GTM */
+ if (received_tp)
+ {
+ GTMxactStartTimestamp = (TimestampTz) gtm_timestamp;
+ GTMdeltaTimestamp = GTMxactStartTimestamp - stmtStartTimestamp;
+ }
return s->globalTransactionId;
}
@@ -473,8 +502,20 @@ AssignTransactionId(TransactionState s)
s->transactionId, isSubXact ? "true" : "false");
}
else
-#endif
+ {
+ GTM_Timestamp gtm_timestamp;
+ bool received_tp;
+
+ s->transactionId = GetNewTransactionId(isSubXact, &received_tp, &gtm_timestamp);
+ if (received_tp)
+ {
+ GTMxactStartTimestamp = (TimestampTz) gtm_timestamp;
+ GTMdeltaTimestamp = GTMxactStartTimestamp - stmtStartTimestamp;
+ }
+ }
+#else
s->transactionId = GetNewTransactionId(isSubXact);
+#endif
if (isSubXact)
SubTransSetParent(s->transactionId, s->parent->transactionId);
@@ -536,7 +577,15 @@ GetCurrentCommandId(bool used)
TimestampTz
GetCurrentTransactionStartTimestamp(void)
{
+ /*
+ * In Postgres-XC, Transaction start timestamp is the value received
+ * from GTM along with GXID.
+ */
+#ifdef PGXC
+ return GTMxactStartTimestamp;
+#else
return xactStartTimestamp;
+#endif
}
/*
@@ -545,7 +594,17 @@ GetCurrentTransactionStartTimestamp(void)
TimestampTz
GetCurrentStatementStartTimestamp(void)
{
+ /*
+ * For Postgres-XC, Statement start timestamp is adjusted at each node
+ * (Coordinator and Datanode) with a difference value that is calculated
+ * based on the global timestamp value received from GTM and the local
+ * clock. This permits to follow the GTM timeline in the cluster.
+ */
+#ifdef PGXC
+ return stmtStartTimestamp + GTMdeltaTimestamp;
+#else
return stmtStartTimestamp;
+#endif
}
/*
@@ -557,11 +616,36 @@ GetCurrentStatementStartTimestamp(void)
TimestampTz
GetCurrentTransactionStopTimestamp(void)
{
+ /*
+ * As for Statement start timestamp, stop timestamp has to
+ * be adjusted with the delta value calculated with the
+ * timestamp received from GTM and the local node clock.
+ */
+#ifdef PGXC
+ TimestampTz timestamp;
+
+ if (xactStopTimestamp != 0)
+ return xactStopTimestamp + GTMdeltaTimestamp;
+
+ timestamp = GetCurrentTimestamp() + GTMdeltaTimestamp;
+
+ return timestamp;
+#else
if (xactStopTimestamp != 0)
return xactStopTimestamp;
+
return GetCurrentTimestamp();
+#endif
}
+#ifdef PGXC
+TimestampTz
+GetCurrentGTMStartTimestamp(void)
+{
+ return GTMxactStartTimestamp;
+}
+#endif
+
/*
* SetCurrentStatementStartTimestamp
*/
@@ -580,6 +664,20 @@ SetCurrentTransactionStopTimestamp(void)
xactStopTimestamp = GetCurrentTimestamp();
}
+#ifdef PGXC
+/*
+ * SetCurrentGTMDeltaTimestamp
+ *
+ * Note: Sets local timestamp delta with the value received from GTM
+ */
+void
+SetCurrentGTMDeltaTimestamp(TimestampTz timestamp)
+{
+ GTMxactStartTimestamp = timestamp;
+ GTMdeltaTimestamp = GTMxactStartTimestamp - xactStartTimestamp;
+}
+#endif
+
/*
* GetCurrentTransactionNestLevel
*
@@ -950,7 +1048,12 @@ RecordTransactionCommit(void)
MyProc->inCommit = true;
SetCurrentTransactionStopTimestamp();
+#ifdef PGXC
+ /* In Postgres-XC, stop timestamp has to follow the timeline of GTM */
+ xlrec.xact_time = xactStopTimestamp + GTMdeltaTimestamp;
+#else
xlrec.xact_time = xactStopTimestamp;
+#endif
xlrec.nrels = nrels;
xlrec.nsubxacts = nchildren;
rdata[0].data = (char *) (&xlrec);
@@ -1275,7 +1378,12 @@ RecordTransactionAbort(bool isSubXact)
else
{
SetCurrentTransactionStopTimestamp();
+#ifdef PGXC
+ /* In Postgres-XC, stop timestamp has to follow the timeline of GTM */
+ xlrec.xact_time = xactStopTimestamp + GTMdeltaTimestamp;
+#else
xlrec.xact_time = xactStopTimestamp;
+#endif
}
xlrec.nrels = nrels;
xlrec.nsubxacts = nchildren;
@@ -1576,7 +1684,12 @@ StartTransaction(void)
*/
xactStartTimestamp = stmtStartTimestamp;
xactStopTimestamp = 0;
+#ifdef PGXC
+ /* For Postgres-XC, transaction start timestamp has to follow the GTM timeline */
+ pgstat_report_xact_timestamp(GTMxactStartTimestamp);
+#else
pgstat_report_xact_timestamp(xactStartTimestamp);
+#endif
/*
* initialize current transaction state fields
diff --git a/src/backend/pgxc/pool/datanode.c b/src/backend/pgxc/pool/datanode.c
index 0f4072d01e..ba56ca1cb3 100644
--- a/src/backend/pgxc/pool/datanode.c
+++ b/src/backend/pgxc/pool/datanode.c
@@ -893,6 +893,48 @@ data_node_send_snapshot(DataNodeHandle *handle, Snapshot snapshot)
}
/*
+ * Send the timestamp down to the Datanode
+ */
+int
+data_node_send_timestamp(DataNodeHandle *handle, TimestampTz timestamp)
+{
+ int msglen = 12; /* 4 bytes for msglen and 8 bytes for timestamp (int64) */
+ uint32 n32;
+ int64 i = (int64) timestamp;
+
+ /* msgType + msgLen */
+ if (ensure_out_buffer_capacity(handle->outEnd + 1 + msglen, handle) != 0)
+ {
+ add_error_message(handle, "out of memory");
+ return EOF;
+ }
+ handle->outBuffer[handle->outEnd++] = 't';
+ msglen = htonl(msglen);
+ memcpy(handle->outBuffer + handle->outEnd, &msglen, 4);
+ handle->outEnd += 4;
+
+ /* High order half first */
+#ifdef INT64_IS_BUSTED
+ /* don't try a right shift of 32 on a 32-bit word */
+ n32 = (i < 0) ? -1 : 0;
+#else
+ n32 = (uint32) (i >> 32);
+#endif
+ n32 = htonl(n32);
+ memcpy(handle->outBuffer + handle->outEnd, &n32, 4);
+ handle->outEnd += 4;
+
+ /* Now the low order half */
+ n32 = (uint32) i;
+ n32 = htonl(n32);
+ memcpy(handle->outBuffer + handle->outEnd, &n32, 4);
+ handle->outEnd += 4;
+
+ return 0;
+}
+
+
+/*
* Add another message to the list of errors to be returned back to the client
* at the convenient time
*/
diff --git a/src/backend/pgxc/pool/execRemote.c b/src/backend/pgxc/pool/execRemote.c
index 43569e0e75..f065289851 100644
--- a/src/backend/pgxc/pool/execRemote.c
+++ b/src/backend/pgxc/pool/execRemote.c
@@ -1074,6 +1074,7 @@ data_node_begin(int conn_count, DataNodeHandle ** connections,
int i;
struct timeval *timeout = NULL;
RemoteQueryState *combiner;
+ TimestampTz timestamp = GetCurrentGTMStartTimestamp();
/* Send BEGIN */
for (i = 0; i < conn_count; i++)
@@ -1081,6 +1082,9 @@ data_node_begin(int conn_count, DataNodeHandle ** connections,
if (GlobalTransactionIdIsValid(gxid) && data_node_send_gxid(connections[i], gxid))
return EOF;
+ if (GlobalTimestampIsValid(timestamp) && data_node_send_timestamp(connections[i], timestamp))
+ return EOF;
+
if (data_node_send_query(connections[i], "BEGIN"))
return EOF;
}
@@ -1222,8 +1226,13 @@ data_node_commit(int conn_count, DataNodeHandle ** connections)
else
sprintf(buffer, "COMMIT PREPARED 'T%d'", gxid);
- /* We need to use a new xid, the data nodes have reset */
- two_phase_xid = BeginTranGTM();
+ /*
+ * We need to use a new xid, the data nodes have reset
+ * Timestamp has already been set with BEGIN on remote Datanodes,
+ * so don't use it here.
+ */
+ two_phase_xid = BeginTranGTM(NULL);
+
for (i = 0; i < conn_count; i++)
{
if (data_node_send_gxid(connections[i], two_phase_xid))
@@ -1338,6 +1347,7 @@ DataNodeCopyBegin(const char *query, List *nodelist, Snapshot snapshot, bool is_
bool need_tran;
GlobalTransactionId gxid;
RemoteQueryState *combiner;
+ TimestampTz timestamp = GetCurrentGTMStartTimestamp();
if (conn_count == 0)
return NULL;
@@ -1432,6 +1442,19 @@ DataNodeCopyBegin(const char *query, List *nodelist, Snapshot snapshot, bool is_
pfree(copy_connections);
return NULL;
}
+ if (conn_count == 1 && data_node_send_timestamp(connections[i], timestamp))
+ {
+ /*
+ * If a transaction involves multiple connections timestamp, is
+ * always sent down to Datanodes with data_node_begin.
+ * An autocommit transaction needs the global timestamp also,
+ * so handle this case here.
+ */
+ add_error_message(connections[i], "Can not send request");
+ pfree(connections);
+ pfree(copy_connections);
+ return NULL;
+ }
if (snapshot && data_node_send_snapshot(connections[i], snapshot))
{
add_error_message(connections[i], "Can not send request");
@@ -2027,7 +2050,8 @@ ExecRemoteQuery(RemoteQueryState *node)
bool force_autocommit = step->force_autocommit;
bool is_read_only = step->read_only;
GlobalTransactionId gxid = InvalidGlobalTransactionId;
- Snapshot snapshot = GetActiveSnapshot();
+ Snapshot snapshot = GetActiveSnapshot();
+ TimestampTz timestamp = GetCurrentGTMStartTimestamp();
DataNodeHandle **connections = NULL;
DataNodeHandle **primaryconnection = NULL;
int i;
@@ -2133,6 +2157,20 @@ ExecRemoteQuery(RemoteQueryState *node)
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("Failed to send command to data nodes")));
}
+ if (total_conn_count == 1 && data_node_send_timestamp(primaryconnection[0], timestamp))
+ {
+ /*
+ * If a transaction involves multiple connections timestamp is
+ * always sent down to Datanodes with data_node_begin.
+ * An autocommit transaction needs the global timestamp also,
+ * so handle this case here.
+ */
+ pfree(connections);
+ pfree(primaryconnection);
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Failed to send command to data nodes")));
+ }
if (snapshot && data_node_send_snapshot(primaryconnection[0], snapshot))
{
pfree(connections);
@@ -2184,6 +2222,20 @@ ExecRemoteQuery(RemoteQueryState *node)
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("Failed to send command to data nodes")));
}
+ if (total_conn_count == 1 && data_node_send_timestamp(connections[i], timestamp))
+ {
+ /*
+ * If a transaction involves multiple connections timestamp is
+ * always sent down to Datanodes with data_node_begin.
+ * An autocommit transaction needs the global timestamp also,
+ * so handle this case here.
+ */
+ pfree(connections);
+ pfree(primaryconnection);
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Failed to send command to data nodes")));
+ }
if (snapshot && data_node_send_snapshot(connections[i], snapshot))
{
pfree(connections);
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index bcaa8f0ada..eaedd05c61 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -429,8 +429,9 @@ SocketBackend(StringInfo inBuf)
errmsg("invalid frontend message type %d", qtype)));
break;
#ifdef PGXC /* PGXC_DATANODE */
- case 'g':
- case 's':
+ case 'g': /* GXID */
+ case 's': /* Snapshot */
+ case 't': /* Timestamp */
break;
#endif
@@ -2951,6 +2952,8 @@ PostgresMain(int argc, char *argv[], const char *username)
int xmax;
int xcnt;
int *xip;
+ /* Timestamp info */
+ TimestampTz timestamp;
#endif
#define PendingConfigOption(name,val) \
@@ -4015,6 +4018,17 @@ PostgresMain(int argc, char *argv[], const char *username)
pq_getmsgend(&input_message);
SetGlobalSnapshotData(xmin, xmax, xcnt, xip);
break;
+
+ case 't': /* timestamp */
+ timestamp = (TimestampTz) pq_getmsgint64(&input_message);
+ pq_getmsgend(&input_message);
+
+ /*
+ * Set in xact.x the static Timestamp difference value with GTM
+ * and the timestampreceivedvalues for Datanode reference
+ */
+ SetCurrentGTMDeltaTimestamp(timestamp);
+ break;
#endif /* PGXC */
default:
diff --git a/src/backend/utils/adt/timestamp.c b/src/backend/utils/adt/timestamp.c
index f2c44e7e85..be3a8e4a1f 100644
--- a/src/backend/utils/adt/timestamp.c
+++ b/src/backend/utils/adt/timestamp.c
@@ -32,6 +32,10 @@
#include "utils/builtins.h"
#include "utils/datetime.h"
+#ifdef PGXC
+#include "pgxc/pgxc.h"
+#endif
+
/*
* gcc's -ffast-math switch breaks routines that expect exact results from
* expressions like timeval / SECS_PER_HOUR, where timeval is double.
diff --git a/src/gtm/client/fe-protocol.c b/src/gtm/client/fe-protocol.c
index 051bb1d2e1..0847b0d9cd 100644
--- a/src/gtm/client/fe-protocol.c
+++ b/src/gtm/client/fe-protocol.c
@@ -350,12 +350,22 @@ gtmpqParseSuccess(GTM_Conn *conn, GTM_Result *result)
break;
case TXN_BEGIN_GETGXID_RESULT:
+ if (gtmpqGetnchar((char *)&result->gr_resdata.grd_gxid_tp.gxid,
+ sizeof (GlobalTransactionId), conn))
+ {
+ result->gr_status = -1;
+ break;
+ }
+ if (gtmpqGetnchar((char *)&result->gr_resdata.grd_gxid_tp.timestamp,
+ sizeof (GTM_Timestamp), conn))
+ result->gr_status = -1;
+ break;
case TXN_BEGIN_GETGXID_AUTOVACUUM_RESULT:
case TXN_PREPARE_RESULT:
if (gtmpqGetnchar((char *)&result->gr_resdata.grd_gxid,
sizeof (GlobalTransactionId), conn))
result->gr_status = -1;
- break;
+ break;
case TXN_COMMIT_RESULT:
case TXN_ROLLBACK_RESULT:
@@ -393,9 +403,11 @@ gtmpqParseSuccess(GTM_Conn *conn, GTM_Result *result)
result->gr_status = -1;
break;
}
+ if (gtmpqGetnchar((char *)&result->gr_resdata.grd_txn_get_multi.timestamp,
+ sizeof (GTM_Timestamp), conn))
+ result->gr_status = -1;
break;
-
case TXN_COMMIT_MULTI_RESULT:
case TXN_ROLLBACK_MULTI_RESULT:
if (gtmpqGetnchar((char *)&result->gr_resdata.grd_txn_rc_multi.txn_count,
diff --git a/src/gtm/client/gtm_client.c b/src/gtm/client/gtm_client.c
index 9df28c7f14..35f81ae943 100644
--- a/src/gtm/client/gtm_client.c
+++ b/src/gtm/client/gtm_client.c
@@ -48,7 +48,7 @@ disconnect_gtm(GTM_Conn *conn)
* Transaction Management API
*/
GlobalTransactionId
-begin_transaction(GTM_Conn *conn, GTM_IsolationLevel isolevel)
+begin_transaction(GTM_Conn *conn, GTM_IsolationLevel isolevel, GTM_Timestamp *timestamp)
{
bool txn_read_only = false;
GTM_Result *res = NULL;
@@ -78,7 +78,12 @@ begin_transaction(GTM_Conn *conn, GTM_IsolationLevel isolevel)
goto receive_failed;
if (res->gr_status == 0)
- return res->gr_resdata.grd_gxid;
+ {
+ if (timestamp)
+ *timestamp = res->gr_resdata.grd_gxid_tp.timestamp;
+
+ return res->gr_resdata.grd_gxid_tp.gxid;
+ }
else
return InvalidGlobalTransactionId;
diff --git a/src/gtm/main/Makefile b/src/gtm/main/Makefile
index 7fcdf82a83..5d8aaea7d8 100644
--- a/src/gtm/main/Makefile
+++ b/src/gtm/main/Makefile
@@ -3,7 +3,7 @@
top_build_dir=../..
include $(top_build_dir)/gtm/Makefile.global
-OBJS=main.o gtm_thread.o gtm_txn.o gtm_seq.o gtm_snap.o ../common/libgtm.a ../libpq/libpqcomm.a ../path/libgtmpath.a
+OBJS=main.o gtm_thread.o gtm_txn.o gtm_seq.o gtm_snap.o gtm_time.o ../common/libgtm.a ../libpq/libpqcomm.a ../path/libgtmpath.a
LDFLAGS=-L$(top_build_dir)/common -L$(top_build_dir)/libpq
LIBS=-lpthread
diff --git a/src/gtm/main/gtm_time.c b/src/gtm/main/gtm_time.c
new file mode 100644
index 0000000000..ea795af8f2
--- /dev/null
+++ b/src/gtm/main/gtm_time.c
@@ -0,0 +1,41 @@
+/*-------------------------------------------------------------------------
+ *
+ * gtm_time.c
+ * Timestamp handling on GTM
+ *
+ * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ * Portions Copyright (c) 2010 Nippon Telegraph and Telephone Corporation
+ *
+ *
+ * IDENTIFICATION
+ * $PostgreSQL$
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "gtm/gtm.h"
+#include "gtm/gtm_c.h"
+#include "gtm/gtm_time.h"
+#include <time.h>
+#include <sys/time.h>
+
+GTM_Timestamp
+GTM_TimestampGetCurrent(void)
+{
+ struct timeval tp;
+ GTM_Timestamp result;
+
+ gettimeofday(&tp, NULL);
+
+ result = (GTM_Timestamp) tp.tv_sec -
+ ((GTM_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY);
+
+#ifdef HAVE_INT64_TIMESTAMP
+ result = (result * USECS_PER_SEC) + tp.tv_usec;
+#else
+ result = result + (tp.tv_usec / 1000000.0);
+#endif
+
+ return result;
+}
diff --git a/src/gtm/main/gtm_txn.c b/src/gtm/main/gtm_txn.c
index 6090ae10fb..dec0a63d09 100644
--- a/src/gtm/main/gtm_txn.c
+++ b/src/gtm/main/gtm_txn.c
@@ -18,6 +18,8 @@
#include "gtm/palloc.h"
#include "gtm/gtm.h"
#include "gtm/gtm_txn.h"
+#include "gtm/gtm_c.h"
+#include "gtm/gtm_time.h"
#include "gtm/assert.h"
#include "gtm/stringinfo.h"
#include "gtm/libpq.h"
@@ -840,6 +842,7 @@ ProcessBeginTransactionCommand(Port *myport, StringInfo message)
bool txn_read_only;
StringInfoData buf;
GTM_TransactionHandle txn;
+ GTM_Timestamp timestamp;
MemoryContext oldContext;
txn_isolation_level = pq_getmsgint(message, sizeof (GTM_IsolationLevel));
@@ -860,6 +863,9 @@ ProcessBeginTransactionCommand(Port *myport, StringInfo message)
MemoryContextSwitchTo(oldContext);
+ /* GXID has been received, now it's time to get a GTM timestamp */
+ timestamp = GTM_TimestampGetCurrent();
+
pq_beginmessage(&buf, 'S');
pq_sendint(&buf, TXN_BEGIN_RESULT, 4);
if (myport->is_proxy)
@@ -869,6 +875,7 @@ ProcessBeginTransactionCommand(Port *myport, StringInfo message)
pq_sendbytes(&buf, (char *)&proxyhdr, sizeof (GTM_ProxyMsgHeader));
}
pq_sendbytes(&buf, (char *)&txn, sizeof(txn));
+ pq_sendbytes(&buf, (char *)&timestamp, sizeof (GTM_Timestamp));
pq_endmessage(myport, &buf);
if (!myport->is_proxy)
@@ -1003,6 +1010,7 @@ ProcessBeginTransactionGetGXIDCommandMulti(Port *myport, StringInfo message)
StringInfoData buf;
GTM_TransactionHandle txn[GTM_MAX_GLOBAL_TRANSACTIONS];
GlobalTransactionId gxid, end_gxid;
+ GTM_Timestamp timestamp;
GTMProxy_ConnID txn_connid[GTM_MAX_GLOBAL_TRANSACTIONS];
MemoryContext oldContext;
int count;
@@ -1042,6 +1050,9 @@ ProcessBeginTransactionGetGXIDCommandMulti(Port *myport, StringInfo message)
MemoryContextSwitchTo(oldContext);
+ /* GXID has been received, now it's time to get a GTM timestamp */
+ timestamp = GTM_TimestampGetCurrent();
+
end_gxid = gxid + txn_count;
if (end_gxid < gxid)
end_gxid += FirstNormalGlobalTransactionId;
@@ -1058,6 +1069,7 @@ ProcessBeginTransactionGetGXIDCommandMulti(Port *myport, StringInfo message)
}
pq_sendbytes(&buf, (char *)&txn_count, sizeof(txn_count));
pq_sendbytes(&buf, (char *)&gxid, sizeof(gxid));
+ pq_sendbytes(&buf, (char *)&(timestamp), sizeof (GTM_Timestamp));
pq_endmessage(myport, &buf);
if (!myport->is_proxy)
diff --git a/src/gtm/proxy/proxy_main.c b/src/gtm/proxy/proxy_main.c
index f5f6e65342..66b15946a6 100644
--- a/src/gtm/proxy/proxy_main.c
+++ b/src/gtm/proxy/proxy_main.c
@@ -988,6 +988,7 @@ ProcessResponse(GTMProxy_ThreadInfo *thrinfo, GTMProxy_CommandInfo *cmdinfo,
{
StringInfoData buf;
GlobalTransactionId gxid;
+ GTM_Timestamp timestamp;
switch (cmdinfo->ci_mtype)
{
@@ -1011,9 +1012,13 @@ ProcessResponse(GTMProxy_ThreadInfo *thrinfo, GTMProxy_CommandInfo *cmdinfo,
if (gxid < res->gr_resdata.grd_txn_get_multi.start_gxid)
gxid += FirstNormalGlobalTransactionId;
+ /* Send back to each client the same timestamp value asked in this message */
+ timestamp = res->gr_resdata.grd_txn_get_multi.timestamp;
+
pq_beginmessage(&buf, 'S');
pq_sendint(&buf, TXN_BEGIN_GETGXID_RESULT, 4);
pq_sendbytes(&buf, (char *)&gxid, sizeof (GlobalTransactionId));
+ pq_sendbytes(&buf, (char *)&timestamp, sizeof (GTM_Timestamp));
pq_endmessage(cmdinfo->ci_conn->con_port, &buf);
pq_flush(cmdinfo->ci_conn->con_port);
}
diff --git a/src/include/access/gtm.h b/src/include/access/gtm.h
index 3831f09303..4878d929b7 100644
--- a/src/include/access/gtm.h
+++ b/src/include/access/gtm.h
@@ -20,7 +20,7 @@ extern int GtmCoordinatorId;
extern bool IsGTMConnected(void);
extern void InitGTM(void);
extern void CloseGTM(void);
-extern GlobalTransactionId BeginTranGTM(void);
+extern GlobalTransactionId BeginTranGTM(GTM_Timestamp *timestamp);
extern GlobalTransactionId BeginTranAutovacuumGTM(void);
extern int CommitTranGTM(GlobalTransactionId gxid);
extern int RollbackTranGTM(GlobalTransactionId gxid);
diff --git a/src/include/access/transam.h b/src/include/access/transam.h
index a7a8230595..b801776b61 100644
--- a/src/include/access/transam.h
+++ b/src/include/access/transam.h
@@ -16,7 +16,9 @@
#define TRANSAM_H
#include "access/xlogdefs.h"
-
+#ifdef PGXC
+#include "gtm/gtm_c.h"
+#endif
/* ----------------
* Special transaction ID values
@@ -157,8 +159,10 @@ extern XLogRecPtr TransactionIdGetCommitLSN(TransactionId xid);
extern void SetNextTransactionId(TransactionId xid);
extern void SetForceXidFromGTM(bool value);
extern bool GetForceXidFromGTM(void);
-#endif /* PGXC */
+extern TransactionId GetNewTransactionId(bool isSubXact, bool *timestamp_received, GTM_Timestamp *timestamp);
+#else
extern TransactionId GetNewTransactionId(bool isSubXact);
+#endif /* PGXC */
extern TransactionId ReadNewTransactionId(void);
extern void SetTransactionIdLimit(TransactionId oldest_datfrozenxid,
Name oldest_datname);
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index 871713f4f1..24d420f6e7 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -157,6 +157,10 @@ extern TimestampTz GetCurrentTransactionStartTimestamp(void);
extern TimestampTz GetCurrentStatementStartTimestamp(void);
extern TimestampTz GetCurrentTransactionStopTimestamp(void);
extern void SetCurrentStatementStartTimestamp(void);
+#ifdef PGXC
+extern TimestampTz GetCurrentGTMStartTimestamp(void);
+extern void SetCurrentGTMDeltaTimestamp(TimestampTz timestamp);
+#endif
extern int GetCurrentTransactionNestLevel(void);
extern bool TransactionIdIsCurrentTransactionId(TransactionId xid);
extern void CommandCounterIncrement(void);
diff --git a/src/include/gtm/gtm_c.h b/src/include/gtm/gtm_c.h
index 1a04064b6d..0a4c941805 100644
--- a/src/include/gtm/gtm_c.h
+++ b/src/include/gtm/gtm_c.h
@@ -55,6 +55,12 @@ typedef int32 GTM_TransactionHandle;
#define InvalidTransactionHandle -1
+/*
+ * As GTM and Postgres-XC packages are separated, GTM and XC's API
+ * use different type names for timestamps and sequences, but they have to be the same!
+ */
+typedef int64 GTM_Timestamp; /* timestamp data is 64-bit based */
+
typedef int64 GTM_Sequence; /* a 64-bit sequence */
typedef struct GTM_SequenceKeyData
{
diff --git a/src/include/gtm/gtm_client.h b/src/include/gtm/gtm_client.h
index 05e44bfb25..9db688486a 100644
--- a/src/include/gtm/gtm_client.h
+++ b/src/include/gtm/gtm_client.h
@@ -21,8 +21,14 @@
typedef union GTM_ResultData
{
GTM_TransactionHandle grd_txnhandle; /* TXN_BEGIN */
- GlobalTransactionId grd_gxid; /* TXN_BEGIN_GETGXID
- * TXN_PREPARE
+
+ struct
+ {
+ GlobalTransactionId gxid;
+ GTM_Timestamp timestamp;
+ } grd_gxid_tp; /* TXN_BEGIN_GETGXID */
+
+ GlobalTransactionId grd_gxid; /* TXN_PREPARE
* TXN_COMMIT
* TXN_ROLLBACK
*/
@@ -47,6 +53,7 @@ typedef union GTM_ResultData
{
int txn_count; /* TXN_BEGIN_GETGXID_MULTI */
GlobalTransactionId start_gxid;
+ GTM_Timestamp timestamp;
} grd_txn_get_multi;
struct
@@ -101,7 +108,7 @@ void disconnect_gtm(GTM_Conn *conn);
/*
* Transaction Management API
*/
-GlobalTransactionId begin_transaction(GTM_Conn *conn, GTM_IsolationLevel isolevel);
+GlobalTransactionId begin_transaction(GTM_Conn *conn, GTM_IsolationLevel isolevel, GTM_Timestamp *timestamp);
GlobalTransactionId begin_transaction_autovacuum(GTM_Conn *conn, GTM_IsolationLevel isolevel);
int commit_transaction(GTM_Conn *conn, GlobalTransactionId gxid);
int abort_transaction(GTM_Conn *conn, GlobalTransactionId gxid);
diff --git a/src/include/gtm/gtm_time.h b/src/include/gtm/gtm_time.h
new file mode 100644
index 0000000000..b3d7005db9
--- /dev/null
+++ b/src/include/gtm/gtm_time.h
@@ -0,0 +1,37 @@
+/*-------------------------------------------------------------------------
+ *
+ * gtm_time.h
+ *
+ *
+ * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ * Portions Copyright (c) 2010 Nippon Telegraph and Telephone Corporation
+ *
+ * $PostgreSQL$
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#ifndef GTM_TIME_H
+#define GTM_TIME_H
+
+/* Julian-date equivalents of Day 0 in Unix and GTM reckoning */
+#define UNIX_EPOCH_JDATE 2440588 /* == date2j(1970, 1, 1) */
+#define GTM_EPOCH_JDATE 2451545 /* == date2j(2000, 1, 1) */
+
+#define SECS_PER_YEAR (36525 * 864) /* avoid floating-point computation */
+#define SECS_PER_DAY 86400
+#define SECS_PER_HOUR 3600
+#define SECS_PER_MINUTE 60
+#define MINS_PER_HOUR 60
+
+#ifdef HAVE_INT64_TIMESTAMP
+#define USECS_PER_DAY INT64CONST(86400000000)
+#define USECS_PER_HOUR INT64CONST(3600000000)
+#define USECS_PER_MINUTE INT64CONST(60000000)
+#define USECS_PER_SEC INT64CONST(1000000)
+#endif
+
+GTM_Timestamp GTM_TimestampGetCurrent(void);
+
+#endif
diff --git a/src/include/pgxc/datanode.h b/src/include/pgxc/datanode.h
index 849d84acea..4202e2e942 100644
--- a/src/include/pgxc/datanode.h
+++ b/src/include/pgxc/datanode.h
@@ -18,6 +18,7 @@
#define DATANODE_H
#include "postgres.h"
#include "gtm/gtm_c.h"
+#include "utils/timestamp.h"
#include "nodes/pg_list.h"
#include "utils/snapshot.h"
#include <unistd.h>
@@ -88,6 +89,7 @@ extern int ensure_out_buffer_capacity(size_t bytes_needed, DataNodeHandle * hand
extern int data_node_send_query(DataNodeHandle * handle, const char *query);
extern int data_node_send_gxid(DataNodeHandle * handle, GlobalTransactionId gxid);
extern int data_node_send_snapshot(DataNodeHandle * handle, Snapshot snapshot);
+extern int data_node_send_timestamp(DataNodeHandle * handle, TimestampTz timestamp);
extern int data_node_receive(const int conn_count,
DataNodeHandle ** connections, struct timeval * timeout);
diff --git a/src/include/utils/timestamp.h b/src/include/utils/timestamp.h
index 2765d3927f..7c8e744915 100644
--- a/src/include/utils/timestamp.h
+++ b/src/include/utils/timestamp.h
@@ -23,6 +23,10 @@
#include "utils/int8.h"
#endif
+#ifdef PGXC
+#include "pgxc/pgxc.h"
+#endif
+
/*
* Timestamp represents absolute time.
*
@@ -43,6 +47,11 @@
#ifdef HAVE_INT64_TIMESTAMP
+/*
+ * PGXC note: GTM and Postgres-XC packages have to be separated.
+ * Both use use different type names for timestamp, but those types have to be the same!
+ */
+
typedef int64 Timestamp;
typedef int64 TimestampTz;
typedef int64 TimeOffset;
@@ -188,6 +197,10 @@ typedef struct
#define TimestampTzPlusMilliseconds(tz,ms) ((tz) + ((ms) / 1000.0))
#endif
+#ifdef PGXC
+#define InvalidGlobalTimestamp ((TimestampTz) 0)
+#define GlobalTimestampIsValid(timestamp) ((TimestampTz) (timestamp)) != InvalidGlobalTimestamp
+#endif
/* Set at postmaster start */
extern TimestampTz PgStartTime;