diff options
| -rw-r--r-- | src/backend/access/transam/gtm.c | 6 | ||||
| -rw-r--r-- | src/backend/access/transam/varsup.c | 22 | ||||
| -rw-r--r-- | src/backend/access/transam/xact.c | 117 | ||||
| -rw-r--r-- | src/backend/pgxc/pool/datanode.c | 42 | ||||
| -rw-r--r-- | src/backend/pgxc/pool/execRemote.c | 58 | ||||
| -rw-r--r-- | src/backend/tcop/postgres.c | 18 | ||||
| -rw-r--r-- | src/backend/utils/adt/timestamp.c | 4 | ||||
| -rw-r--r-- | src/gtm/client/fe-protocol.c | 16 | ||||
| -rw-r--r-- | src/gtm/client/gtm_client.c | 9 | ||||
| -rw-r--r-- | src/gtm/main/Makefile | 2 | ||||
| -rw-r--r-- | src/gtm/main/gtm_time.c | 41 | ||||
| -rw-r--r-- | src/gtm/main/gtm_txn.c | 12 | ||||
| -rw-r--r-- | src/gtm/proxy/proxy_main.c | 5 | ||||
| -rw-r--r-- | src/include/access/gtm.h | 2 | ||||
| -rw-r--r-- | src/include/access/transam.h | 8 | ||||
| -rw-r--r-- | src/include/access/xact.h | 4 | ||||
| -rw-r--r-- | src/include/gtm/gtm_c.h | 6 | ||||
| -rw-r--r-- | src/include/gtm/gtm_client.h | 13 | ||||
| -rw-r--r-- | src/include/gtm/gtm_time.h | 37 | ||||
| -rw-r--r-- | src/include/pgxc/datanode.h | 2 | ||||
| -rw-r--r-- | src/include/utils/timestamp.h | 13 |
21 files changed, 410 insertions, 27 deletions
diff --git a/src/backend/access/transam/gtm.c b/src/backend/access/transam/gtm.c index f9499c9e86..c7f35478d9 100644 --- a/src/backend/access/transam/gtm.c +++ b/src/backend/access/transam/gtm.c @@ -67,14 +67,14 @@ CloseGTM(void) } GlobalTransactionId -BeginTranGTM(void) +BeginTranGTM(GTM_Timestamp *timestamp) { GlobalTransactionId xid = InvalidGlobalTransactionId; CheckConnection(); // TODO Isolation level if (conn) - xid = begin_transaction(conn, GTM_ISOLATION_RC); + xid = begin_transaction(conn, GTM_ISOLATION_RC, timestamp); /* If something went wrong (timeout), try and reset GTM connection * and retry. This is safe at the beginning of a transaction. @@ -84,7 +84,7 @@ BeginTranGTM(void) CloseGTM(); InitGTM(); if (conn) - xid = begin_transaction(conn, GTM_ISOLATION_RC); + xid = begin_transaction(conn, GTM_ISOLATION_RC, timestamp); } return xid; } diff --git a/src/backend/access/transam/varsup.c b/src/backend/access/transam/varsup.c index 4de1080544..8145fac713 100644 --- a/src/backend/access/transam/varsup.c +++ b/src/backend/access/transam/varsup.c @@ -75,11 +75,17 @@ GetForceXidFromGTM(void) * The new XID is also stored into MyProc before returning. */ TransactionId +#ifdef PGXC +GetNewTransactionId(bool isSubXact, bool *timestamp_received, GTM_Timestamp *timestamp) +#else GetNewTransactionId(bool isSubXact) +#endif { TransactionId xid; -#ifdef PGXC +#ifdef PGXC bool increment_xid = true; + + *timestamp_received = false; #endif /* @@ -102,8 +108,10 @@ GetNewTransactionId(bool isSubXact) * This will help with GTM connection issues- we will not * block all other processes. */ - xid = (TransactionId) BeginTranGTM(); + xid = (TransactionId) BeginTranGTM(timestamp); + *timestamp_received = true; } + #endif LWLockAcquire(XidGenLock, LW_EXCLUSIVE); @@ -144,18 +152,20 @@ GetNewTransactionId(bool isSubXact) * exclude it from other snapshots. */ next_xid = (TransactionId) BeginTranAutovacuumGTM(); - } else { + } + else + { elog (DEBUG1, "Getting XID for autovacuum worker (analyze)"); /* try and get gxid directly from GTM */ - next_xid = (TransactionId) BeginTranGTM(); + next_xid = (TransactionId) BeginTranGTM(NULL); } } else if (GetForceXidFromGTM()) { elog (DEBUG1, "Force get XID from GTM"); /* try and get gxid directly from GTM */ - next_xid = (TransactionId) BeginTranGTM(); + next_xid = (TransactionId) BeginTranGTM(NULL); } - + if (TransactionIdIsValid(next_xid)) { xid = next_xid; diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 4db1e2c864..6c5ee4a400 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -208,6 +208,19 @@ static TimestampTz stmtStartTimestamp; static TimestampTz xactStopTimestamp; /* + * PGXC receives from GTM a timestamp value at the same time as a GXID + * This one is set as GTMxactStartTimestamp and is a return value of now(), current_transaction(). + * GTMxactStartTimestamp is also sent to each node with gxid and snapshot and delta is calculated locally. + * GTMdeltaTimestamp is used to calculate current_statement as its value can change + * during a transaction. Delta can have a different value through the nodes of the cluster + * but its uniqueness in the cluster is maintained thanks to the global value GTMxactStartTimestamp. + */ +#ifdef PGXC +static TimestampTz GTMxactStartTimestamp = 0; +static TimestampTz GTMdeltaTimestamp = 0; +#endif + +/* * GID to be used for preparing the current transaction. This is also * global to a whole transaction, so we don't keep it in the state stack. */ @@ -315,12 +328,28 @@ GetCurrentGlobalTransactionId(void) * * This will return the GXID of the specified transaction, * getting one from the GTM if it's not yet set. + * It also returns a timestamp value if a GXID has been taken from GTM */ static GlobalTransactionId GetGlobalTransactionId(TransactionState s) { + GTM_Timestamp gtm_timestamp; + bool received_tp; + + /* + * Here we receive timestamp at the same time as gxid. + */ if (!GlobalTransactionIdIsValid(s->globalTransactionId)) - s->globalTransactionId = (GlobalTransactionId) GetNewTransactionId(s->parent != NULL); + s->globalTransactionId = (GlobalTransactionId) GetNewTransactionId(s->parent != NULL, + &received_tp, + >m_timestamp); + + /* Set a timestamp value if and only if it has been received from GTM */ + if (received_tp) + { + GTMxactStartTimestamp = (TimestampTz) gtm_timestamp; + GTMdeltaTimestamp = GTMxactStartTimestamp - stmtStartTimestamp; + } return s->globalTransactionId; } @@ -473,8 +502,20 @@ AssignTransactionId(TransactionState s) s->transactionId, isSubXact ? "true" : "false"); } else -#endif + { + GTM_Timestamp gtm_timestamp; + bool received_tp; + + s->transactionId = GetNewTransactionId(isSubXact, &received_tp, >m_timestamp); + if (received_tp) + { + GTMxactStartTimestamp = (TimestampTz) gtm_timestamp; + GTMdeltaTimestamp = GTMxactStartTimestamp - stmtStartTimestamp; + } + } +#else s->transactionId = GetNewTransactionId(isSubXact); +#endif if (isSubXact) SubTransSetParent(s->transactionId, s->parent->transactionId); @@ -536,7 +577,15 @@ GetCurrentCommandId(bool used) TimestampTz GetCurrentTransactionStartTimestamp(void) { + /* + * In Postgres-XC, Transaction start timestamp is the value received + * from GTM along with GXID. + */ +#ifdef PGXC + return GTMxactStartTimestamp; +#else return xactStartTimestamp; +#endif } /* @@ -545,7 +594,17 @@ GetCurrentTransactionStartTimestamp(void) TimestampTz GetCurrentStatementStartTimestamp(void) { + /* + * For Postgres-XC, Statement start timestamp is adjusted at each node + * (Coordinator and Datanode) with a difference value that is calculated + * based on the global timestamp value received from GTM and the local + * clock. This permits to follow the GTM timeline in the cluster. + */ +#ifdef PGXC + return stmtStartTimestamp + GTMdeltaTimestamp; +#else return stmtStartTimestamp; +#endif } /* @@ -557,11 +616,36 @@ GetCurrentStatementStartTimestamp(void) TimestampTz GetCurrentTransactionStopTimestamp(void) { + /* + * As for Statement start timestamp, stop timestamp has to + * be adjusted with the delta value calculated with the + * timestamp received from GTM and the local node clock. + */ +#ifdef PGXC + TimestampTz timestamp; + + if (xactStopTimestamp != 0) + return xactStopTimestamp + GTMdeltaTimestamp; + + timestamp = GetCurrentTimestamp() + GTMdeltaTimestamp; + + return timestamp; +#else if (xactStopTimestamp != 0) return xactStopTimestamp; + return GetCurrentTimestamp(); +#endif } +#ifdef PGXC +TimestampTz +GetCurrentGTMStartTimestamp(void) +{ + return GTMxactStartTimestamp; +} +#endif + /* * SetCurrentStatementStartTimestamp */ @@ -580,6 +664,20 @@ SetCurrentTransactionStopTimestamp(void) xactStopTimestamp = GetCurrentTimestamp(); } +#ifdef PGXC +/* + * SetCurrentGTMDeltaTimestamp + * + * Note: Sets local timestamp delta with the value received from GTM + */ +void +SetCurrentGTMDeltaTimestamp(TimestampTz timestamp) +{ + GTMxactStartTimestamp = timestamp; + GTMdeltaTimestamp = GTMxactStartTimestamp - xactStartTimestamp; +} +#endif + /* * GetCurrentTransactionNestLevel * @@ -950,7 +1048,12 @@ RecordTransactionCommit(void) MyProc->inCommit = true; SetCurrentTransactionStopTimestamp(); +#ifdef PGXC + /* In Postgres-XC, stop timestamp has to follow the timeline of GTM */ + xlrec.xact_time = xactStopTimestamp + GTMdeltaTimestamp; +#else xlrec.xact_time = xactStopTimestamp; +#endif xlrec.nrels = nrels; xlrec.nsubxacts = nchildren; rdata[0].data = (char *) (&xlrec); @@ -1275,7 +1378,12 @@ RecordTransactionAbort(bool isSubXact) else { SetCurrentTransactionStopTimestamp(); +#ifdef PGXC + /* In Postgres-XC, stop timestamp has to follow the timeline of GTM */ + xlrec.xact_time = xactStopTimestamp + GTMdeltaTimestamp; +#else xlrec.xact_time = xactStopTimestamp; +#endif } xlrec.nrels = nrels; xlrec.nsubxacts = nchildren; @@ -1576,7 +1684,12 @@ StartTransaction(void) */ xactStartTimestamp = stmtStartTimestamp; xactStopTimestamp = 0; +#ifdef PGXC + /* For Postgres-XC, transaction start timestamp has to follow the GTM timeline */ + pgstat_report_xact_timestamp(GTMxactStartTimestamp); +#else pgstat_report_xact_timestamp(xactStartTimestamp); +#endif /* * initialize current transaction state fields diff --git a/src/backend/pgxc/pool/datanode.c b/src/backend/pgxc/pool/datanode.c index 0f4072d01e..ba56ca1cb3 100644 --- a/src/backend/pgxc/pool/datanode.c +++ b/src/backend/pgxc/pool/datanode.c @@ -893,6 +893,48 @@ data_node_send_snapshot(DataNodeHandle *handle, Snapshot snapshot) } /* + * Send the timestamp down to the Datanode + */ +int +data_node_send_timestamp(DataNodeHandle *handle, TimestampTz timestamp) +{ + int msglen = 12; /* 4 bytes for msglen and 8 bytes for timestamp (int64) */ + uint32 n32; + int64 i = (int64) timestamp; + + /* msgType + msgLen */ + if (ensure_out_buffer_capacity(handle->outEnd + 1 + msglen, handle) != 0) + { + add_error_message(handle, "out of memory"); + return EOF; + } + handle->outBuffer[handle->outEnd++] = 't'; + msglen = htonl(msglen); + memcpy(handle->outBuffer + handle->outEnd, &msglen, 4); + handle->outEnd += 4; + + /* High order half first */ +#ifdef INT64_IS_BUSTED + /* don't try a right shift of 32 on a 32-bit word */ + n32 = (i < 0) ? -1 : 0; +#else + n32 = (uint32) (i >> 32); +#endif + n32 = htonl(n32); + memcpy(handle->outBuffer + handle->outEnd, &n32, 4); + handle->outEnd += 4; + + /* Now the low order half */ + n32 = (uint32) i; + n32 = htonl(n32); + memcpy(handle->outBuffer + handle->outEnd, &n32, 4); + handle->outEnd += 4; + + return 0; +} + + +/* * Add another message to the list of errors to be returned back to the client * at the convenient time */ diff --git a/src/backend/pgxc/pool/execRemote.c b/src/backend/pgxc/pool/execRemote.c index 43569e0e75..f065289851 100644 --- a/src/backend/pgxc/pool/execRemote.c +++ b/src/backend/pgxc/pool/execRemote.c @@ -1074,6 +1074,7 @@ data_node_begin(int conn_count, DataNodeHandle ** connections, int i; struct timeval *timeout = NULL; RemoteQueryState *combiner; + TimestampTz timestamp = GetCurrentGTMStartTimestamp(); /* Send BEGIN */ for (i = 0; i < conn_count; i++) @@ -1081,6 +1082,9 @@ data_node_begin(int conn_count, DataNodeHandle ** connections, if (GlobalTransactionIdIsValid(gxid) && data_node_send_gxid(connections[i], gxid)) return EOF; + if (GlobalTimestampIsValid(timestamp) && data_node_send_timestamp(connections[i], timestamp)) + return EOF; + if (data_node_send_query(connections[i], "BEGIN")) return EOF; } @@ -1222,8 +1226,13 @@ data_node_commit(int conn_count, DataNodeHandle ** connections) else sprintf(buffer, "COMMIT PREPARED 'T%d'", gxid); - /* We need to use a new xid, the data nodes have reset */ - two_phase_xid = BeginTranGTM(); + /* + * We need to use a new xid, the data nodes have reset + * Timestamp has already been set with BEGIN on remote Datanodes, + * so don't use it here. + */ + two_phase_xid = BeginTranGTM(NULL); + for (i = 0; i < conn_count; i++) { if (data_node_send_gxid(connections[i], two_phase_xid)) @@ -1338,6 +1347,7 @@ DataNodeCopyBegin(const char *query, List *nodelist, Snapshot snapshot, bool is_ bool need_tran; GlobalTransactionId gxid; RemoteQueryState *combiner; + TimestampTz timestamp = GetCurrentGTMStartTimestamp(); if (conn_count == 0) return NULL; @@ -1432,6 +1442,19 @@ DataNodeCopyBegin(const char *query, List *nodelist, Snapshot snapshot, bool is_ pfree(copy_connections); return NULL; } + if (conn_count == 1 && data_node_send_timestamp(connections[i], timestamp)) + { + /* + * If a transaction involves multiple connections timestamp, is + * always sent down to Datanodes with data_node_begin. + * An autocommit transaction needs the global timestamp also, + * so handle this case here. + */ + add_error_message(connections[i], "Can not send request"); + pfree(connections); + pfree(copy_connections); + return NULL; + } if (snapshot && data_node_send_snapshot(connections[i], snapshot)) { add_error_message(connections[i], "Can not send request"); @@ -2027,7 +2050,8 @@ ExecRemoteQuery(RemoteQueryState *node) bool force_autocommit = step->force_autocommit; bool is_read_only = step->read_only; GlobalTransactionId gxid = InvalidGlobalTransactionId; - Snapshot snapshot = GetActiveSnapshot(); + Snapshot snapshot = GetActiveSnapshot(); + TimestampTz timestamp = GetCurrentGTMStartTimestamp(); DataNodeHandle **connections = NULL; DataNodeHandle **primaryconnection = NULL; int i; @@ -2133,6 +2157,20 @@ ExecRemoteQuery(RemoteQueryState *node) (errcode(ERRCODE_INTERNAL_ERROR), errmsg("Failed to send command to data nodes"))); } + if (total_conn_count == 1 && data_node_send_timestamp(primaryconnection[0], timestamp)) + { + /* + * If a transaction involves multiple connections timestamp is + * always sent down to Datanodes with data_node_begin. + * An autocommit transaction needs the global timestamp also, + * so handle this case here. + */ + pfree(connections); + pfree(primaryconnection); + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Failed to send command to data nodes"))); + } if (snapshot && data_node_send_snapshot(primaryconnection[0], snapshot)) { pfree(connections); @@ -2184,6 +2222,20 @@ ExecRemoteQuery(RemoteQueryState *node) (errcode(ERRCODE_INTERNAL_ERROR), errmsg("Failed to send command to data nodes"))); } + if (total_conn_count == 1 && data_node_send_timestamp(connections[i], timestamp)) + { + /* + * If a transaction involves multiple connections timestamp is + * always sent down to Datanodes with data_node_begin. + * An autocommit transaction needs the global timestamp also, + * so handle this case here. + */ + pfree(connections); + pfree(primaryconnection); + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Failed to send command to data nodes"))); + } if (snapshot && data_node_send_snapshot(connections[i], snapshot)) { pfree(connections); diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c index bcaa8f0ada..eaedd05c61 100644 --- a/src/backend/tcop/postgres.c +++ b/src/backend/tcop/postgres.c @@ -429,8 +429,9 @@ SocketBackend(StringInfo inBuf) errmsg("invalid frontend message type %d", qtype))); break; #ifdef PGXC /* PGXC_DATANODE */ - case 'g': - case 's': + case 'g': /* GXID */ + case 's': /* Snapshot */ + case 't': /* Timestamp */ break; #endif @@ -2951,6 +2952,8 @@ PostgresMain(int argc, char *argv[], const char *username) int xmax; int xcnt; int *xip; + /* Timestamp info */ + TimestampTz timestamp; #endif #define PendingConfigOption(name,val) \ @@ -4015,6 +4018,17 @@ PostgresMain(int argc, char *argv[], const char *username) pq_getmsgend(&input_message); SetGlobalSnapshotData(xmin, xmax, xcnt, xip); break; + + case 't': /* timestamp */ + timestamp = (TimestampTz) pq_getmsgint64(&input_message); + pq_getmsgend(&input_message); + + /* + * Set in xact.x the static Timestamp difference value with GTM + * and the timestampreceivedvalues for Datanode reference + */ + SetCurrentGTMDeltaTimestamp(timestamp); + break; #endif /* PGXC */ default: diff --git a/src/backend/utils/adt/timestamp.c b/src/backend/utils/adt/timestamp.c index f2c44e7e85..be3a8e4a1f 100644 --- a/src/backend/utils/adt/timestamp.c +++ b/src/backend/utils/adt/timestamp.c @@ -32,6 +32,10 @@ #include "utils/builtins.h" #include "utils/datetime.h" +#ifdef PGXC +#include "pgxc/pgxc.h" +#endif + /* * gcc's -ffast-math switch breaks routines that expect exact results from * expressions like timeval / SECS_PER_HOUR, where timeval is double. diff --git a/src/gtm/client/fe-protocol.c b/src/gtm/client/fe-protocol.c index 051bb1d2e1..0847b0d9cd 100644 --- a/src/gtm/client/fe-protocol.c +++ b/src/gtm/client/fe-protocol.c @@ -350,12 +350,22 @@ gtmpqParseSuccess(GTM_Conn *conn, GTM_Result *result) break; case TXN_BEGIN_GETGXID_RESULT: + if (gtmpqGetnchar((char *)&result->gr_resdata.grd_gxid_tp.gxid, + sizeof (GlobalTransactionId), conn)) + { + result->gr_status = -1; + break; + } + if (gtmpqGetnchar((char *)&result->gr_resdata.grd_gxid_tp.timestamp, + sizeof (GTM_Timestamp), conn)) + result->gr_status = -1; + break; case TXN_BEGIN_GETGXID_AUTOVACUUM_RESULT: case TXN_PREPARE_RESULT: if (gtmpqGetnchar((char *)&result->gr_resdata.grd_gxid, sizeof (GlobalTransactionId), conn)) result->gr_status = -1; - break; + break; case TXN_COMMIT_RESULT: case TXN_ROLLBACK_RESULT: @@ -393,9 +403,11 @@ gtmpqParseSuccess(GTM_Conn *conn, GTM_Result *result) result->gr_status = -1; break; } + if (gtmpqGetnchar((char *)&result->gr_resdata.grd_txn_get_multi.timestamp, + sizeof (GTM_Timestamp), conn)) + result->gr_status = -1; break; - case TXN_COMMIT_MULTI_RESULT: case TXN_ROLLBACK_MULTI_RESULT: if (gtmpqGetnchar((char *)&result->gr_resdata.grd_txn_rc_multi.txn_count, diff --git a/src/gtm/client/gtm_client.c b/src/gtm/client/gtm_client.c index 9df28c7f14..35f81ae943 100644 --- a/src/gtm/client/gtm_client.c +++ b/src/gtm/client/gtm_client.c @@ -48,7 +48,7 @@ disconnect_gtm(GTM_Conn *conn) * Transaction Management API */ GlobalTransactionId -begin_transaction(GTM_Conn *conn, GTM_IsolationLevel isolevel) +begin_transaction(GTM_Conn *conn, GTM_IsolationLevel isolevel, GTM_Timestamp *timestamp) { bool txn_read_only = false; GTM_Result *res = NULL; @@ -78,7 +78,12 @@ begin_transaction(GTM_Conn *conn, GTM_IsolationLevel isolevel) goto receive_failed; if (res->gr_status == 0) - return res->gr_resdata.grd_gxid; + { + if (timestamp) + *timestamp = res->gr_resdata.grd_gxid_tp.timestamp; + + return res->gr_resdata.grd_gxid_tp.gxid; + } else return InvalidGlobalTransactionId; diff --git a/src/gtm/main/Makefile b/src/gtm/main/Makefile index 7fcdf82a83..5d8aaea7d8 100644 --- a/src/gtm/main/Makefile +++ b/src/gtm/main/Makefile @@ -3,7 +3,7 @@ top_build_dir=../.. include $(top_build_dir)/gtm/Makefile.global -OBJS=main.o gtm_thread.o gtm_txn.o gtm_seq.o gtm_snap.o ../common/libgtm.a ../libpq/libpqcomm.a ../path/libgtmpath.a +OBJS=main.o gtm_thread.o gtm_txn.o gtm_seq.o gtm_snap.o gtm_time.o ../common/libgtm.a ../libpq/libpqcomm.a ../path/libgtmpath.a LDFLAGS=-L$(top_build_dir)/common -L$(top_build_dir)/libpq LIBS=-lpthread diff --git a/src/gtm/main/gtm_time.c b/src/gtm/main/gtm_time.c new file mode 100644 index 0000000000..ea795af8f2 --- /dev/null +++ b/src/gtm/main/gtm_time.c @@ -0,0 +1,41 @@ +/*------------------------------------------------------------------------- + * + * gtm_time.c + * Timestamp handling on GTM + * + * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * Portions Copyright (c) 2010 Nippon Telegraph and Telephone Corporation + * + * + * IDENTIFICATION + * $PostgreSQL$ + * + *------------------------------------------------------------------------- + */ + +#include "gtm/gtm.h" +#include "gtm/gtm_c.h" +#include "gtm/gtm_time.h" +#include <time.h> +#include <sys/time.h> + +GTM_Timestamp +GTM_TimestampGetCurrent(void) +{ + struct timeval tp; + GTM_Timestamp result; + + gettimeofday(&tp, NULL); + + result = (GTM_Timestamp) tp.tv_sec - + ((GTM_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY); + +#ifdef HAVE_INT64_TIMESTAMP + result = (result * USECS_PER_SEC) + tp.tv_usec; +#else + result = result + (tp.tv_usec / 1000000.0); +#endif + + return result; +} diff --git a/src/gtm/main/gtm_txn.c b/src/gtm/main/gtm_txn.c index 6090ae10fb..dec0a63d09 100644 --- a/src/gtm/main/gtm_txn.c +++ b/src/gtm/main/gtm_txn.c @@ -18,6 +18,8 @@ #include "gtm/palloc.h" #include "gtm/gtm.h" #include "gtm/gtm_txn.h" +#include "gtm/gtm_c.h" +#include "gtm/gtm_time.h" #include "gtm/assert.h" #include "gtm/stringinfo.h" #include "gtm/libpq.h" @@ -840,6 +842,7 @@ ProcessBeginTransactionCommand(Port *myport, StringInfo message) bool txn_read_only; StringInfoData buf; GTM_TransactionHandle txn; + GTM_Timestamp timestamp; MemoryContext oldContext; txn_isolation_level = pq_getmsgint(message, sizeof (GTM_IsolationLevel)); @@ -860,6 +863,9 @@ ProcessBeginTransactionCommand(Port *myport, StringInfo message) MemoryContextSwitchTo(oldContext); + /* GXID has been received, now it's time to get a GTM timestamp */ + timestamp = GTM_TimestampGetCurrent(); + pq_beginmessage(&buf, 'S'); pq_sendint(&buf, TXN_BEGIN_RESULT, 4); if (myport->is_proxy) @@ -869,6 +875,7 @@ ProcessBeginTransactionCommand(Port *myport, StringInfo message) pq_sendbytes(&buf, (char *)&proxyhdr, sizeof (GTM_ProxyMsgHeader)); } pq_sendbytes(&buf, (char *)&txn, sizeof(txn)); + pq_sendbytes(&buf, (char *)×tamp, sizeof (GTM_Timestamp)); pq_endmessage(myport, &buf); if (!myport->is_proxy) @@ -1003,6 +1010,7 @@ ProcessBeginTransactionGetGXIDCommandMulti(Port *myport, StringInfo message) StringInfoData buf; GTM_TransactionHandle txn[GTM_MAX_GLOBAL_TRANSACTIONS]; GlobalTransactionId gxid, end_gxid; + GTM_Timestamp timestamp; GTMProxy_ConnID txn_connid[GTM_MAX_GLOBAL_TRANSACTIONS]; MemoryContext oldContext; int count; @@ -1042,6 +1050,9 @@ ProcessBeginTransactionGetGXIDCommandMulti(Port *myport, StringInfo message) MemoryContextSwitchTo(oldContext); + /* GXID has been received, now it's time to get a GTM timestamp */ + timestamp = GTM_TimestampGetCurrent(); + end_gxid = gxid + txn_count; if (end_gxid < gxid) end_gxid += FirstNormalGlobalTransactionId; @@ -1058,6 +1069,7 @@ ProcessBeginTransactionGetGXIDCommandMulti(Port *myport, StringInfo message) } pq_sendbytes(&buf, (char *)&txn_count, sizeof(txn_count)); pq_sendbytes(&buf, (char *)&gxid, sizeof(gxid)); + pq_sendbytes(&buf, (char *)&(timestamp), sizeof (GTM_Timestamp)); pq_endmessage(myport, &buf); if (!myport->is_proxy) diff --git a/src/gtm/proxy/proxy_main.c b/src/gtm/proxy/proxy_main.c index f5f6e65342..66b15946a6 100644 --- a/src/gtm/proxy/proxy_main.c +++ b/src/gtm/proxy/proxy_main.c @@ -988,6 +988,7 @@ ProcessResponse(GTMProxy_ThreadInfo *thrinfo, GTMProxy_CommandInfo *cmdinfo, { StringInfoData buf; GlobalTransactionId gxid; + GTM_Timestamp timestamp; switch (cmdinfo->ci_mtype) { @@ -1011,9 +1012,13 @@ ProcessResponse(GTMProxy_ThreadInfo *thrinfo, GTMProxy_CommandInfo *cmdinfo, if (gxid < res->gr_resdata.grd_txn_get_multi.start_gxid) gxid += FirstNormalGlobalTransactionId; + /* Send back to each client the same timestamp value asked in this message */ + timestamp = res->gr_resdata.grd_txn_get_multi.timestamp; + pq_beginmessage(&buf, 'S'); pq_sendint(&buf, TXN_BEGIN_GETGXID_RESULT, 4); pq_sendbytes(&buf, (char *)&gxid, sizeof (GlobalTransactionId)); + pq_sendbytes(&buf, (char *)×tamp, sizeof (GTM_Timestamp)); pq_endmessage(cmdinfo->ci_conn->con_port, &buf); pq_flush(cmdinfo->ci_conn->con_port); } diff --git a/src/include/access/gtm.h b/src/include/access/gtm.h index 3831f09303..4878d929b7 100644 --- a/src/include/access/gtm.h +++ b/src/include/access/gtm.h @@ -20,7 +20,7 @@ extern int GtmCoordinatorId; extern bool IsGTMConnected(void); extern void InitGTM(void); extern void CloseGTM(void); -extern GlobalTransactionId BeginTranGTM(void); +extern GlobalTransactionId BeginTranGTM(GTM_Timestamp *timestamp); extern GlobalTransactionId BeginTranAutovacuumGTM(void); extern int CommitTranGTM(GlobalTransactionId gxid); extern int RollbackTranGTM(GlobalTransactionId gxid); diff --git a/src/include/access/transam.h b/src/include/access/transam.h index a7a8230595..b801776b61 100644 --- a/src/include/access/transam.h +++ b/src/include/access/transam.h @@ -16,7 +16,9 @@ #define TRANSAM_H #include "access/xlogdefs.h" - +#ifdef PGXC +#include "gtm/gtm_c.h" +#endif /* ---------------- * Special transaction ID values @@ -157,8 +159,10 @@ extern XLogRecPtr TransactionIdGetCommitLSN(TransactionId xid); extern void SetNextTransactionId(TransactionId xid); extern void SetForceXidFromGTM(bool value); extern bool GetForceXidFromGTM(void); -#endif /* PGXC */ +extern TransactionId GetNewTransactionId(bool isSubXact, bool *timestamp_received, GTM_Timestamp *timestamp); +#else extern TransactionId GetNewTransactionId(bool isSubXact); +#endif /* PGXC */ extern TransactionId ReadNewTransactionId(void); extern void SetTransactionIdLimit(TransactionId oldest_datfrozenxid, Name oldest_datname); diff --git a/src/include/access/xact.h b/src/include/access/xact.h index 871713f4f1..24d420f6e7 100644 --- a/src/include/access/xact.h +++ b/src/include/access/xact.h @@ -157,6 +157,10 @@ extern TimestampTz GetCurrentTransactionStartTimestamp(void); extern TimestampTz GetCurrentStatementStartTimestamp(void); extern TimestampTz GetCurrentTransactionStopTimestamp(void); extern void SetCurrentStatementStartTimestamp(void); +#ifdef PGXC +extern TimestampTz GetCurrentGTMStartTimestamp(void); +extern void SetCurrentGTMDeltaTimestamp(TimestampTz timestamp); +#endif extern int GetCurrentTransactionNestLevel(void); extern bool TransactionIdIsCurrentTransactionId(TransactionId xid); extern void CommandCounterIncrement(void); diff --git a/src/include/gtm/gtm_c.h b/src/include/gtm/gtm_c.h index 1a04064b6d..0a4c941805 100644 --- a/src/include/gtm/gtm_c.h +++ b/src/include/gtm/gtm_c.h @@ -55,6 +55,12 @@ typedef int32 GTM_TransactionHandle; #define InvalidTransactionHandle -1 +/* + * As GTM and Postgres-XC packages are separated, GTM and XC's API + * use different type names for timestamps and sequences, but they have to be the same! + */ +typedef int64 GTM_Timestamp; /* timestamp data is 64-bit based */ + typedef int64 GTM_Sequence; /* a 64-bit sequence */ typedef struct GTM_SequenceKeyData { diff --git a/src/include/gtm/gtm_client.h b/src/include/gtm/gtm_client.h index 05e44bfb25..9db688486a 100644 --- a/src/include/gtm/gtm_client.h +++ b/src/include/gtm/gtm_client.h @@ -21,8 +21,14 @@ typedef union GTM_ResultData { GTM_TransactionHandle grd_txnhandle; /* TXN_BEGIN */ - GlobalTransactionId grd_gxid; /* TXN_BEGIN_GETGXID - * TXN_PREPARE + + struct + { + GlobalTransactionId gxid; + GTM_Timestamp timestamp; + } grd_gxid_tp; /* TXN_BEGIN_GETGXID */ + + GlobalTransactionId grd_gxid; /* TXN_PREPARE * TXN_COMMIT * TXN_ROLLBACK */ @@ -47,6 +53,7 @@ typedef union GTM_ResultData { int txn_count; /* TXN_BEGIN_GETGXID_MULTI */ GlobalTransactionId start_gxid; + GTM_Timestamp timestamp; } grd_txn_get_multi; struct @@ -101,7 +108,7 @@ void disconnect_gtm(GTM_Conn *conn); /* * Transaction Management API */ -GlobalTransactionId begin_transaction(GTM_Conn *conn, GTM_IsolationLevel isolevel); +GlobalTransactionId begin_transaction(GTM_Conn *conn, GTM_IsolationLevel isolevel, GTM_Timestamp *timestamp); GlobalTransactionId begin_transaction_autovacuum(GTM_Conn *conn, GTM_IsolationLevel isolevel); int commit_transaction(GTM_Conn *conn, GlobalTransactionId gxid); int abort_transaction(GTM_Conn *conn, GlobalTransactionId gxid); diff --git a/src/include/gtm/gtm_time.h b/src/include/gtm/gtm_time.h new file mode 100644 index 0000000000..b3d7005db9 --- /dev/null +++ b/src/include/gtm/gtm_time.h @@ -0,0 +1,37 @@ +/*------------------------------------------------------------------------- + * + * gtm_time.h + * + * + * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * Portions Copyright (c) 2010 Nippon Telegraph and Telephone Corporation + * + * $PostgreSQL$ + * + *------------------------------------------------------------------------- + */ + +#ifndef GTM_TIME_H +#define GTM_TIME_H + +/* Julian-date equivalents of Day 0 in Unix and GTM reckoning */ +#define UNIX_EPOCH_JDATE 2440588 /* == date2j(1970, 1, 1) */ +#define GTM_EPOCH_JDATE 2451545 /* == date2j(2000, 1, 1) */ + +#define SECS_PER_YEAR (36525 * 864) /* avoid floating-point computation */ +#define SECS_PER_DAY 86400 +#define SECS_PER_HOUR 3600 +#define SECS_PER_MINUTE 60 +#define MINS_PER_HOUR 60 + +#ifdef HAVE_INT64_TIMESTAMP +#define USECS_PER_DAY INT64CONST(86400000000) +#define USECS_PER_HOUR INT64CONST(3600000000) +#define USECS_PER_MINUTE INT64CONST(60000000) +#define USECS_PER_SEC INT64CONST(1000000) +#endif + +GTM_Timestamp GTM_TimestampGetCurrent(void); + +#endif diff --git a/src/include/pgxc/datanode.h b/src/include/pgxc/datanode.h index 849d84acea..4202e2e942 100644 --- a/src/include/pgxc/datanode.h +++ b/src/include/pgxc/datanode.h @@ -18,6 +18,7 @@ #define DATANODE_H #include "postgres.h" #include "gtm/gtm_c.h" +#include "utils/timestamp.h" #include "nodes/pg_list.h" #include "utils/snapshot.h" #include <unistd.h> @@ -88,6 +89,7 @@ extern int ensure_out_buffer_capacity(size_t bytes_needed, DataNodeHandle * hand extern int data_node_send_query(DataNodeHandle * handle, const char *query); extern int data_node_send_gxid(DataNodeHandle * handle, GlobalTransactionId gxid); extern int data_node_send_snapshot(DataNodeHandle * handle, Snapshot snapshot); +extern int data_node_send_timestamp(DataNodeHandle * handle, TimestampTz timestamp); extern int data_node_receive(const int conn_count, DataNodeHandle ** connections, struct timeval * timeout); diff --git a/src/include/utils/timestamp.h b/src/include/utils/timestamp.h index 2765d3927f..7c8e744915 100644 --- a/src/include/utils/timestamp.h +++ b/src/include/utils/timestamp.h @@ -23,6 +23,10 @@ #include "utils/int8.h" #endif +#ifdef PGXC +#include "pgxc/pgxc.h" +#endif + /* * Timestamp represents absolute time. * @@ -43,6 +47,11 @@ #ifdef HAVE_INT64_TIMESTAMP +/* + * PGXC note: GTM and Postgres-XC packages have to be separated. + * Both use use different type names for timestamp, but those types have to be the same! + */ + typedef int64 Timestamp; typedef int64 TimestampTz; typedef int64 TimeOffset; @@ -188,6 +197,10 @@ typedef struct #define TimestampTzPlusMilliseconds(tz,ms) ((tz) + ((ms) / 1000.0)) #endif +#ifdef PGXC +#define InvalidGlobalTimestamp ((TimestampTz) 0) +#define GlobalTimestampIsValid(timestamp) ((TimestampTz) (timestamp)) != InvalidGlobalTimestamp +#endif /* Set at postmaster start */ extern TimestampTz PgStartTime; |
