diff options
Diffstat (limited to 'src/backend/replication')
| -rw-r--r-- | src/backend/replication/libpqwalreceiver/libpqwalreceiver.c | 5 | ||||
| -rw-r--r-- | src/backend/replication/logical/origin.c | 22 | ||||
| -rw-r--r-- | src/backend/replication/logical/worker.c | 2 | ||||
| -rw-r--r-- | src/backend/replication/pgoutput/pgoutput.c | 27 |
4 files changed, 52 insertions, 4 deletions
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 0b775b1e985..da9c359af10 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -451,6 +451,11 @@ libpqrcv_startstreaming(WalReceiverConn *conn, PQserverVersion(conn->streamConn) >= 150000) appendStringInfoString(&cmd, ", two_phase 'on'"); + if (options->proto.logical.origin && + PQserverVersion(conn->streamConn) >= 160000) + appendStringInfo(&cmd, ", origin '%s'", + options->proto.logical.origin); + pubnames = options->proto.logical.publication_names; pubnames_str = stringlist_to_identifierstr(conn->streamConn, pubnames); if (!pubnames_str) diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c index 21937ab2d31..c72ad6b93de 100644 --- a/src/backend/replication/logical/origin.c +++ b/src/backend/replication/logical/origin.c @@ -77,6 +77,7 @@ #include "access/xloginsert.h" #include "catalog/catalog.h" #include "catalog/indexing.h" +#include "catalog/pg_subscription.h" #include "funcapi.h" #include "miscadmin.h" #include "nodes/execnodes.h" @@ -195,6 +196,17 @@ replorigin_check_prerequisites(bool check_slots, bool recoveryOK) } +/* + * IsReservedOriginName + * True iff name is either "none" or "any". + */ +static bool +IsReservedOriginName(const char *name) +{ + return ((pg_strcasecmp(name, LOGICALREP_ORIGIN_NONE) == 0) || + (pg_strcasecmp(name, LOGICALREP_ORIGIN_ANY) == 0)); +} + /* --------------------------------------------------------------------------- * Functions for working with replication origins themselves. * --------------------------------------------------------------------------- @@ -1244,13 +1256,17 @@ pg_replication_origin_create(PG_FUNCTION_ARGS) name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0))); - /* Replication origins "pg_xxx" are reserved for internal use */ - if (IsReservedName(name)) + /* + * Replication origins "any and "none" are reserved for system options. + * The origins "pg_xxx" are reserved for internal use. + */ + if (IsReservedName(name) || IsReservedOriginName(name)) ereport(ERROR, (errcode(ERRCODE_RESERVED_NAME), errmsg("replication origin name \"%s\" is reserved", name), - errdetail("Origin names starting with \"pg_\" are reserved."))); + errdetail("Origin names \"%s\", \"%s\", and names starting with \"pg_\" are reserved.", + LOGICALREP_ORIGIN_ANY, LOGICALREP_ORIGIN_NONE))); /* * If built with appropriate switch, whine when regression-testing diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 38e3b1c1b3c..5f8c5417630 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -3077,6 +3077,7 @@ maybe_reread_subscription(void) strcmp(newsub->slotname, MySubscription->slotname) != 0 || newsub->binary != MySubscription->binary || newsub->stream != MySubscription->stream || + strcmp(newsub->origin, MySubscription->origin) != 0 || newsub->owner != MySubscription->owner || !equal(newsub->publications, MySubscription->publications)) { @@ -3758,6 +3759,7 @@ ApplyWorkerMain(Datum main_arg) options.proto.logical.binary = MySubscription->binary; options.proto.logical.streaming = MySubscription->stream; options.proto.logical.twophase = false; + options.proto.logical.origin = pstrdup(MySubscription->origin); if (!am_tablesync_worker()) { diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index ba8a24d0999..a3c1ba8a402 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -16,6 +16,7 @@ #include "catalog/partition.h" #include "catalog/pg_publication.h" #include "catalog/pg_publication_rel.h" +#include "catalog/pg_subscription.h" #include "commands/defrem.h" #include "executor/executor.h" #include "fmgr.h" @@ -79,6 +80,7 @@ static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx, static bool publications_valid; static bool in_streaming; +static bool publish_no_origin; static List *LoadPublications(List *pubnames); static void publication_invalidation_cb(Datum arg, int cacheid, @@ -285,6 +287,7 @@ parse_output_parameters(List *options, PGOutputData *data) bool messages_option_given = false; bool streaming_given = false; bool two_phase_option_given = false; + bool origin_option_given = false; data->binary = false; data->streaming = false; @@ -378,6 +381,24 @@ parse_output_parameters(List *options, PGOutputData *data) data->two_phase = defGetBoolean(defel); } + else if (strcmp(defel->defname, "origin") == 0) + { + if (origin_option_given) + ereport(ERROR, + errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options")); + origin_option_given = true; + + data->origin = defGetString(defel); + if (pg_strcasecmp(data->origin, LOGICALREP_ORIGIN_NONE) == 0) + publish_no_origin = true; + else if (pg_strcasecmp(data->origin, LOGICALREP_ORIGIN_ANY) == 0) + publish_no_origin = false; + else + ereport(ERROR, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("unrecognized origin value: \"%s\"", data->origin)); + } else elog(ERROR, "unrecognized pgoutput option: %s", defel->defname); } @@ -1696,12 +1717,16 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, } /* - * Currently we always forward. + * Return true if the data is associated with an origin and the user has + * requested the changes that don't have an origin, false otherwise. */ static bool pgoutput_origin_filter(LogicalDecodingContext *ctx, RepOriginId origin_id) { + if (publish_no_origin && origin_id != InvalidRepOriginId) + return true; + return false; } |
