Allow setting failover property in the replication command.
authorAmit Kapila <akapila@postgresql.org>
Mon, 29 Jan 2024 03:40:00 +0000 (09:10 +0530)
committerAmit Kapila <akapila@postgresql.org>
Mon, 29 Jan 2024 04:07:23 +0000 (09:37 +0530)
This commit implements a new replication command called
ALTER_REPLICATION_SLOT and a corresponding walreceiver API function named
walrcv_alter_slot. Additionally, the CREATE_REPLICATION_SLOT command has
been extended to support the failover option.

These new additions allow the modification of the failover property of a
replication slot on the publisher. A subsequent commit will make use of
these commands in subscription commands and will add the tests as well to
cover the functionality added/changed by this commit.

Author: Hou Zhijie, Shveta Malik
Reviewed-by: Peter Smith, Bertrand Drouvot, Dilip Kumar, Masahiko Sawada, Nisha Moond, Kuroda, Hayato, Amit Kapila
Discussion: https://postgr.es/m/514f6f2f-6833-4539-39f1-96cd1e011f23@enterprisedb.com

13 files changed:
doc/src/sgml/protocol.sgml
src/backend/commands/subscriptioncmds.c
src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
src/backend/replication/logical/tablesync.c
src/backend/replication/repl_gram.y
src/backend/replication/repl_scanner.l
src/backend/replication/slot.c
src/backend/replication/walreceiver.c
src/backend/replication/walsender.c
src/include/nodes/replnodes.h
src/include/replication/slot.h
src/include/replication/walreceiver.h
src/tools/pgindent/typedefs.list

index 6c3e8a631d786f1e113ddb0c1d60c96752746647..bb4fef1f519415dc5cc0f0ba01ea4cc66d7e41c2 100644 (file)
@@ -2060,6 +2060,16 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;"
          </para>
         </listitem>
        </varlistentry>
+
+       <varlistentry>
+        <term><literal>FAILOVER [ <replaceable class="parameter">boolean</replaceable> ]</literal></term>
+        <listitem>
+         <para>
+          If true, the slot is enabled to be synced to the standbys.
+          The default is false.
+         </para>
+        </listitem>
+       </varlistentry>
       </variablelist>
 
       <para>
@@ -2124,6 +2134,46 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;"
      </listitem>
     </varlistentry>
 
+    <varlistentry id="protocol-replication-alter-replication-slot" xreflabel="ALTER_REPLICATION_SLOT">
+     <term><literal>ALTER_REPLICATION_SLOT</literal> <replaceable class="parameter">slot_name</replaceable> ( <replaceable class="parameter">option</replaceable> [, ...] )
+      <indexterm><primary>ALTER_REPLICATION_SLOT</primary></indexterm>
+     </term>
+     <listitem>
+      <para>
+       Change the definition of a replication slot.
+       See <xref linkend="streaming-replication-slots"/> for more about
+       replication slots. This command is currently only supported for logical
+       replication slots.
+      </para>
+
+      <variablelist>
+       <varlistentry>
+        <term><replaceable class="parameter">slot_name</replaceable></term>
+        <listitem>
+         <para>
+          The name of the slot to alter. Must be a valid replication slot
+          name (see <xref linkend="streaming-replication-slots-manipulation"/>).
+         </para>
+        </listitem>
+       </varlistentry>
+      </variablelist>
+
+      <para>The following option is supported:</para>
+
+      <variablelist>
+       <varlistentry>
+        <term><literal>FAILOVER [ <replaceable class="parameter">boolean</replaceable> ]</literal></term>
+        <listitem>
+         <para>
+          If true, the slot is enabled to be synced to the standbys.
+         </para>
+        </listitem>
+       </varlistentry>
+      </variablelist>
+
+     </listitem>
+    </varlistentry>
+
     <varlistentry id="protocol-replication-read-replication-slot">
      <term><literal>READ_REPLICATION_SLOT</literal> <replaceable class="parameter">slot_name</replaceable>
       <indexterm><primary>READ_REPLICATION_SLOT</primary></indexterm>
index 75e6cd8ae3c7aba84cbf3c030bef80b882c1a6ee..eaf2ec3b362640544b9b52571f412db531198192 100644 (file)
@@ -807,7 +807,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
                                        twophase_enabled = true;
 
                                walrcv_create_slot(wrconn, opts.slot_name, false, twophase_enabled,
-                                                                  CRS_NOEXPORT_SNAPSHOT, NULL);
+                                                                  false, CRS_NOEXPORT_SNAPSHOT, NULL);
 
                                if (twophase_enabled)
                                        UpdateTwoPhaseState(subid, LOGICALREP_TWOPHASE_STATE_ENABLED);
index 77669074e826b28879f2dc2b4d319bfc0ee56014..2439733b55bc3c5ac582153ee0f48bf26ed2a003 100644 (file)
@@ -73,8 +73,11 @@ static char *libpqrcv_create_slot(WalReceiverConn *conn,
                                                                  const char *slotname,
                                                                  bool temporary,
                                                                  bool two_phase,
+                                                                 bool failover,
                                                                  CRSSnapshotAction snapshot_action,
                                                                  XLogRecPtr *lsn);
+static void libpqrcv_alter_slot(WalReceiverConn *conn, const char *slotname,
+                                                               bool failover);
 static pid_t libpqrcv_get_backend_pid(WalReceiverConn *conn);
 static WalRcvExecResult *libpqrcv_exec(WalReceiverConn *conn,
                                                                           const char *query,
@@ -95,6 +98,7 @@ static WalReceiverFunctionsType PQWalReceiverFunctions = {
        .walrcv_receive = libpqrcv_receive,
        .walrcv_send = libpqrcv_send,
        .walrcv_create_slot = libpqrcv_create_slot,
+       .walrcv_alter_slot = libpqrcv_alter_slot,
        .walrcv_get_backend_pid = libpqrcv_get_backend_pid,
        .walrcv_exec = libpqrcv_exec,
        .walrcv_disconnect = libpqrcv_disconnect
@@ -938,8 +942,8 @@ libpqrcv_send(WalReceiverConn *conn, const char *buffer, int nbytes)
  */
 static char *
 libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
-                                        bool temporary, bool two_phase, CRSSnapshotAction snapshot_action,
-                                        XLogRecPtr *lsn)
+                                        bool temporary, bool two_phase, bool failover,
+                                        CRSSnapshotAction snapshot_action, XLogRecPtr *lsn)
 {
        PGresult   *res;
        StringInfoData cmd;
@@ -969,6 +973,15 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
                                appendStringInfoChar(&cmd, ' ');
                }
 
+               if (failover)
+               {
+                       appendStringInfoString(&cmd, "FAILOVER");
+                       if (use_new_options_syntax)
+                               appendStringInfoString(&cmd, ", ");
+                       else
+                               appendStringInfoChar(&cmd, ' ');
+               }
+
                if (use_new_options_syntax)
                {
                        switch (snapshot_action)
@@ -1037,6 +1050,33 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
        return snapshot;
 }
 
+/*
+ * Change the definition of the replication slot.
+ */
+static void
+libpqrcv_alter_slot(WalReceiverConn *conn, const char *slotname,
+                                       bool failover)
+{
+       StringInfoData cmd;
+       PGresult   *res;
+
+       initStringInfo(&cmd);
+       appendStringInfo(&cmd, "ALTER_REPLICATION_SLOT %s ( FAILOVER %s )",
+                                        quote_identifier(slotname),
+                                        failover ? "true" : "false");
+
+       res = libpqrcv_PQexec(conn->streamConn, cmd.data);
+       pfree(cmd.data);
+
+       if (PQresultStatus(res) != PGRES_COMMAND_OK)
+               ereport(ERROR,
+                               (errcode(ERRCODE_PROTOCOL_VIOLATION),
+                                errmsg("could not alter replication slot \"%s\": %s",
+                                               slotname, pchomp(PQerrorMessage(conn->streamConn)))));
+
+       PQclear(res);
+}
+
 /*
  * Return PID of remote backend process.
  */
index 06d5b3df33a29127b84c5f36d3e366ea4d56a046..4207b9356c55eb2047f174eeec656c971a996a28 100644 (file)
@@ -1430,6 +1430,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
         */
        walrcv_create_slot(LogRepWorkerWalRcvConn,
                                           slotname, false /* permanent */ , false /* two_phase */ ,
+                                          false,
                                           CRS_USE_SNAPSHOT, origin_startpos);
 
        /*
index 95e126eb4dcb39aa6e55cce7e0016a6bfadebbe7..7474f5bd6713df6c7b2973f0ea1adb26892809e1 100644 (file)
@@ -64,6 +64,7 @@ Node *replication_parse_result;
 %token K_START_REPLICATION
 %token K_CREATE_REPLICATION_SLOT
 %token K_DROP_REPLICATION_SLOT
+%token K_ALTER_REPLICATION_SLOT
 %token K_TIMELINE_HISTORY
 %token K_WAIT
 %token K_TIMELINE
@@ -80,8 +81,9 @@ Node *replication_parse_result;
 
 %type <node>   command
 %type <node>   base_backup start_replication start_logical_replication
-                               create_replication_slot drop_replication_slot identify_system
-                               read_replication_slot timeline_history show upload_manifest
+                               create_replication_slot drop_replication_slot
+                               alter_replication_slot identify_system read_replication_slot
+                               timeline_history show upload_manifest
 %type <list>   generic_option_list
 %type <defelt> generic_option
 %type <uintval>        opt_timeline
@@ -112,6 +114,7 @@ command:
                        | start_logical_replication
                        | create_replication_slot
                        | drop_replication_slot
+                       | alter_replication_slot
                        | read_replication_slot
                        | timeline_history
                        | show
@@ -259,6 +262,18 @@ drop_replication_slot:
                                }
                        ;
 
+/* ALTER_REPLICATION_SLOT slot (options) */
+alter_replication_slot:
+                       K_ALTER_REPLICATION_SLOT IDENT '(' generic_option_list ')'
+                               {
+                                       AlterReplicationSlotCmd *cmd;
+                                       cmd = makeNode(AlterReplicationSlotCmd);
+                                       cmd->slotname = $2;
+                                       cmd->options = $4;
+                                       $$ = (Node *) cmd;
+                               }
+                       ;
+
 /*
  * START_REPLICATION [SLOT slot] [PHYSICAL] %X/%X [TIMELINE %d]
  */
@@ -410,6 +425,7 @@ ident_or_keyword:
                        | K_START_REPLICATION                   { $$ = "start_replication"; }
                        | K_CREATE_REPLICATION_SLOT     { $$ = "create_replication_slot"; }
                        | K_DROP_REPLICATION_SLOT               { $$ = "drop_replication_slot"; }
+                       | K_ALTER_REPLICATION_SLOT              { $$ = "alter_replication_slot"; }
                        | K_TIMELINE_HISTORY                    { $$ = "timeline_history"; }
                        | K_WAIT                                                { $$ = "wait"; }
                        | K_TIMELINE                                    { $$ = "timeline"; }
index 6fa625617bdf51b17164596d9bdf099ef759a460..e7def800655f3c5b1a2637f4e9303764dfa9b417 100644 (file)
@@ -125,6 +125,7 @@ TIMELINE                    { return K_TIMELINE; }
 START_REPLICATION      { return K_START_REPLICATION; }
 CREATE_REPLICATION_SLOT                { return K_CREATE_REPLICATION_SLOT; }
 DROP_REPLICATION_SLOT          { return K_DROP_REPLICATION_SLOT; }
+ALTER_REPLICATION_SLOT         { return K_ALTER_REPLICATION_SLOT; }
 TIMELINE_HISTORY       { return K_TIMELINE_HISTORY; }
 PHYSICAL                       { return K_PHYSICAL; }
 RESERVE_WAL                    { return K_RESERVE_WAL; }
@@ -302,6 +303,7 @@ replication_scanner_is_replication_command(void)
                case K_START_REPLICATION:
                case K_CREATE_REPLICATION_SLOT:
                case K_DROP_REPLICATION_SLOT:
+               case K_ALTER_REPLICATION_SLOT:
                case K_READ_REPLICATION_SLOT:
                case K_TIMELINE_HISTORY:
                case K_UPLOAD_MANIFEST:
index 02a14ec210ea83b9e6cfb169510c03cbcfb1e901..f2781d0455ac9ce3f6c80dd9fe4209086e537158 100644 (file)
@@ -683,6 +683,31 @@ ReplicationSlotDrop(const char *name, bool nowait)
        ReplicationSlotDropAcquired();
 }
 
+/*
+ * Change the definition of the slot identified by the specified name.
+ */
+void
+ReplicationSlotAlter(const char *name, bool failover)
+{
+       Assert(MyReplicationSlot == NULL);
+
+       ReplicationSlotAcquire(name, false);
+
+       if (SlotIsPhysical(MyReplicationSlot))
+               ereport(ERROR,
+                               errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                               errmsg("cannot use %s with a physical replication slot",
+                                          "ALTER_REPLICATION_SLOT"));
+
+       SpinLockAcquire(&MyReplicationSlot->mutex);
+       MyReplicationSlot->data.failover = failover;
+       SpinLockRelease(&MyReplicationSlot->mutex);
+
+       ReplicationSlotMarkDirty();
+       ReplicationSlotSave();
+       ReplicationSlotRelease();
+}
+
 /*
  * Permanently drop the currently acquired replication slot.
  */
index 728059518e1e5be3936f9d75ccdaf723d1193cd5..e29a6196a3eb8c047fe41b62ab70e39c61b89615 100644 (file)
@@ -387,7 +387,7 @@ WalReceiverMain(void)
                                         "pg_walreceiver_%lld",
                                         (long long int) walrcv_get_backend_pid(wrconn));
 
-                       walrcv_create_slot(wrconn, slotname, true, false, 0, NULL);
+                       walrcv_create_slot(wrconn, slotname, true, false, false, 0, NULL);
 
                        SpinLockAcquire(&walrcv->mutex);
                        strlcpy(walrcv->slotname, slotname, NAMEDATALEN);
index aa80f3de20f5107d8060c0951f48d7cc6070aae2..77c8baa32a4c08c3370854ee7870a6c080895812 100644 (file)
@@ -1126,12 +1126,13 @@ static void
 parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd,
                                                   bool *reserve_wal,
                                                   CRSSnapshotAction *snapshot_action,
-                                                  bool *two_phase)
+                                                  bool *two_phase, bool *failover)
 {
        ListCell   *lc;
        bool            snapshot_action_given = false;
        bool            reserve_wal_given = false;
        bool            two_phase_given = false;
+       bool            failover_given = false;
 
        /* Parse options */
        foreach(lc, cmd->options)
@@ -1181,6 +1182,15 @@ parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd,
                        two_phase_given = true;
                        *two_phase = defGetBoolean(defel);
                }
+               else if (strcmp(defel->defname, "failover") == 0)
+               {
+                       if (failover_given || cmd->kind != REPLICATION_KIND_LOGICAL)
+                               ereport(ERROR,
+                                               (errcode(ERRCODE_SYNTAX_ERROR),
+                                                errmsg("conflicting or redundant options")));
+                       failover_given = true;
+                       *failover = defGetBoolean(defel);
+               }
                else
                        elog(ERROR, "unrecognized option: %s", defel->defname);
        }
@@ -1197,6 +1207,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
        char       *slot_name;
        bool            reserve_wal = false;
        bool            two_phase = false;
+       bool            failover = false;
        CRSSnapshotAction snapshot_action = CRS_EXPORT_SNAPSHOT;
        DestReceiver *dest;
        TupOutputState *tstate;
@@ -1206,7 +1217,8 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 
        Assert(!MyReplicationSlot);
 
-       parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action, &two_phase);
+       parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action, &two_phase,
+                                                          &failover);
 
        if (cmd->kind == REPLICATION_KIND_PHYSICAL)
        {
@@ -1243,7 +1255,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
                 */
                ReplicationSlotCreate(cmd->slotname, true,
                                                          cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL,
-                                                         two_phase, false);
+                                                         two_phase, failover);
 
                /*
                 * Do options check early so that we can bail before calling the
@@ -1398,6 +1410,43 @@ DropReplicationSlot(DropReplicationSlotCmd *cmd)
        ReplicationSlotDrop(cmd->slotname, !cmd->wait);
 }
 
+/*
+ * Process extra options given to ALTER_REPLICATION_SLOT.
+ */
+static void
+ParseAlterReplSlotOptions(AlterReplicationSlotCmd *cmd, bool *failover)
+{
+       bool            failover_given = false;
+
+       /* Parse options */
+       foreach_ptr(DefElem, defel, cmd->options)
+       {
+               if (strcmp(defel->defname, "failover") == 0)
+               {
+                       if (failover_given)
+                               ereport(ERROR,
+                                               (errcode(ERRCODE_SYNTAX_ERROR),
+                                                errmsg("conflicting or redundant options")));
+                       failover_given = true;
+                       *failover = defGetBoolean(defel);
+               }
+               else
+                       elog(ERROR, "unrecognized option: %s", defel->defname);
+       }
+}
+
+/*
+ * Change the definition of a replication slot.
+ */
+static void
+AlterReplicationSlot(AlterReplicationSlotCmd *cmd)
+{
+       bool            failover = false;
+
+       ParseAlterReplSlotOptions(cmd, &failover);
+       ReplicationSlotAlter(cmd->slotname, failover);
+}
+
 /*
  * Load previously initiated logical slot and prepare for sending data (via
  * WalSndLoop).
@@ -1971,6 +2020,13 @@ exec_replication_command(const char *cmd_string)
                        EndReplicationCommand(cmdtag);
                        break;
 
+               case T_AlterReplicationSlotCmd:
+                       cmdtag = "ALTER_REPLICATION_SLOT";
+                       set_ps_display(cmdtag);
+                       AlterReplicationSlot((AlterReplicationSlotCmd *) cmd_node);
+                       EndReplicationCommand(cmdtag);
+                       break;
+
                case T_StartReplicationCmd:
                        {
                                StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
index af0a333f1a84178516765e15be02f9d8cf02fae7..ed23333e9286aae2ddca87f4c0ba6fdabebc3388 100644 (file)
@@ -72,6 +72,18 @@ typedef struct DropReplicationSlotCmd
 } DropReplicationSlotCmd;
 
 
+/* ----------------------
+ *             ALTER_REPLICATION_SLOT command
+ * ----------------------
+ */
+typedef struct AlterReplicationSlotCmd
+{
+       NodeTag         type;
+       char       *slotname;
+       List       *options;
+} AlterReplicationSlotCmd;
+
+
 /* ----------------------
  *             START_REPLICATION command
  * ----------------------
index db9bb222661cec483a1dfa17715f3983aa9b6d8e..da4c7764921480a6203998b592f2aa2a04d6de1d 100644 (file)
@@ -227,6 +227,7 @@ extern void ReplicationSlotCreate(const char *name, bool db_specific,
                                                                  bool two_phase, bool failover);
 extern void ReplicationSlotPersist(void);
 extern void ReplicationSlotDrop(const char *name, bool nowait);
+extern void ReplicationSlotAlter(const char *name, bool failover);
 
 extern void ReplicationSlotAcquire(const char *name, bool nowait);
 extern void ReplicationSlotRelease(void);
index 0899891cdb86faba39c46b0f5d324bf505b7e195..f566a99ba161273b5537d11840d5dbd8a1f4d7ce 100644 (file)
@@ -355,9 +355,20 @@ typedef char *(*walrcv_create_slot_fn) (WalReceiverConn *conn,
                                                                                const char *slotname,
                                                                                bool temporary,
                                                                                bool two_phase,
+                                                                               bool failover,
                                                                                CRSSnapshotAction snapshot_action,
                                                                                XLogRecPtr *lsn);
 
+/*
+ * walrcv_alter_slot_fn
+ *
+ * Change the definition of a replication slot. Currently, it only supports
+ * changing the failover property of the slot.
+ */
+typedef void (*walrcv_alter_slot_fn) (WalReceiverConn *conn,
+                                                                         const char *slotname,
+                                                                         bool failover);
+
 /*
  * walrcv_get_backend_pid_fn
  *
@@ -399,6 +410,7 @@ typedef struct WalReceiverFunctionsType
        walrcv_receive_fn walrcv_receive;
        walrcv_send_fn walrcv_send;
        walrcv_create_slot_fn walrcv_create_slot;
+       walrcv_alter_slot_fn walrcv_alter_slot;
        walrcv_get_backend_pid_fn walrcv_get_backend_pid;
        walrcv_exec_fn walrcv_exec;
        walrcv_disconnect_fn walrcv_disconnect;
@@ -428,8 +440,10 @@ extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions;
        WalReceiverFunctions->walrcv_receive(conn, buffer, wait_fd)
 #define walrcv_send(conn, buffer, nbytes) \
        WalReceiverFunctions->walrcv_send(conn, buffer, nbytes)
-#define walrcv_create_slot(conn, slotname, temporary, two_phase, snapshot_action, lsn) \
-       WalReceiverFunctions->walrcv_create_slot(conn, slotname, temporary, two_phase, snapshot_action, lsn)
+#define walrcv_create_slot(conn, slotname, temporary, two_phase, failover, snapshot_action, lsn) \
+       WalReceiverFunctions->walrcv_create_slot(conn, slotname, temporary, two_phase, failover, snapshot_action, lsn)
+#define walrcv_alter_slot(conn, slotname, failover) \
+       WalReceiverFunctions->walrcv_alter_slot(conn, slotname, failover)
 #define walrcv_get_backend_pid(conn) \
        WalReceiverFunctions->walrcv_get_backend_pid(conn)
 #define walrcv_exec(conn, exec, nRetTypes, retTypes) \
index dc3b0ef87107c2129c98fcd5b1033548d2756b70..90b37b919c2ef682f452cdde97bf0abd75e0e492 100644 (file)
@@ -85,6 +85,7 @@ AlterOwnerStmt
 AlterPolicyStmt
 AlterPublicationAction
 AlterPublicationStmt
+AlterReplicationSlotCmd
 AlterRoleSetStmt
 AlterRoleStmt
 AlterSeqStmt
@@ -3879,6 +3880,7 @@ varattrib_1b_e
 varattrib_4b
 vbits
 verifier_context
+walrcv_alter_slot_fn
 walrcv_check_conninfo_fn
 walrcv_connect_fn
 walrcv_create_slot_fn