summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndres Freund2013-02-22 16:43:27 +0000
committerAndres Freund2013-12-11 15:34:17 +0000
commitd04cd9cca5e6e1425bf69dfe9417e684021f8960 (patch)
tree29ddc0f0fa2487983d321f1a61d2caa0949d8bd1
parentbcc111df21e5a68d4f41381720e41d35ddff5994 (diff)
Introduce "replication_identifiers" to keep track of remote nodes.replication-identifiers
Replication identifiers can be used to track and look up remote nodes identified via (sysid, tlid, remote_dbid, local_dbid, name) and map that tuple to a local uint16. Keyed by that replication identifier, the progress of replication from that system is tracked in a crash-safe manner. Support for tracking that via output plugins is added as well. Needs a catversion bump.
-rw-r--r--src/backend/access/rmgrdesc/xactdesc.c17
-rw-r--r--src/backend/access/transam/xact.c68
-rwxr-xr-xsrc/backend/access/transam/xlog.c8
-rw-r--r--src/backend/catalog/Makefile2
-rw-r--r--src/backend/catalog/catalog.c8
-rw-r--r--src/backend/catalog/system_views.sql7
-rw-r--r--src/backend/replication/logical/Makefile3
-rw-r--r--src/backend/replication/logical/decode.c6
-rw-r--r--src/backend/replication/logical/logical.c7
-rw-r--r--src/backend/replication/logical/reorderbuffer.c3
-rw-r--r--src/backend/replication/logical/replication_identifier.c898
-rw-r--r--src/backend/storage/ipc/ipci.c3
-rw-r--r--src/backend/utils/cache/syscache.c23
-rw-r--r--src/backend/utils/misc/guc.c1
-rw-r--r--src/bin/initdb/initdb.c1
-rw-r--r--src/bin/pg_resetxlog/pg_resetxlog.c2
-rw-r--r--src/include/access/xact.h9
-rw-r--r--src/include/access/xlog.h1
-rw-r--r--src/include/access/xlogdefs.h6
-rw-r--r--src/include/catalog/indexing.h6
-rw-r--r--src/include/catalog/pg_proc.h21
-rw-r--r--src/include/catalog/pg_replication_identifier.h75
-rw-r--r--src/include/replication/logical.h6
-rw-r--r--src/include/replication/reorderbuffer.h10
-rw-r--r--src/include/replication/replication_identifier.h45
-rw-r--r--src/include/utils/syscache.h2
-rw-r--r--src/test/regress/expected/rules.out5
-rw-r--r--src/test/regress/expected/sanity_check.out1
28 files changed, 1222 insertions, 22 deletions
diff --git a/src/backend/access/rmgrdesc/xactdesc.c b/src/backend/access/rmgrdesc/xactdesc.c
index 2caf5a07b6..7e7e40eabe 100644
--- a/src/backend/access/rmgrdesc/xactdesc.c
+++ b/src/backend/access/rmgrdesc/xactdesc.c
@@ -26,9 +26,12 @@ xact_desc_commit(StringInfo buf, xl_xact_commit *xlrec)
{
int i;
TransactionId *subxacts;
+ SharedInvalidationMessage *msgs;
subxacts = (TransactionId *) &xlrec->xnodes[xlrec->nrels];
+ msgs = (SharedInvalidationMessage *) &subxacts[xlrec->nsubxacts];
+
appendStringInfoString(buf, timestamptz_to_str(xlrec->xact_time));
if (xlrec->nrels > 0)
@@ -50,9 +53,6 @@ xact_desc_commit(StringInfo buf, xl_xact_commit *xlrec)
}
if (xlrec->nmsgs > 0)
{
- SharedInvalidationMessage *msgs;
-
- msgs = (SharedInvalidationMessage *) &subxacts[xlrec->nsubxacts];
if (XactCompletionRelcacheInitFileInval(xlrec->xinfo))
appendStringInfo(buf, "; relcache init file inval dbid %u tsid %u",
@@ -81,6 +81,17 @@ xact_desc_commit(StringInfo buf, xl_xact_commit *xlrec)
appendStringInfo(buf, " unknown id %d", msg->id);
}
}
+ if (xlrec->xinfo & XACT_CONTAINS_ORIGIN)
+ {
+ xl_xact_origin *origin = (xl_xact_origin *) &(msgs[xlrec->nmsgs]);
+
+ appendStringInfo(buf, " origin %u, lsn %X/%X, at %s",
+ origin->origin_node_id,
+ (uint32)(origin->origin_lsn >> 32),
+ (uint32)origin->origin_lsn,
+ timestamptz_to_str(origin->origin_timestamp));
+ }
+
}
static void
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 9140b3a77c..02e163a651 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -38,8 +38,10 @@
#include "libpq/pqsignal.h"
#include "miscadmin.h"
#include "pgstat.h"
+#include "replication/logical.h"
#include "replication/walsender.h"
#include "replication/syncrep.h"
+#include "replication/replication_identifier.h"
#include "storage/fd.h"
#include "storage/lmgr.h"
#include "storage/predicate.h"
@@ -1076,11 +1078,13 @@ RecordTransactionCommit(void)
/*
* Do we need the long commit record? If not, use the compact format.
*/
- if (nrels > 0 || nmsgs > 0 || RelcacheInitFileInval || forceSyncCommit)
+ if (nrels > 0 || nmsgs > 0 || RelcacheInitFileInval ||
+ forceSyncCommit || replication_origin_id != InvalidRepNodeId)
{
- XLogRecData rdata[4];
+ XLogRecData rdata[5];
int lastrdata = 0;
xl_xact_commit xlrec;
+ xl_xact_origin origin;
/*
* Set flags required for recovery processing of commits.
@@ -1128,6 +1132,21 @@ RecordTransactionCommit(void)
rdata[3].buffer = InvalidBuffer;
lastrdata = 3;
}
+ /* dump transaction origin information */
+ if (replication_origin_id != InvalidRepNodeId)
+ {
+ Assert(replication_origin_lsn != InvalidXLogRecPtr);
+ xlrec.xinfo |= XACT_CONTAINS_ORIGIN;
+ origin.origin_node_id = replication_origin_id;
+ origin.origin_lsn = replication_origin_lsn;
+ origin.origin_timestamp = replication_origin_timestamp;
+
+ rdata[lastrdata].next = &(rdata[4]);
+ rdata[4].data = (char *) &origin;
+ rdata[4].len = sizeof(xl_xact_origin);
+ rdata[4].buffer = InvalidBuffer;
+ lastrdata = 4;
+ }
rdata[lastrdata].next = NULL;
(void) XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT, rdata);
@@ -1158,13 +1177,21 @@ RecordTransactionCommit(void)
}
}
+ /* record plain commit ts if not replaying remote actions */
+ if (replication_origin_id == InvalidRepNodeId)
+ replication_origin_timestamp = xactStopTimestamp;
+ else
+ AdvanceCachedReplicationIdentifier(replication_origin_lsn, XactLastRecEnd);
+
/*
* We don't need to log the commit timestamp separately since the commit
* record logged above has all the necessary action to set the timestamp
* again.
*/
TransactionTreeSetCommitTimestamp(xid, nchildren, children,
- xactStopTimestamp, 0, false);
+ replication_origin_timestamp,
+ replication_origin_id,
+ false);
/*
* Check if we want to commit asynchronously. We can allow the XLOG flush
@@ -1248,7 +1275,6 @@ RecordTransactionCommit(void)
/* Reset XactLastRecEnd until the next transaction writes something */
XactLastRecEnd = 0;
-
cleanup:
/* Clean up local data */
if (rels)
@@ -4675,10 +4701,12 @@ xact_redo_commit_internal(TransactionId xid, XLogRecPtr lsn,
SharedInvalidationMessage *inval_msgs, int nmsgs,
RelFileNode *xnodes, int nrels,
Oid dbId, Oid tsId,
- uint32 xinfo)
+ uint32 xinfo,
+ xl_xact_origin *origin)
{
TransactionId max_xid;
int i;
+ RepNodeId origin_node_id = InvalidRepNodeId;
max_xid = TransactionIdLatest(xid, nsubxacts, sub_xids);
@@ -4698,9 +4726,26 @@ xact_redo_commit_internal(TransactionId xid, XLogRecPtr lsn,
LWLockRelease(XidGenLock);
}
+ Assert(!!(xinfo & XACT_CONTAINS_ORIGIN) == (origin != NULL));
+
+ if (xinfo & XACT_CONTAINS_ORIGIN)
+ {
+ origin_node_id = origin->origin_node_id;
+ commit_time = origin->origin_timestamp;
+ }
+
/* Set the transaction commit time */
TransactionTreeSetCommitTimestamp(xid, nsubxacts, sub_xids,
- commit_time, 0, false);
+ commit_time,
+ origin_node_id, false);
+
+ if (xinfo & XACT_CONTAINS_ORIGIN)
+ {
+ /* recover apply progress */
+ AdvanceReplicationIdentifier(origin->origin_node_id,
+ origin->origin_lsn,
+ lsn);
+ }
if (standbyState == STANDBY_DISABLED)
{
@@ -4815,19 +4860,23 @@ xact_redo_commit(xl_xact_commit *xlrec,
{
TransactionId *subxacts;
SharedInvalidationMessage *inval_msgs;
-
+ xl_xact_origin *origin = NULL;
/* subxid array follows relfilenodes */
subxacts = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
/* invalidation messages array follows subxids */
inval_msgs = (SharedInvalidationMessage *) &(subxacts[xlrec->nsubxacts]);
+ if (xlrec->xinfo & XACT_CONTAINS_ORIGIN)
+ origin = (xl_xact_origin *) &(inval_msgs[xlrec->nmsgs]);
+
xact_redo_commit_internal(xid, lsn, xlrec->xact_time,
subxacts, xlrec->nsubxacts,
inval_msgs, xlrec->nmsgs,
xlrec->xnodes, xlrec->nrels,
xlrec->dbId,
xlrec->tsId,
- xlrec->xinfo);
+ xlrec->xinfo,
+ origin);
}
/*
@@ -4843,7 +4892,8 @@ xact_redo_commit_compact(xl_xact_commit_compact *xlrec,
NULL, 0, /* relfilenodes */
InvalidOid, /* dbId */
InvalidOid, /* tsId */
- 0); /* xinfo */
+ 0, /* xinfo */
+ NULL /* origin */);
}
/*
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 7a7d306df8..235fe75ca7 100755
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -42,6 +42,7 @@
#include "postmaster/bgwriter.h"
#include "postmaster/startup.h"
#include "replication/logical.h"
+#include "replication/replication_identifier.h"
#include "replication/snapbuild.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
@@ -1055,6 +1056,7 @@ begin:;
rechdr->xl_len = len; /* doesn't include backup blocks */
rechdr->xl_info = info;
rechdr->xl_rmid = rmid;
+ rechdr->xl_origin_id = replication_origin_id;
rechdr->xl_prev = InvalidXLogRecPtr;
COMP_CRC32(rdata_crc, ((char *) rechdr), offsetof(XLogRecord, xl_prev));
@@ -6361,6 +6363,11 @@ StartupXLOG(void)
StartupMultiXact();
/*
+ * Recover knowledge about replay progress of known replication partners.
+ */
+ StartupReplicationIdentifier(checkPoint.redo);
+
+ /*
* Initialize unlogged LSN. On a clean shutdown, it's restored from the
* control file. On recovery, all unlogged relations are blown away, so
* the unlogged LSN counter can be reset too.
@@ -8449,6 +8456,7 @@ CheckPointGuts(XLogRecPtr checkPointRedo, int flags)
CheckPointSnapBuild();
CheckpointLogicalRewriteHeap();
CheckPointBuffers(flags); /* performs all required fsyncs */
+ CheckPointReplicationIdentifier(checkPointRedo);
/* We deliberately delay 2PC checkpointing as long as possible */
CheckPointTwoPhase(checkPointRedo);
}
diff --git a/src/backend/catalog/Makefile b/src/backend/catalog/Makefile
index a974bd5260..3a20672399 100644
--- a/src/backend/catalog/Makefile
+++ b/src/backend/catalog/Makefile
@@ -39,7 +39,7 @@ POSTGRES_BKI_SRCS = $(addprefix $(top_srcdir)/src/include/catalog/,\
pg_ts_config.h pg_ts_config_map.h pg_ts_dict.h \
pg_ts_parser.h pg_ts_template.h pg_extension.h \
pg_foreign_data_wrapper.h pg_foreign_server.h pg_user_mapping.h \
- pg_foreign_table.h \
+ pg_foreign_table.h pg_replication_identifier.h \
pg_default_acl.h pg_seclabel.h pg_shseclabel.h pg_collation.h pg_range.h \
toasting.h indexing.h \
)
diff --git a/src/backend/catalog/catalog.c b/src/backend/catalog/catalog.c
index 7719798457..d2ea0d55e3 100644
--- a/src/backend/catalog/catalog.c
+++ b/src/backend/catalog/catalog.c
@@ -32,6 +32,7 @@
#include "catalog/pg_namespace.h"
#include "catalog/pg_pltemplate.h"
#include "catalog/pg_db_role_setting.h"
+#include "catalog/pg_replication_identifier.h"
#include "catalog/pg_shdepend.h"
#include "catalog/pg_shdescription.h"
#include "catalog/pg_shseclabel.h"
@@ -275,7 +276,8 @@ IsSharedRelation(Oid relationId)
relationId == SharedDependRelationId ||
relationId == SharedSecLabelRelationId ||
relationId == TableSpaceRelationId ||
- relationId == DbRoleSettingRelationId)
+ relationId == DbRoleSettingRelationId ||
+ relationId == ReplicationIdentifierRelationId)
return true;
/* These are their indexes (see indexing.h) */
if (relationId == AuthIdRolnameIndexId ||
@@ -291,7 +293,9 @@ IsSharedRelation(Oid relationId)
relationId == SharedSecLabelObjectIndexId ||
relationId == TablespaceOidIndexId ||
relationId == TablespaceNameIndexId ||
- relationId == DbRoleSettingDatidRolidIndexId)
+ relationId == DbRoleSettingDatidRolidIndexId ||
+ relationId == ReplicationLocalIdentIndex ||
+ relationId == ReplicationExternalIdentIndex)
return true;
/* These are their toast tables and toast indexes (see toasting.h) */
if (relationId == PgShdescriptionToastTable ||
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 49aae10aea..ed13748c3c 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -720,6 +720,13 @@ CREATE VIEW pg_user_mappings AS
REVOKE ALL on pg_user_mapping FROM public;
+
+CREATE VIEW pg_replication_identifier_progress AS
+ SELECT *
+ FROM pg_get_replication_identifier_progress();
+
+REVOKE ALL on pg_user_mapping FROM public;
+
--
-- We have a few function definitions in here, too.
-- At some point there might be enough to justify breaking them out into
diff --git a/src/backend/replication/logical/Makefile b/src/backend/replication/logical/Makefile
index 6fae2781ca..f24dbbe297 100644
--- a/src/backend/replication/logical/Makefile
+++ b/src/backend/replication/logical/Makefile
@@ -14,7 +14,8 @@ include $(top_builddir)/src/Makefile.global
override CPPFLAGS := -I$(srcdir) $(CPPFLAGS)
-OBJS = decode.o logical.o logicalfuncs.o reorderbuffer.o snapbuild.o
+OBJS = decode.o logical.o logicalfuncs.o reorderbuffer.o replication_identifier.o \
+ snapbuild.o
include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 655fecabd4..50ccef9ba7 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -496,7 +496,7 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
/* replay actions of all transaction + subtransactions in order */
ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr,
- commit_time);
+ commit_time, buf->record.xl_origin_id);
}
/*
@@ -540,6 +540,7 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
change = ReorderBufferGetChange(ctx->reorder);
change->action = REORDER_BUFFER_CHANGE_INSERT;
+ change->origin_id = r->xl_origin_id;
memcpy(&change->tp.relnode, &xlrec->target.node, sizeof(RelFileNode));
if (xlrec->flags & XLOG_HEAP_CONTAINS_NEW_TUPLE)
@@ -580,6 +581,7 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
change = ReorderBufferGetChange(ctx->reorder);
change->action = REORDER_BUFFER_CHANGE_UPDATE;
+ change->origin_id = r->xl_origin_id;
memcpy(&change->tp.relnode, &xlrec->target.node, sizeof(RelFileNode));
data = (char *) &xlhdr->header;
@@ -634,6 +636,7 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
change = ReorderBufferGetChange(ctx->reorder);
change->action = REORDER_BUFFER_CHANGE_DELETE;
+ change->origin_id = r->xl_origin_id;
memcpy(&change->tp.relnode, &xlrec->target.node, sizeof(RelFileNode));
@@ -689,6 +692,7 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
change = ReorderBufferGetChange(ctx->reorder);
change->action = REORDER_BUFFER_CHANGE_INSERT;
+ change->origin_id = r->xl_origin_id;
memcpy(&change->tp.relnode, &xlrec->node, sizeof(RelFileNode));
/*
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 10041cd3aa..3c1d65afbd 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -71,6 +71,10 @@ LogicalDecodingSlot *MyLogicalDecodingSlot = NULL;
/* user settable parameters */
int max_logical_slots = 0; /* the maximum number of logical slots */
+RepNodeId replication_origin_id = InvalidRepNodeId; /* assumed identity */
+XLogRecPtr replication_origin_lsn;
+TimestampTz replication_origin_timestamp;
+
static void LogicalSlotKill(int code, Datum arg);
/* persistency functions */
@@ -809,7 +813,8 @@ StartupLogicalDecoding(XLogRecPtr checkPointRedo)
/* one of our own directories */
if (strcmp(logical_de->d_name, "snapshots") == 0 ||
- strcmp(logical_de->d_name, "mappings") == 0)
+ strcmp(logical_de->d_name, "mappings") == 0 ||
+ strcmp(logical_de->d_name, "checkpoints") == 0)
continue;
/* we crashed while a slot was being setup or deleted, clean up */
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 05a5020af4..979fa23af7 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -1261,7 +1261,7 @@ ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
void
ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
- TimestampTz commit_time)
+ TimestampTz commit_time, RepNodeId origin)
{
ReorderBufferTXN *txn;
ReorderBufferIterTXNState *iterstate = NULL;
@@ -1282,6 +1282,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
txn->final_lsn = commit_lsn;
txn->end_lsn = end_lsn;
txn->commit_time = commit_time;
+ txn->origin_id = origin;
/* serialize the last bunch of changes if we need start earlier anyway */
if (txn->nentries_mem != txn->nentries)
diff --git a/src/backend/replication/logical/replication_identifier.c b/src/backend/replication/logical/replication_identifier.c
new file mode 100644
index 0000000000..67684b2854
--- /dev/null
+++ b/src/backend/replication/logical/replication_identifier.c
@@ -0,0 +1,898 @@
+/*-------------------------------------------------------------------------
+ *
+ * replication_identifier.c
+ * Logical Replication Node Identifier and replication progress persistency
+ * support.
+ *
+ * Copyright (c) 2013, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/backend/replication/logical/replication_identifier.c
+ *
+ */
+
+#include "postgres.h"
+
+#include <unistd.h>
+
+#include "funcapi.h"
+#include "miscadmin.h"
+
+#include "access/genam.h"
+#include "access/heapam.h"
+#include "access/htup_details.h"
+#include "access/xact.h"
+
+#include "catalog/indexing.h"
+
+#include "nodes/execnodes.h"
+
+#include "replication/replication_identifier.h"
+#include "replication/logical.h"
+
+#include "storage/fd.h"
+#include "storage/copydir.h"
+
+#include "utils/builtins.h"
+#include "utils/fmgroids.h"
+#include "utils/rel.h"
+#include "utils/syscache.h"
+#include "utils/tqual.h"
+
+/*
+ * Replay progress of a single remote node.
+ *
+ * Entries of this type make up the ReplicationStates array. An entry whose
+ * local_identifier is InvalidRepNodeId is treated as unused by the code in
+ * this file. CheckPointReplicationIdentifier() write()s a copy of this
+ * struct to the checkpoint file as-is, so changing the layout changes the
+ * on-disk format.
+ */
+typedef struct ReplicationState
+{
+ /*
+ * Local identifier for the remote node. InvalidRepNodeId marks an unused
+ * entry.
+ */
+ RepNodeId local_identifier;
+
+ /*
+ * Location of the latest commit from the remote side.
+ */
+ XLogRecPtr remote_lsn;
+
+ /*
+ * Remember the local lsn of the commit record so we can XLogFlush() to it
+ * during a checkpoint so we know the commit record actually is safe on
+ * disk. The copy written to the checkpoint file has this field set to
+ * InvalidXLogRecPtr.
+ */
+ XLogRecPtr local_lsn;
+} ReplicationState;
+
+/*
+ * Base address into a shared memory array of replication states of size
+ * max_logical_slots. Set up (and zeroed on first creation) by
+ * ReplicationIdentifierShmemInit(); left unset when max_logical_slots is 0.
+ * XXX: Should we use a separate variable to size this than max_logical_slots?
+ */
+static ReplicationState *ReplicationStates;
+
+/*
+ * Backend-local, cached element from ReplicationStates for use in a backend
+ * replaying remote commits, so we don't have to search ReplicationStates for
+ * the backend's current RepNodeId. NULL means no origin has been set up in
+ * this backend yet.
+ */
+static ReplicationState *local_replication_state = NULL;
+
+/* Magic number written at the start of on-disk checkpoint files. */
+#define REPLICATION_STATE_MAGIC (uint32)0x1257DADE
+
+/*
+ * Provide UINT16_MAX if the platform headers did not, and refuse to compile
+ * if an existing definition has an unexpected value.
+ * XXX: move to c.h?
+ */
+#ifndef UINT16_MAX
+#define UINT16_MAX ((1<<16) - 1)
+#else
+#if UINT16_MAX != ((1<<16) - 1)
+#error "uh, wrong UINT16_MAX?"
+#endif
+#endif
+
+/*
+ * Look up the local id of the persistent replication identifier named
+ * riname.
+ *
+ * Returns the stored riident on success. If no such identifier exists, an
+ * ERROR is raised unless missing_ok is true, in which case InvalidOid is
+ * returned.
+ */
+RepNodeId
+GetReplicationIdentifier(char *riname, bool missing_ok)
+{
+    Oid         found_id = InvalidOid;
+    Datum       name_datum = CStringGetTextDatum(riname);
+    HeapTuple   cached;
+
+    cached = SearchSysCache1(REPLIDREMOTE, name_datum);
+
+    if (!HeapTupleIsValid(cached))
+    {
+        if (!missing_ok)
+            elog(ERROR, "cache lookup failed for replication identifier named %s",
+                 riname);
+        return found_id;
+    }
+
+    /* copy the id out before dropping our reference to the cache entry */
+    found_id = ((Form_pg_replication_identifier) GETSTRUCT(cached))->riident;
+    ReleaseSysCache(cached);
+
+    return found_id;
+}
+
+/*
+ * Create a persistent replication identifier named riname and return the
+ * local id allocated for it.
+ *
+ * Needs to be called in a transaction. Raises an ERROR with
+ * ERRCODE_PROGRAM_LIMIT_EXCEEDED if every usable id is already taken.
+ */
+RepNodeId
+CreateReplicationIdentifier(char *riname)
+{
+    Oid         riident;
+    HeapTuple   tuple = NULL;
+    Relation    rel;
+    Datum       riname_d;
+    SnapshotData SnapshotDirty;
+    SysScanDesc scan;
+    ScanKeyData key;
+
+    riname_d = CStringGetTextDatum(riname);
+
+    Assert(IsTransactionState());
+
+    /*
+     * We need the numeric replication identifiers to be 16bit wide, so we
+     * cannot rely on the normal oid allocation. So we simply scan
+     * pg_replication_identifier for the first unused id. That's not
+     * particularly efficient, but this should be a fairly infrequent
+     * operation - we can easily spend a bit more code when it turns out it
+     * should be faster.
+     *
+     * We handle concurrency by taking an exclusive lock (allowing reads!)
+     * over the table for the duration of the search. Because we use a "dirty
+     * snapshot" we can read rows that other in-progress sessions have
+     * written, even though they would be invisible with normal snapshots. Due
+     * to the exclusive lock there's no danger that new rows can appear while
+     * we're checking.
+     */
+    InitDirtySnapshot(SnapshotDirty);
+
+    rel = heap_open(ReplicationIdentifierRelationId, ExclusiveLock);
+
+    /*
+     * Stop before UINT16_MAX: GetReplicationInfoByIdentifier() asserts that
+     * every id it is asked about is strictly smaller than UINT16_MAX, so
+     * that value must never be handed out here.
+     */
+    for (riident = InvalidOid + 1; riident < UINT16_MAX; riident++)
+    {
+        bool        nulls[Natts_pg_replication_identifier];
+        Datum       values[Natts_pg_replication_identifier];
+        bool        collides;
+
+        CHECK_FOR_INTERRUPTS();
+
+        ScanKeyInit(&key,
+                    Anum_pg_replication_riident,
+                    BTEqualStrategyNumber, F_OIDEQ,
+                    ObjectIdGetDatum(riident));
+
+        scan = systable_beginscan(rel, ReplicationLocalIdentIndex,
+                                  true /* indexOK */,
+                                  &SnapshotDirty,
+                                  1, &key);
+
+        collides = HeapTupleIsValid(systable_getnext(scan));
+
+        systable_endscan(scan);
+
+        if (!collides)
+        {
+            /*
+             * Ok, found an unused riident, insert the new row and do a CCI,
+             * so our callers can look it up if they want to.
+             */
+            memset(&nulls, 0, sizeof(nulls));
+
+            values[Anum_pg_replication_riident - 1] = ObjectIdGetDatum(riident);
+            values[Anum_pg_replication_riname - 1] = riname_d;
+
+            tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls);
+            simple_heap_insert(rel, tuple);
+            CatalogUpdateIndexes(rel, tuple);
+            CommandCounterIncrement();
+            break;
+        }
+    }
+
+    /* now release lock again, */
+    heap_close(rel, ExclusiveLock);
+
+    /* tuple is only set once a free id was found and inserted */
+    if (tuple == NULL)
+        ereport(ERROR,
+                (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
+                 errmsg("no free replication id could be found")));
+
+    heap_freetuple(tuple);
+    return riident;
+}
+
+
+/*
+ * Fetch the pg_replication_identifier syscache entry for a local node id.
+ *
+ * On success the caller holds a syscache reference and has to
+ * ReleaseSysCache() it. If no entry exists, an ERROR is raised unless
+ * missing_ok is set, in which case an invalid HeapTuple is returned.
+ */
+HeapTuple
+GetReplicationInfoByIdentifier(RepNodeId riident, bool missing_ok)
+{
+    Oid         lookup_key = (Oid) riident;
+    HeapTuple   entry;
+
+    Assert(OidIsValid(lookup_key));
+    Assert(riident < UINT16_MAX);
+
+    entry = SearchSysCache1(REPLIDIDENT, ObjectIdGetDatum(lookup_key));
+
+    if (HeapTupleIsValid(entry) || missing_ok)
+        return entry;
+
+    elog(ERROR, "cache lookup failed for replication identifier id: %u",
+         riident);
+
+    /* not reached when elog(ERROR) aborts; hands back the invalid tuple otherwise */
+    return entry;
+}
+
+/*
+ * Common entry check for the SQL-callable functions in this file.
+ *
+ * Raises an ERROR unless the caller is a superuser. When check_slots is
+ * true, additionally raises an ERROR if max_logical_slots is 0.
+ */
+static void
+CheckReplicationIdentifierPrerequisites(bool check_slots)
+{
+    if (!superuser())
+        ereport(ERROR,
+                (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+                 errmsg("only superusers can query or manipulate replication identifiers")));
+
+    /* callers that never touch the shared state array can stop here */
+    if (!check_slots)
+        return;
+
+    if (max_logical_slots == 0)
+        ereport(ERROR,
+                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                 errmsg("cannot query or manipulate replication identifiers when max_logical_slots = 0")));
+}
+
+/*
+ * SQL-callable: return the local id of the replication identifier whose
+ * name is passed as argument 0, or NULL if no such identifier exists.
+ */
+Datum
+pg_replication_identifier_get(PG_FUNCTION_ARGS)
+{
+    char       *ident_name;
+    RepNodeId   node_id;
+
+    CheckReplicationIdentifierPrerequisites(false);
+
+    ident_name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
+    node_id = GetReplicationIdentifier(ident_name, true);
+    pfree(ident_name);
+
+    /* an unknown name is reported as SQL NULL rather than as an error */
+    if (!OidIsValid(node_id))
+        PG_RETURN_NULL();
+
+    PG_RETURN_OID(node_id);
+}
+
+
+/*
+ * SQL-callable: create a replication identifier with the name passed as
+ * argument 0 and return the local id allocated for it.
+ */
+Datum
+pg_replication_identifier_create(PG_FUNCTION_ARGS)
+{
+    char       *ident_name;
+    RepNodeId   new_id;
+
+    CheckReplicationIdentifierPrerequisites(false);
+
+    ident_name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
+
+    new_id = CreateReplicationIdentifier(ident_name);
+
+    /* the C string copy is no longer needed once the identifier exists */
+    pfree(ident_name);
+
+    PG_RETURN_OID(new_id);
+}
+
+/*
+ * SQL-callable: mark this backend as replaying changes from the replication
+ * identifier named by argument 0.
+ *
+ * The identifier has to exist already; the lookup raises an ERROR otherwise.
+ */
+Datum
+pg_replication_identifier_setup_replaying_from(PG_FUNCTION_ARGS)
+{
+    char       *ident_name;
+    RepNodeId   node_id;
+
+    CheckReplicationIdentifierPrerequisites(true);
+
+    ident_name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
+    node_id = GetReplicationIdentifier(ident_name, false);
+
+    /* set up the cached state first, then publish the assumed origin id */
+    SetupCachedReplicationIdentifier(node_id);
+    replication_origin_id = node_id;
+
+    pfree(ident_name);
+
+    PG_RETURN_VOID();
+}
+
+/*
+ * SQL-callable: undo pg_replication_identifier_setup_replaying_from() for
+ * this backend.
+ */
+Datum
+pg_replication_identifier_reset_replaying_from(PG_FUNCTION_ARGS)
+{
+    CheckReplicationIdentifierPrerequisites(true);
+
+    /* tear down the cached state, then forget the assumed origin id */
+    TeardownCachedReplicationIdentifier();
+    replication_origin_id = InvalidRepNodeId;
+
+    PG_RETURN_VOID();
+}
+
+
+/*
+ * SQL-callable: record the remote commit location (argument 0, text in
+ * "hi/lo" hexadecimal form) and remote commit timestamp (argument 1) to be
+ * used as origin information.
+ *
+ * Raises an ERROR if the location cannot be parsed or if no origin has been
+ * set up in this backend.
+ */
+Datum
+pg_replication_identifier_setup_tx_origin(PG_FUNCTION_ARGS)
+{
+    text       *location = PG_GETARG_TEXT_P(0);
+    char       *lsn_text;
+    uint32      lsn_hi;
+    uint32      lsn_lo;
+    int         nfields;
+
+    CheckReplicationIdentifierPrerequisites(true);
+
+    lsn_text = text_to_cstring(location);
+
+    /* both halves of the location have to be present */
+    nfields = sscanf(lsn_text, "%X/%X", &lsn_hi, &lsn_lo);
+    if (nfields != 2)
+        ereport(ERROR,
+                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                 errmsg("could not parse transaction log location \"%s\"",
+                        lsn_text)));
+
+    if (local_replication_state == NULL)
+        ereport(ERROR,
+                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                 errmsg("need to setup the origin id first")));
+
+    replication_origin_lsn = (((uint64) lsn_hi) << 32) | lsn_lo;
+    replication_origin_timestamp = PG_GETARG_TIMESTAMPTZ(1);
+
+    pfree(lsn_text);
+
+    PG_RETURN_VOID();
+}
+
+/*
+ * SQL-callable set-returning function: emit one row per in-use entry of
+ * ReplicationStates, consisting of the local id, the identifier's name, the
+ * remote lsn and the local lsn (both lsns formatted as "hi/lo" hex text).
+ *
+ * The result is returned as a materialized tuplestore. Entries whose catalog
+ * row cannot be found are skipped silently.
+ */
+Datum
+pg_get_replication_identifier_progress(PG_FUNCTION_ARGS)
+{
+    ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+    TupleDesc   tupdesc;
+    Tuplestorestate *tupstore;
+    MemoryContext per_query_ctx;
+    MemoryContext oldcontext;
+    int         i;
+#define REPLICATION_IDENTIFIER_PROGRESS_COLS 4
+
+    CheckReplicationIdentifierPrerequisites(true);
+
+    if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
+        ereport(ERROR,
+                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                 errmsg("set-valued function called in context that cannot accept a set")));
+    if (!(rsinfo->allowedModes & SFRM_Materialize))
+        ereport(ERROR,
+                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                 errmsg("materialize mode required, but it is not allowed in this context")));
+    if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
+        elog(ERROR, "return type must be a row type");
+
+    if (tupdesc->natts != REPLICATION_IDENTIFIER_PROGRESS_COLS)
+        elog(ERROR, "wrong function definition");
+
+    /* the tuplestore is created in the per-query memory context */
+    per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
+    oldcontext = MemoryContextSwitchTo(per_query_ctx);
+
+    tupstore = tuplestore_begin_heap(true, false, work_mem);
+    rsinfo->returnMode = SFRM_Materialize;
+    rsinfo->setResult = tupstore;
+    rsinfo->setDesc = tupdesc;
+
+    MemoryContextSwitchTo(oldcontext);
+
+    /*
+     * Iterate through all possible ReplicationStates, display if they are
+     * filled. Note that we do not take any locks, so slightly corrupted/out
+     * of date values are a possibility.
+     */
+    for (i = 0; i < max_logical_slots; i++)
+    {
+        ReplicationState *state;
+        Datum       values[REPLICATION_IDENTIFIER_PROGRESS_COLS];
+        bool        nulls[REPLICATION_IDENTIFIER_PROGRESS_COLS];
+        char        location[MAXFNAMELEN];
+        HeapTuple   ri;
+        Form_pg_replication_identifier ric;
+
+        state = &ReplicationStates[i];
+
+        /* unused slot, nothing to display */
+        if (state->local_identifier == InvalidRepNodeId)
+            continue;
+
+        memset(values, 0, sizeof(values));
+        memset(nulls, 0, sizeof(nulls));
+
+        values[0] = ObjectIdGetDatum(state->local_identifier);
+
+        ri = GetReplicationInfoByIdentifier(state->local_identifier, true);
+
+        /*
+         * We're not preventing the identifier to be dropped concurrently, so
+         * silently accept that it might be gone. ri is a HeapTuple, so it is
+         * checked with HeapTupleIsValid() rather than an Oid test.
+         */
+        if (!HeapTupleIsValid(ri))
+            continue;
+
+        /*
+         * The name datum points into the syscache tuple, so the tuple is
+         * kept until after the row has been handed to the tuplestore.
+         */
+        ric = (Form_pg_replication_identifier) GETSTRUCT(ri);
+        values[1] = PointerGetDatum(&ric->riname);
+
+        snprintf(location, sizeof(location), "%X/%X",
+                 (uint32) (state->remote_lsn >> 32), (uint32) state->remote_lsn);
+        values[2] = CStringGetTextDatum(location);
+        snprintf(location, sizeof(location), "%X/%X",
+                 (uint32) (state->local_lsn >> 32), (uint32) state->local_lsn);
+        values[3] = CStringGetTextDatum(location);
+
+        tuplestore_putvalues(tupstore, tupdesc, values, nulls);
+
+        /* free the strings we just allocated */
+        pfree(DatumGetPointer(values[2]));
+        pfree(DatumGetPointer(values[3]));
+        ReleaseSysCache(ri);
+    }
+
+    tuplestore_donestoring(tupstore);
+
+#undef REPLICATION_IDENTIFIER_PROGRESS_COLS
+
+    return (Datum) 0;
+}
+
+/*
+ * Report how much shared memory the ReplicationStates array needs: one
+ * ReplicationState per max_logical_slots, or nothing if that is 0.
+ */
+Size
+ReplicationIdentifierShmemSize(void)
+{
+    Size        array_bytes;
+
+    /*
+     * FIXME: max_logical_slots is the wrong thing to use here, here we keep
+     * the replay state of *remote* transactions.
+     */
+    if (max_logical_slots == 0)
+        return 0;
+
+    array_bytes = mul_size(max_logical_slots, sizeof(ReplicationState));
+
+    return add_size(0, array_bytes);
+}
+
+/*
+ * Attach to (or create) the shared ReplicationStates array. A freshly
+ * created array is zeroed; nothing is done when max_logical_slots is 0.
+ */
+void
+ReplicationIdentifierShmemInit(void)
+{
+    bool        already_present;
+    Size        shmem_bytes;
+
+    if (max_logical_slots == 0)
+        return;
+
+    shmem_bytes = ReplicationIdentifierShmemSize();
+
+    ReplicationStates = (ReplicationState *)
+        ShmemInitStruct("ReplicationIdentifierState",
+                        shmem_bytes,
+                        &already_present);
+
+    /* only whoever creates the segment initializes its contents */
+    if (already_present)
+        return;
+
+    MemSet(ReplicationStates, 0, shmem_bytes);
+}
+
+/*
+ * Write len bytes from data to the checkpoint temp file open as fd.
+ *
+ * PANICs on failure. errno is preserved across closing the file so that %m
+ * describes the failed write; a short write that left errno unset is
+ * reported as ENOSPC.
+ */
+static void
+WriteReplicationCheckpointData(int fd, const void *data, Size len,
+                               const char *tmppath)
+{
+    errno = 0;
+    if (write(fd, data, len) != (ssize_t) len)
+    {
+        /* if write didn't set errno, assume problem is no disk space */
+        int         save_errno = errno ? errno : ENOSPC;
+
+        CloseTransientFile(fd);
+        errno = save_errno;
+        ereport(PANIC,
+                (errcode_for_file_access(),
+                 errmsg("could not write replication identifier checkpoint \"%s\": %m",
+                        tmppath)));
+    }
+}
+
+/* ---------------------------------------------------------------------------
+ * Perform a checkpoint of replication identifier's progress with respect to
+ * the replayed remote_lsn. Make sure that all transactions we refer to in the
+ * checkpoint (local_lsn) are actually on-disk. This might not yet be the case
+ * if the transactions were originally committed asynchronously.
+ *
+ * We store checkpoints in the following format:
+ * +-------+-------------------------+-------------------------+-----+
+ * | MAGIC | struct ReplicationState | struct ReplicationState | ... | EOF
+ * +-------+-------------------------+-------------------------+-----+
+ *
+ * So it's just the magic, followed by the statically sized
+ * ReplicationStates. Note that the maximum number of ReplicationStates is
+ * determined by max_logical_slots. Unused entries are not written, and the
+ * local_lsn of every written entry is stored as InvalidXLogRecPtr.
+ *
+ * ckpt is the checkpoint's REDO pointer; it only determines the file name.
+ * Does nothing if max_logical_slots is 0 or if a checkpoint file for ckpt
+ * already exists. Any I/O failure is a PANIC.
+ *
+ * FIXME: Add a CRC32 to the end.
+ * ---------------------------------------------------------------------------
+ */
+void
+CheckPointReplicationIdentifier(XLogRecPtr ckpt)
+{
+    char        tmppath[MAXPGPATH];
+    char        path[MAXPGPATH];
+    int         fd;
+    int         tmpfd;
+    int         i;
+    uint32      magic = REPLICATION_STATE_MAGIC;
+
+    if (max_logical_slots == 0)
+        return;
+
+    /*
+     * Write to a filename a LSN of the checkpoint's REDO pointer, so we can
+     * deal with the checkpoint failing after
+     * CheckPointReplicationIdentifier() finishing.
+     */
+    snprintf(path, sizeof(path), "pg_llog/checkpoints/%X-%X.ckpt",
+             (uint32) (ckpt >> 32), (uint32) ckpt);
+    snprintf(tmppath, sizeof(tmppath), "pg_llog/checkpoints/%X-%X.ckpt.tmp",
+             (uint32) (ckpt >> 32), (uint32) ckpt);
+
+    /* check whether file already exists */
+    fd = OpenTransientFile(path,
+                           O_RDONLY | PG_BINARY,
+                           0);
+
+    if (fd >= 0)
+    {
+        /* already checkpointed before crash during a checkpoint or so */
+        CloseTransientFile(fd);
+        return;
+    }
+
+    /* usual case is ENOENT, i.e. no checkpoint performed yet */
+    if (errno != ENOENT)
+        ereport(PANIC,
+                (errcode_for_file_access(),
+                 errmsg("could not check replication state checkpoint \"%s\": %m",
+                        path)));
+
+    /* make sure no old temp file is remaining */
+    if (unlink(tmppath) < 0 && errno != ENOENT)
+        ereport(PANIC,
+                (errcode_for_file_access(),
+                 errmsg("could not remove file \"%s\": %m",
+                        tmppath)));
+
+    /*
+     * no other backend can perform this at the same time, we're protected by
+     * CheckpointLock.
+     */
+    tmpfd = OpenTransientFile(tmppath,
+                              O_CREAT | O_EXCL | O_WRONLY | PG_BINARY,
+                              S_IRUSR | S_IWUSR);
+    if (tmpfd < 0)
+        ereport(PANIC,
+                (errcode_for_file_access(),
+                 errmsg("could not create replication identifier checkpoint \"%s\": %m",
+                        tmppath)));
+
+    /* write magic */
+    WriteReplicationCheckpointData(tmpfd, &magic, sizeof(magic), tmppath);
+
+    /* write actual data */
+    for (i = 0; i < max_logical_slots; i++)
+    {
+        ReplicationState local_state;
+
+        if (ReplicationStates[i].local_identifier == InvalidRepNodeId)
+            continue;
+
+        /* zero the whole struct so no uninitialized padding reaches disk */
+        memset(&local_state, 0, sizeof(local_state));
+        local_state.local_identifier = ReplicationStates[i].local_identifier;
+        local_state.remote_lsn = ReplicationStates[i].remote_lsn;
+        local_state.local_lsn = InvalidXLogRecPtr;
+
+        /* make sure we only write out a commit that's persistent */
+        XLogFlush(ReplicationStates[i].local_lsn);
+
+        WriteReplicationCheckpointData(tmpfd, &local_state,
+                                       sizeof(ReplicationState), tmppath);
+    }
+
+    /* fsync the file */
+    if (pg_fsync(tmpfd) != 0)
+    {
+        /* keep the fsync failure reason for %m across the close */
+        int         save_errno = errno;
+
+        CloseTransientFile(tmpfd);
+        errno = save_errno;
+        ereport(PANIC,
+                (errcode_for_file_access(),
+                 errmsg("could not fsync replication identifier checkpoint \"%s\": %m",
+                        tmppath)));
+    }
+
+    CloseTransientFile(tmpfd);
+
+    /* rename to permanent file, fsync file and directory */
+    if (rename(tmppath, path) != 0)
+    {
+        ereport(PANIC,
+                (errcode_for_file_access(),
+                 errmsg("could not rename replication identifier checkpoint from \"%s\" to \"%s\": %m",
+                        tmppath, path)));
+    }
+
+    fsync_fname("pg_llog/checkpoints", true);
+    fsync_fname(path, false);
+}
+
+/*
+ * Recover replication replay status from checkpoint data saved earlier by
+ * CheckPointReplicationIdentifier.
+ *
+ * 'ckpt' is the LSN used to build the name of the state file,
+ * pg_llog/checkpoints/<hi>-<lo>.ckpt.  A missing file is not an error and
+ * leaves the in-memory state untouched.  A file that cannot be opened or
+ * read, is truncated, carries the wrong magic, or holds more entries than
+ * max_logical_slots results in a PANIC.
+ *
+ * This only needs to be called at startup and *not* during every checkpoint
+ * read during recovery (e.g. in HS or PITR from a base backup) afterwards. All
+ * state thereafter can be recovered by looking at commit records.
+ */
+void
+StartupReplicationIdentifier(XLogRecPtr ckpt)
+{
+    char        path[MAXPGPATH];
+    int         fd;
+    int         readBytes;
+    uint32      magic = REPLICATION_STATE_MAGIC;
+    int         last_state = 0;
+
+    /* don't want to overwrite already existing state */
+#ifdef USE_ASSERT_CHECKING
+    static bool already_started = false;
+    Assert(!already_started);
+    already_started = true;
+#endif
+
+    if (max_logical_slots == 0)
+        return;
+
+    elog(LOG, "starting up replication identifier with ckpt at %X/%X",
+         (uint32)(ckpt >> 32), (uint32)ckpt);
+
+    sprintf(path, "pg_llog/checkpoints/%X-%X.ckpt",
+            (uint32)(ckpt >> 32), (uint32)ckpt);
+
+    fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0);
+
+    /*
+     * might have had max_logical_slots == 0 last run, or we just brought up a
+     * standby.
+     */
+    if (fd < 0 && errno == ENOENT)
+        return;
+    else if (fd < 0)
+        ereport(PANIC,
+                (errcode_for_file_access(),
+                 errmsg("could not open replication state checkpoint \"%s\": %m",
+                        path)));
+
+    /* verify magic, that's written even if nothing was active */
+    readBytes = read(fd, &magic, sizeof(magic));
+
+    /* genuine I/O error: errno is meaningful, report it */
+    if (readBytes < 0)
+        ereport(PANIC,
+                (errcode_for_file_access(),
+                 errmsg("could not read replication state checkpoint magic \"%s\": %m",
+                        path)));
+
+    /* truncated file: read() succeeded, so errno must not be reported */
+    if (readBytes != sizeof(magic))
+        ereport(PANIC,
+                (errcode(ERRCODE_DATA_CORRUPTED),
+                 errmsg("could not read replication state checkpoint magic \"%s\": read %d of %d",
+                        path, readBytes, (int) sizeof(magic))));
+
+    if (magic != REPLICATION_STATE_MAGIC)
+        ereport(PANIC,
+                (errmsg("replication checkpoint has wrong magic %u instead of %u",
+                        magic, REPLICATION_STATE_MAGIC)));
+
+    /* recover individual states, until there are no more to be found */
+    while (true)
+    {
+        ReplicationState local_state;
+
+        readBytes = read(fd, &local_state, sizeof(local_state));
+
+        /* no further data */
+        if (readBytes == 0)
+            break;
+
+        if (readBytes < 0)
+        {
+            ereport(PANIC,
+                    (errcode_for_file_access(),
+                     errmsg("could not read replication checkpoint file \"%s\": %m",
+                            path)));
+        }
+
+        /* partial record: the file is truncated, errno is not set here */
+        if (readBytes != sizeof(local_state))
+        {
+            ereport(PANIC,
+                    (errcode(ERRCODE_DATA_CORRUPTED),
+                     errmsg("could not read replication checkpoint file \"%s\": read %d of %d",
+                            path, readBytes, (int) sizeof(local_state))));
+        }
+
+        /* the file holds more entries than the shared array has room for */
+        if (last_state == max_logical_slots)
+            ereport(PANIC,
+                    (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
+                     errmsg("no free replication state could be found, increase max_logical_slots")));
+
+        /* copy data to shared memory */
+        ReplicationStates[last_state++] = local_state;
+
+        elog(LOG, "recovered replication state of node %u to %X/%X",
+             local_state.local_identifier,
+             (uint32)(local_state.remote_lsn >> 32),
+             (uint32)local_state.remote_lsn);
+    }
+
+    CloseTransientFile(fd);
+}
+
+/*
+ * Tell the replication identifier machinery that a commit from 'node' that
+ * originated at the LSN remote_commit on the remote node was replayed
+ * successfully and that we don't need to do so again. In combination with
+ * setting up replication_origin_lsn and replication_origin_id that ensures we
+ * won't lose knowledge about that after a crash if the transaction had a
+ * persistent effect (think of asynchronous commits).
+ *
+ * local_commit needs to be a local LSN of the commit so that we can make sure
+ * upon a checkpoint that enough WAL has been persisted to disk.
+ *
+ * If 'node' has no entry in the shared array yet, the first unused entry is
+ * claimed for it; an ERROR is raised when no entry is left.
+ */
+void
+AdvanceReplicationIdentifier(RepNodeId node,
+                             XLogRecPtr remote_commit,
+                             XLogRecPtr local_commit)
+{
+    int         i;
+    int         free_slot = -1;
+    ReplicationState *replication_state = NULL;
+
+    /*
+     * XXX: should we restore into a hashtable and dump into shmem only after
+     * recovery finished?
+     */
+
+    /* look for the node's entry, noting the first unused one on the way */
+    for (i = 0; i < max_logical_slots; i++)
+    {
+        ReplicationState *candidate = &ReplicationStates[i];
+
+        if (free_slot == -1 &&
+            candidate->local_identifier == InvalidRepNodeId)
+        {
+            /* remember where to insert if necessary */
+            free_slot = i;
+        }
+        else if (candidate->local_identifier == node)
+        {
+            /* ok, found slot */
+            replication_state = candidate;
+            break;
+        }
+    }
+
+    if (replication_state == NULL)
+    {
+        if (free_slot == -1)
+            ereport(ERROR,
+                    (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
+                     errmsg("no free replication state could be found for %u, increase max_logical_slots",
+                            node)));
+
+        /* initialize new slot */
+        replication_state = &ReplicationStates[free_slot];
+        Assert(replication_state->remote_lsn == InvalidXLogRecPtr);
+        Assert(replication_state->local_lsn == InvalidXLogRecPtr);
+        replication_state->local_identifier = node;
+    }
+
+    /*
+     * Due to - harmless - race conditions during a checkpoint we could see
+     * values here that are older than the ones we already have in
+     * memory. Don't overwrite those.
+     */
+    if (remote_commit > replication_state->remote_lsn)
+        replication_state->remote_lsn = remote_commit;
+
+    if (local_commit > replication_state->local_lsn)
+        replication_state->local_lsn = local_commit;
+}
+
+
+/*
+ * Set up a replication identifier in the shared memory array if it doesn't
+ * already exist, and cache a pointer to its entry so the array doesn't have
+ * to be searched when calling AdvanceCachedReplicationIdentifier().
+ *
+ * Obviously only one such cached identifier can exist per process and the
+ * current cached value can only be set again after the previous value is torn
+ * down with TeardownCachedReplicationIdentifier.
+ *
+ * Raises an ERROR if an identifier is already cached, or if 'node' has no
+ * entry yet and no unused entry is left.
+ */
+void
+SetupCachedReplicationIdentifier(RepNodeId node)
+{
+    int         i;
+    int         free_slot = -1;
+
+    Assert(max_logical_slots > 0);
+
+    if (local_replication_state != NULL)
+        ereport(ERROR,
+                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                 errmsg("cannot setup replication origin when one is already setup")));
+
+    /*
+     * Search for either an existing slot for that identifier or a free one we
+     * can use.
+     */
+    for (i = 0; i < max_logical_slots; i++)
+    {
+        /* remember where to insert if necessary */
+        if (ReplicationStates[i].local_identifier == InvalidRepNodeId &&
+            free_slot == -1)
+        {
+            free_slot = i;
+            continue;
+        }
+
+        /* not our slot */
+        if (ReplicationStates[i].local_identifier != node)
+            continue;
+
+        /*
+         * Found it.  Stop at the first match, exactly like
+         * AdvanceReplicationIdentifier() does, so both code paths always
+         * resolve a node to the same entry.
+         */
+        local_replication_state = &ReplicationStates[i];
+        break;
+    }
+
+    if (local_replication_state == NULL && free_slot == -1)
+        ereport(ERROR,
+                (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
+                 errmsg("no free replication state could be found for %u, increase max_logical_slots",
+                        node)));
+    else if (local_replication_state == NULL)
+    {
+        /* claim the first unused entry for this node */
+        local_replication_state = &ReplicationStates[free_slot];
+        local_replication_state->local_identifier = node;
+        Assert(local_replication_state->remote_lsn == InvalidXLogRecPtr);
+        Assert(local_replication_state->local_lsn == InvalidXLogRecPtr);
+    }
+}
+
+/*
+ * Make currently cached replication identifier unavailable so a new one can
+ * be setup with SetupCachedReplicationIdentifier().
+ *
+ * This function may only be called if a previous identifier was setup with
+ * SetupCachedReplicationIdentifier(); otherwise an ERROR is raised.  Only the
+ * per-process cached pointer is cleared here.
+ */
+void
+TeardownCachedReplicationIdentifier(void)
+{
+    Assert(max_logical_slots != 0);
+
+    /* nothing cached means there is nothing to tear down */
+    if (local_replication_state == NULL)
+        ereport(ERROR,
+                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                 errmsg("cannot teardown replication origin when none is setup")));
+
+    local_replication_state = NULL;
+}
+
+/*
+ * Cheaper variant of AdvanceReplicationIdentifier() that works on the
+ * identifier previously cached with SetupCachedReplicationIdentifier(), so
+ * the shared array does not have to be searched. Worthwhile if you only ever
+ * work on a single replication identifier.
+ *
+ * Stored positions are only ever moved forward; older values are ignored.
+ */
+void
+AdvanceCachedReplicationIdentifier(XLogRecPtr remote_commit,
+                                   XLogRecPtr local_commit)
+{
+    ReplicationState *cached = local_replication_state;
+
+    Assert(cached != NULL);
+
+    if (local_commit > cached->local_lsn)
+        cached->local_lsn = local_commit;
+
+    if (remote_commit > cached->remote_lsn)
+        cached->remote_lsn = remote_commit;
+}
+
+/*
+ * Return the remote LSN up to which changes from the currently cached
+ * replication identifier have been replayed successfully. An identifier must
+ * have been set up and cached beforehand.
+ */
+XLogRecPtr
+RemoteCommitFromCachedReplicationIdentifier(void)
+{
+    XLogRecPtr  replayed_upto;
+
+    Assert(local_replication_state != NULL);
+
+    replayed_upto = local_replication_state->remote_lsn;
+
+    return replayed_upto;
+}
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index ed2d69f1d2..868ebb421a 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -31,6 +31,7 @@
#include "replication/logical.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
+#include "replication/replication_identifier.h"
#include "storage/bufmgr.h"
#include "storage/dsm.h"
#include "storage/ipc.h"
@@ -129,6 +130,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
size = add_size(size, CheckpointerShmemSize());
size = add_size(size, AutoVacuumShmemSize());
size = add_size(size, LogicalDecodingShmemSize());
+ size = add_size(size, ReplicationIdentifierShmemSize());
size = add_size(size, WalSndShmemSize());
size = add_size(size, WalRcvShmemSize());
size = add_size(size, BTreeShmemSize());
@@ -237,6 +239,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
CheckpointerShmemInit();
AutoVacuumShmemInit();
LogicalDecodingShmemInit();
+ ReplicationIdentifierShmemInit();
WalSndShmemInit();
WalRcvShmemInit();
diff --git a/src/backend/utils/cache/syscache.c b/src/backend/utils/cache/syscache.c
index e9bdfeae21..727405d317 100644
--- a/src/backend/utils/cache/syscache.c
+++ b/src/backend/utils/cache/syscache.c
@@ -54,6 +54,7 @@
#include "catalog/pg_shdepend.h"
#include "catalog/pg_shdescription.h"
#include "catalog/pg_shseclabel.h"
+#include "catalog/pg_replication_identifier.h"
#include "catalog/pg_statistic.h"
#include "catalog/pg_tablespace.h"
#include "catalog/pg_ts_config.h"
@@ -620,6 +621,28 @@ static const struct cachedesc cacheinfo[] = {
},
128
},
+ {ReplicationIdentifierRelationId, /* REPLIDIDENT */
+ ReplicationLocalIdentIndex,
+ 1,
+ {
+ Anum_pg_replication_riident,
+ 0,
+ 0,
+ 0
+ },
+ 16
+ },
+ {ReplicationIdentifierRelationId, /* REPLIDREMOTE */
+ ReplicationExternalIdentIndex,
+ 1,
+ {
+ Anum_pg_replication_riname,
+ 0,
+ 0,
+ 0
+ },
+ 16
+ },
{RewriteRelationId, /* RULERELNAME */
RewriteRelRulenameIndexId,
2,
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index a4146f0229..1aacd6b848 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -474,7 +474,6 @@ static bool data_checksums;
static int wal_segment_size;
static bool integer_datetimes;
static int effective_io_concurrency;
-
/* should be static, but commands/variable.c needs to get at this */
char *role_string;
diff --git a/src/bin/initdb/initdb.c b/src/bin/initdb/initdb.c
index b7424e7f2f..f1e991a12e 100644
--- a/src/bin/initdb/initdb.c
+++ b/src/bin/initdb/initdb.c
@@ -202,6 +202,7 @@ const char *subdirs[] = {
"pg_stat",
"pg_stat_tmp",
"pg_llog",
+ "pg_llog/checkpoints",
"pg_llog/snapshots",
"pg_llog/mappings"
};
diff --git a/src/bin/pg_resetxlog/pg_resetxlog.c b/src/bin/pg_resetxlog/pg_resetxlog.c
index f1b5d6d473..dbc5b74cf8 100644
--- a/src/bin/pg_resetxlog/pg_resetxlog.c
+++ b/src/bin/pg_resetxlog/pg_resetxlog.c
@@ -55,6 +55,7 @@
#include "catalog/catversion.h"
#include "catalog/pg_control.h"
#include "common/fe_memutils.h"
+#include "replication/logical.h"
extern int optind;
extern char *optarg;
@@ -970,6 +971,7 @@ WriteEmptyXLOG(void)
record->xl_len = sizeof(CheckPoint);
record->xl_info = XLOG_CHECKPOINT_SHUTDOWN;
record->xl_rmid = RM_XLOG_ID;
+ record->xl_origin_id = InvalidRepNodeId;
memcpy(XLogRecGetData(record), &ControlFile.checkPointCopy,
sizeof(CheckPoint));
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index 9632378865..9592a2d041 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -146,6 +146,13 @@ typedef struct xl_xact_commit
/* ARRAY OF SHARED INVALIDATION MESSAGES FOLLOWS */
} xl_xact_commit;
+/*
+ * Origin information for a transaction: an LSN, a node id and a timestamp
+ * describing where and when it originated.
+ *
+ * NOTE(review): presumably appended to commit records whose xinfo has
+ * XACT_CONTAINS_ORIGIN set -- confirm against xact.c.
+ */
+typedef struct xl_xact_origin
+{
+ XLogRecPtr origin_lsn;
+ RepNodeId origin_node_id;
+ TimestampTz origin_timestamp;
+} xl_xact_origin;
+
#define MinSizeOfXactCommit offsetof(xl_xact_commit, xnodes)
/*
@@ -158,7 +165,7 @@ typedef struct xl_xact_commit
*/
#define XACT_COMPLETION_UPDATE_RELCACHE_FILE 0x01
#define XACT_COMPLETION_FORCE_SYNC_COMMIT 0x02
-
+#define XACT_CONTAINS_ORIGIN 0x04
/* Access macros for above flags */
#define XactCompletionRelcacheInitFileInval(xinfo) (xinfo & XACT_COMPLETION_UPDATE_RELCACHE_FILE)
#define XactCompletionForceSyncCommit(xinfo) (xinfo & XACT_COMPLETION_FORCE_SYNC_COMMIT)
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 7415a261bb..54e5dcaa91 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -48,6 +48,7 @@ typedef struct XLogRecord
/* 2 bytes of padding here, initialize to zero */
XLogRecPtr xl_prev; /* ptr to previous record in log */
pg_crc32 xl_crc; /* CRC for this record */
+ RepNodeId xl_origin_id; /* what node did originally cause this record to be written */
/* If MAXALIGN==8, there are 4 wasted bytes here */
diff --git a/src/include/access/xlogdefs.h b/src/include/access/xlogdefs.h
index bca166ebdc..60fcc31fc8 100644
--- a/src/include/access/xlogdefs.h
+++ b/src/include/access/xlogdefs.h
@@ -45,6 +45,12 @@ typedef uint64 XLogSegNo;
typedef uint32 TimeLineID;
/*
+ * Denotes the node on which the action causing a wal record to be logged
+ * originated on.
+ */
+typedef uint16 RepNodeId;
+
+/*
* Because O_DIRECT bypasses the kernel buffers, and because we never
* read those buffers except during crash recovery or if wal_level != minimal,
* it is a win to use it in all cases where we sync on each write(). We could
diff --git a/src/include/catalog/indexing.h b/src/include/catalog/indexing.h
index 4860e98ca5..fd6f88ec93 100644
--- a/src/include/catalog/indexing.h
+++ b/src/include/catalog/indexing.h
@@ -313,6 +313,12 @@ DECLARE_UNIQUE_INDEX(pg_extension_name_index, 3081, on pg_extension using btree(
DECLARE_UNIQUE_INDEX(pg_range_rngtypid_index, 3542, on pg_range using btree(rngtypid oid_ops));
#define RangeTypidIndexId 3542
+DECLARE_UNIQUE_INDEX(pg_replication_identifier_riiident_index, 3195, on pg_replication_identifier using btree(riident oid_ops));
+#define ReplicationLocalIdentIndex 3195
+
+DECLARE_UNIQUE_INDEX(pg_replication_identifier_riname_index, 3196, on pg_replication_identifier using btree(riname varchar_pattern_ops));
+#define ReplicationExternalIdentIndex 3196
+
/* last step of initialization script: build the indexes declared above */
BUILD_INDICES
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index 815c8ae896..fcce1fe5b2 100644
--- a/src/include/catalog/pg_proc.h
+++ b/src/include/catalog/pg_proc.h
@@ -4778,6 +4778,27 @@ DESCR("stop logical replication");
/* event triggers */
DATA(insert OID = 3566 ( pg_event_trigger_dropped_objects PGNSP PGUID 12 10 100 0 0 f f f f t t s 0 0 2249 "" "{26,26,23,25,25,25,25}" "{o,o,o,o,o,o,o}" "{classid, objid, objsubid, object_type, schema_name, object_name, object_identity}" _null_ pg_event_trigger_dropped_objects _null_ _null_ _null_ ));
DESCR("list objects dropped by the current command");
+
+/* replication_identifier.h */
+DATA(insert OID = 3197 ( pg_replication_identifier_create PGNSP PGUID 12 1 0 0 0 f f f f t f v 1 0 26 "25" _null_ _null_ _null_ _null_ pg_replication_identifier_create _null_ _null_ _null_ ));
+DESCR("create local replication identifier for the passed external one");
+
+DATA(insert OID = 3198 ( pg_replication_identifier_get PGNSP PGUID 12 1 0 0 0 f f f f t f s 1 0 26 "25" _null_ _null_ _null_ _null_ pg_replication_identifier_get _null_ _null_ _null_ ));
+DESCR("translate the external node identifier to a local one");
+
+DATA(insert OID = 3199 ( pg_replication_identifier_setup_replaying_from PGNSP PGUID 12 1 0 0 0 f f f f t f v 1 0 2278 "25" _null_ _null_ _null_ _null_ pg_replication_identifier_setup_replaying_from _null_ _null_ _null_ ));
+DESCR("setup from which node we are replaying transactions from currently");
+
+DATA(insert OID = 3200 ( pg_replication_identifier_reset_replaying_from PGNSP PGUID 12 1 0 0 0 f f f f t f v 0 0 2278 "" _null_ _null_ _null_ _null_ pg_replication_identifier_reset_replaying_from _null_ _null_ _null_ ));
+DESCR("reset replay mode");
+
+DATA(insert OID = 3201 ( pg_replication_identifier_setup_tx_origin PGNSP PGUID 12 1 0 0 0 f f f f t f v 2 0 2278 "25 1184" _null_ _null_ _null_ _null_ pg_replication_identifier_setup_tx_origin _null_ _null_ _null_ ));
+DESCR("setup transaction timestamp and origin lsn");
+
+DATA(insert OID = 3202 ( pg_get_replication_identifier_progress PGNSP PGUID 12 1 100 0 0 f f f f f t v 0 0 2249 "" "{26,25,25,25}" "{o,o,o,o}" "{local_id, external_id, remote_lsn, local_lsn}" _null_ pg_get_replication_identifier_progress _null_ _null_ _null_ ));
+DESCR("replication identifier progress");
+
+
/*
* Symbolic values for provolatile column: these indicate whether the result
* of a function is dependent *only* on the values of its explicit arguments,
diff --git a/src/include/catalog/pg_replication_identifier.h b/src/include/catalog/pg_replication_identifier.h
new file mode 100644
index 0000000000..918adff78a
--- /dev/null
+++ b/src/include/catalog/pg_replication_identifier.h
@@ -0,0 +1,75 @@
+/*-------------------------------------------------------------------------
+ *
+ * pg_replication_identifier.h
+ * Persistent Replication Node Identifiers
+ *
+ * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/catalog/pg_replication_identifier.h
+ *
+ * NOTES
+ * the genbki.pl script reads this file and generates .bki
+ * information from the DATA() statements.
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef PG_REPLICATION_IDENTIFIER_H
+#define PG_REPLICATION_IDENTIFIER_H
+
+#include "catalog/genbki.h"
+#include "access/xlogdefs.h"
+
+/* ----------------
+ * pg_replication_identifier. cpp turns this into
+ * typedef struct FormData_pg_replication_identifier
+ * ----------------
+ */
+#define ReplicationIdentifierRelationId 3465
+
+CATALOG(pg_replication_identifier,3465) BKI_SHARED_RELATION BKI_WITHOUT_OIDS
+{
+ /*
+ * locally known identifier that gets included into wal.
+ *
+ * This should never leave the system.
+ *
+ * Needs to fit into a uint16, so we don't waste too much space in WAL
+ * records. For this reason we don't use a normal Oid column here, since
+ * we need to handle allocation of new values manually.
+ */
+ Oid riident;
+
+ /*
+ * Variable-length fields start here, but we allow direct access to
+ * riname.
+ */
+
+ /* external, free-format, identifier */
+ text riname;
+#ifdef CATALOG_VARLEN /* further variable-length fields */
+#endif
+} FormData_pg_replication_identifier;
+
+/* ----------------
+ * Form_pg_replication_identifier corresponds to a pointer to a tuple with
+ * the format of pg_replication_identifier relation.
+ * ----------------
+ */
+typedef FormData_pg_replication_identifier *Form_pg_replication_identifier;
+
+/* ----------------
+ * compiler constants for pg_replication_identifier
+ * ----------------
+ */
+
+#define Natts_pg_replication_identifier 2
+#define Anum_pg_replication_riident 1
+#define Anum_pg_replication_riname 2
+
+/* ----------------
+ * pg_replication_identifier has no initial contents
+ * ----------------
+ */
+
+#endif /* PG_REPLICATION_IDENTIFIER_H */
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index 273b98faa3..eae7efcf85 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -160,6 +160,12 @@ typedef struct LogicalDecodingContext
/* GUCs */
extern PGDLLIMPORT int max_logical_slots;
+#define InvalidRepNodeId 0
+extern PGDLLIMPORT RepNodeId replication_origin_id;
+extern PGDLLIMPORT XLogRecPtr replication_origin_lsn;
+extern PGDLLIMPORT TimestampTz replication_origin_timestamp;
+
+
extern Size LogicalDecodingShmemSize(void);
extern void LogicalDecodingShmemInit(void);
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 4181ccfa9e..ff5d29c144 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -60,6 +60,8 @@ typedef struct ReorderBufferChange
int action_internal;
};
+ RepNodeId origin_id;
+
/*
* Context data for the change, which part of the union is valid depends
* on action/action_internal.
@@ -148,6 +150,12 @@ typedef struct ReorderBufferTXN
*/
XLogRecPtr restart_decoding_lsn;
+ /* origin of the change that caused this transaction */
+ RepNodeId origin_id;
+
+ /* did the TX have catalog changes */
+ bool does_timetravel;
+
/*
* Commit time, only known when we read the actual commit record.
*/
@@ -320,7 +328,7 @@ void ReorderBufferReturnChange(ReorderBuffer *, ReorderBufferChange *);
void ReorderBufferQueueChange(ReorderBuffer *, TransactionId, XLogRecPtr lsn, ReorderBufferChange *);
void ReorderBufferCommit(ReorderBuffer *, TransactionId,
XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
- TimestampTz commit_time);
+ TimestampTz commit_time, RepNodeId origin_id);
void ReorderBufferAssignChild(ReorderBuffer *, TransactionId, TransactionId, XLogRecPtr commit_lsn);
void ReorderBufferCommitChild(ReorderBuffer *, TransactionId, TransactionId,
XLogRecPtr commit_lsn, XLogRecPtr end_lsn);
diff --git a/src/include/replication/replication_identifier.h b/src/include/replication/replication_identifier.h
new file mode 100644
index 0000000000..866009d2d1
--- /dev/null
+++ b/src/include/replication/replication_identifier.h
@@ -0,0 +1,45 @@
+/*-------------------------------------------------------------------------
+ * replication_identifier.h
+ * Tracking of remote replication nodes and their replay progress
+ *
+ * Copyright (c) 2013, PostgreSQL Global Development Group
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef REPLICATION_IDENTIFIER_H
+#define REPLICATION_IDENTIFIER_H
+
+#include "catalog/pg_replication_identifier.h"
+#include "replication/logical.h"
+
+/* API for querying & manipulating replication identifiers */
+extern RepNodeId GetReplicationIdentifier(char *name, bool missing_ok);
+extern RepNodeId CreateReplicationIdentifier(char *name);
+extern HeapTuple GetReplicationInfoByIdentifier(RepNodeId riident, bool missing_ok);
+
+/*
+ * Record replay progress for 'node'. Stored LSNs only move forward; values
+ * older than the ones already in memory are ignored.
+ */
+extern void AdvanceReplicationIdentifier(RepNodeId node,
+ XLogRecPtr remote_commit,
+ XLogRecPtr local_commit);
+/*
+ * Variants working on a per-process cached identifier: it has to be set up
+ * with SetupCachedReplicationIdentifier() before it can be advanced or
+ * queried, and torn down before another one can be set up.
+ */
+extern void AdvanceCachedReplicationIdentifier(XLogRecPtr remote_commit,
+ XLogRecPtr local_commit);
+extern void SetupCachedReplicationIdentifier(RepNodeId node);
+extern void TeardownCachedReplicationIdentifier(void);
+extern XLogRecPtr RemoteCommitFromCachedReplicationIdentifier(void);
+
+/*
+ * crash recovery support; 'ckpt' is the LSN used to name the state file
+ * below pg_llog/checkpoints
+ */
+extern void CheckPointReplicationIdentifier(XLogRecPtr ckpt);
+extern void StartupReplicationIdentifier(XLogRecPtr ckpt);
+
+/* internals */
+extern Size ReplicationIdentifierShmemSize(void);
+extern void ReplicationIdentifierShmemInit(void);
+
+/* SQL callable functions */
+extern Datum pg_replication_identifier_get(PG_FUNCTION_ARGS);
+extern Datum pg_replication_identifier_create(PG_FUNCTION_ARGS);
+extern Datum pg_replication_identifier_setup_replaying_from(PG_FUNCTION_ARGS);
+extern Datum pg_replication_identifier_reset_replaying_from(PG_FUNCTION_ARGS);
+extern Datum pg_replication_identifier_setup_tx_origin(PG_FUNCTION_ARGS);
+extern Datum pg_get_replication_identifier_progress(PG_FUNCTION_ARGS);
+
+#endif
diff --git a/src/include/utils/syscache.h b/src/include/utils/syscache.h
index e41b3d2e38..357f344915 100644
--- a/src/include/utils/syscache.h
+++ b/src/include/utils/syscache.h
@@ -77,6 +77,8 @@ enum SysCacheIdentifier
RANGETYPE,
RELNAMENSP,
RELOID,
+ REPLIDIDENT,
+ REPLIDREMOTE,
RULERELNAME,
STATRELATTINH,
TABLESPACEOID,
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 72b0f48952..446bbfe3d5 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1367,6 +1367,11 @@ pg_prepared_xacts| SELECT p.transaction,
FROM ((pg_prepared_xact() p(transaction, gid, prepared, ownerid, dbid)
LEFT JOIN pg_authid u ON ((p.ownerid = u.oid)))
LEFT JOIN pg_database d ON ((p.dbid = d.oid)));
+pg_replication_identifier_progress| SELECT pg_get_replication_identifier_progress.local_id,
+ pg_get_replication_identifier_progress.external_id,
+ pg_get_replication_identifier_progress.remote_lsn,
+ pg_get_replication_identifier_progress.local_lsn
+ FROM pg_get_replication_identifier_progress() pg_get_replication_identifier_progress(local_id, external_id, remote_lsn, local_lsn);
pg_roles| SELECT pg_authid.rolname,
pg_authid.rolsuper,
pg_authid.rolinherit,
diff --git a/src/test/regress/expected/sanity_check.out b/src/test/regress/expected/sanity_check.out
index a62a3e3ebc..d5e40d6b6d 100644
--- a/src/test/regress/expected/sanity_check.out
+++ b/src/test/regress/expected/sanity_check.out
@@ -120,6 +120,7 @@ pg_opfamily|t
pg_pltemplate|t
pg_proc|t
pg_range|t
+pg_replication_identifier|t
pg_rewrite|t
pg_seclabel|t
pg_shdepend|t