summaryrefslogtreecommitdiff
path: root/src/backend/replication
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication')
-rw-r--r--src/backend/replication/libpqwalreceiver/libpqwalreceiver.c5
-rw-r--r--src/backend/replication/logical/origin.c22
-rw-r--r--src/backend/replication/logical/worker.c2
-rw-r--r--src/backend/replication/pgoutput/pgoutput.c27
4 files changed, 52 insertions, 4 deletions
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 0b775b1e985..da9c359af10 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -451,6 +451,11 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
PQserverVersion(conn->streamConn) >= 150000)
appendStringInfoString(&cmd, ", two_phase 'on'");
+ if (options->proto.logical.origin &&
+ PQserverVersion(conn->streamConn) >= 160000)
+ appendStringInfo(&cmd, ", origin '%s'",
+ options->proto.logical.origin);
+
pubnames = options->proto.logical.publication_names;
pubnames_str = stringlist_to_identifierstr(conn->streamConn, pubnames);
if (!pubnames_str)
diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c
index 21937ab2d31..c72ad6b93de 100644
--- a/src/backend/replication/logical/origin.c
+++ b/src/backend/replication/logical/origin.c
@@ -77,6 +77,7 @@
#include "access/xloginsert.h"
#include "catalog/catalog.h"
#include "catalog/indexing.h"
+#include "catalog/pg_subscription.h"
#include "funcapi.h"
#include "miscadmin.h"
#include "nodes/execnodes.h"
@@ -195,6 +196,17 @@ replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
}
+/*
+ * IsReservedOriginName
+ * True iff name is either "none" or "any".
+ */
+static bool
+IsReservedOriginName(const char *name)
+{
+ return ((pg_strcasecmp(name, LOGICALREP_ORIGIN_NONE) == 0) ||
+ (pg_strcasecmp(name, LOGICALREP_ORIGIN_ANY) == 0));
+}
+
/* ---------------------------------------------------------------------------
* Functions for working with replication origins themselves.
* ---------------------------------------------------------------------------
@@ -1244,13 +1256,17 @@ pg_replication_origin_create(PG_FUNCTION_ARGS)
name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
- /* Replication origins "pg_xxx" are reserved for internal use */
- if (IsReservedName(name))
+ /*
+ * Replication origins "any and "none" are reserved for system options.
+ * The origins "pg_xxx" are reserved for internal use.
+ */
+ if (IsReservedName(name) || IsReservedOriginName(name))
ereport(ERROR,
(errcode(ERRCODE_RESERVED_NAME),
errmsg("replication origin name \"%s\" is reserved",
name),
- errdetail("Origin names starting with \"pg_\" are reserved.")));
+ errdetail("Origin names \"%s\", \"%s\", and names starting with \"pg_\" are reserved.",
+ LOGICALREP_ORIGIN_ANY, LOGICALREP_ORIGIN_NONE)));
/*
* If built with appropriate switch, whine when regression-testing
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 38e3b1c1b3c..5f8c5417630 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -3077,6 +3077,7 @@ maybe_reread_subscription(void)
strcmp(newsub->slotname, MySubscription->slotname) != 0 ||
newsub->binary != MySubscription->binary ||
newsub->stream != MySubscription->stream ||
+ strcmp(newsub->origin, MySubscription->origin) != 0 ||
newsub->owner != MySubscription->owner ||
!equal(newsub->publications, MySubscription->publications))
{
@@ -3758,6 +3759,7 @@ ApplyWorkerMain(Datum main_arg)
options.proto.logical.binary = MySubscription->binary;
options.proto.logical.streaming = MySubscription->stream;
options.proto.logical.twophase = false;
+ options.proto.logical.origin = pstrdup(MySubscription->origin);
if (!am_tablesync_worker())
{
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index ba8a24d0999..a3c1ba8a402 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -16,6 +16,7 @@
#include "catalog/partition.h"
#include "catalog/pg_publication.h"
#include "catalog/pg_publication_rel.h"
+#include "catalog/pg_subscription.h"
#include "commands/defrem.h"
#include "executor/executor.h"
#include "fmgr.h"
@@ -79,6 +80,7 @@ static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
static bool publications_valid;
static bool in_streaming;
+static bool publish_no_origin;
static List *LoadPublications(List *pubnames);
static void publication_invalidation_cb(Datum arg, int cacheid,
@@ -285,6 +287,7 @@ parse_output_parameters(List *options, PGOutputData *data)
bool messages_option_given = false;
bool streaming_given = false;
bool two_phase_option_given = false;
+ bool origin_option_given = false;
data->binary = false;
data->streaming = false;
@@ -378,6 +381,24 @@ parse_output_parameters(List *options, PGOutputData *data)
data->two_phase = defGetBoolean(defel);
}
+ else if (strcmp(defel->defname, "origin") == 0)
+ {
+ if (origin_option_given)
+ ereport(ERROR,
+ errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("conflicting or redundant options"));
+ origin_option_given = true;
+
+ data->origin = defGetString(defel);
+ if (pg_strcasecmp(data->origin, LOGICALREP_ORIGIN_NONE) == 0)
+ publish_no_origin = true;
+ else if (pg_strcasecmp(data->origin, LOGICALREP_ORIGIN_ANY) == 0)
+ publish_no_origin = false;
+ else
+ ereport(ERROR,
+ errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("unrecognized origin value: \"%s\"", data->origin));
+ }
else
elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
}
@@ -1696,12 +1717,16 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
}
/*
- * Currently we always forward.
+ * Return true if the data is associated with an origin and the user has
+ * requested the changes that don't have an origin, false otherwise.
*/
static bool
pgoutput_origin_filter(LogicalDecodingContext *ctx,
RepOriginId origin_id)
{
+ if (publish_no_origin && origin_id != InvalidRepOriginId)
+ return true;
+
return false;
}