Drop replication origin slots before tablesync worker exits.
authorAmit Kapila <akapila@postgresql.org>
Tue, 30 Aug 2022 03:21:41 +0000 (08:51 +0530)
committerAmit Kapila <akapila@postgresql.org>
Tue, 30 Aug 2022 03:21:41 +0000 (08:51 +0530)
Currently, the replication origin tracking of the tablesync worker is
dropped by the apply worker. So, there will be a small lag between the
tablesync worker exit and its origin tracking got removed. In the
meantime, new tablesync workers can be launched and will try to set up
a new origin tracking. This can lead the system to reach max configured
limit (max_replication_slots) even if the user has configured the max
limit considering the number of tablesync workers required in the system.

We decided not to back-patch as this can occur in very narrow
circumstances and users have to option to increase the configured limit by
increasing max_replication_slots.

Reported-by: Hubert Depesz Lubaczewski
Author: Ajin Cherian
Reviwed-by: Masahiko Sawada, Peter Smith, Hou Zhijie, Amit Kapila
Discussion: https://postgr.es/m/20220714115155.GA5439@depesz.com

src/backend/commands/subscriptioncmds.c
src/backend/replication/logical/tablesync.c

index 670b219c8d4c1c8e36883e2a668e1c27eaa2c612..f87796e5afefde307ab8af3805c6a94f278843ff 100644 (file)
@@ -919,10 +919,10 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
                logicalrep_worker_stop(sub->oid, relid);
 
                /*
-                * For READY state, we would have already dropped the
-                * tablesync origin.
+                * For READY state and SYNCDONE state, we would have already
+                * dropped the tablesync origin.
                 */
-               if (state != SUBREL_STATE_READY)
+               if (state != SUBREL_STATE_READY && state != SUBREL_STATE_SYNCDONE)
                {
                    char        originname[NAMEDATALEN];
 
@@ -930,11 +930,8 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
                     * Drop the tablesync's origin tracking if exists.
                     *
                     * It is possible that the origin is not yet created for
-                    * tablesync worker, this can happen for the states before
-                    * SUBREL_STATE_FINISHEDCOPY. The apply worker can also
-                    * concurrently try to drop the origin and by this time
-                    * the origin might be already removed. For these reasons,
-                    * passing missing_ok = true.
+                    * tablesync worker so passing missing_ok = true. This can
+                    * happen for the states before SUBREL_STATE_FINISHEDCOPY.
                     */
                    ReplicationOriginNameForTablesync(sub->oid, relid, originname,
                                                      sizeof(originname));
@@ -1507,13 +1504,19 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
        /*
         * Drop the tablesync's origin tracking if exists.
         *
+        * For SYNCDONE/READY states, the tablesync origin tracking is known
+        * to have already been dropped by the tablesync worker.
+        *
         * It is possible that the origin is not yet created for tablesync
         * worker so passing missing_ok = true. This can happen for the states
         * before SUBREL_STATE_FINISHEDCOPY.
         */
-       ReplicationOriginNameForTablesync(subid, relid, originname,
-                                         sizeof(originname));
-       replorigin_drop_by_name(originname, true, false);
+       if (rstate->state != SUBREL_STATE_SYNCDONE)
+       {
+           ReplicationOriginNameForTablesync(subid, relid, originname,
+                                             sizeof(originname));
+           replorigin_drop_by_name(originname, true, false);
+       }
    }
 
    /* Clean up dependencies */
index d37d8a0d74a450b7e294cecacbd8832c8cf9bfd0..91ba49a14bd4ee51b618008fb9b38cfbfc51819a 100644 (file)
@@ -291,6 +291,7 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
    {
        TimeLineID  tli;
        char        syncslotname[NAMEDATALEN] = {0};
+       char        originname[NAMEDATALEN] = {0};
 
        MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCDONE;
        MyLogicalRepWorker->relstate_lsn = current_lsn;
@@ -309,6 +310,30 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
                                   MyLogicalRepWorker->relstate,
                                   MyLogicalRepWorker->relstate_lsn);
 
+       /*
+        * Cleanup the tablesync origin tracking.
+        *
+        * Resetting the origin session removes the ownership of the slot.
+        * This is needed to allow the origin to be dropped.
+        */
+       ReplicationOriginNameForTablesync(MyLogicalRepWorker->subid,
+                                         MyLogicalRepWorker->relid,
+                                         originname,
+                                         sizeof(originname));
+       replorigin_session_reset();
+       replorigin_session_origin = InvalidRepOriginId;
+       replorigin_session_origin_lsn = InvalidXLogRecPtr;
+       replorigin_session_origin_timestamp = 0;
+
+       /*
+        * We expect that origin must be present. The concurrent operations
+        * that remove origin like a refresh for the subscription take an
+        * access exclusive lock on pg_subscription which prevent the previous
+        * operation to update the rel state to SUBREL_STATE_SYNCDONE to
+        * succeed.
+        */
+       replorigin_drop_by_name(originname, false, false);
+
        /*
         * End streaming so that LogRepWorkerWalRcvConn can be used to drop
         * the slot.
@@ -318,7 +343,7 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
        /*
         * Cleanup the tablesync slot.
         *
-        * This has to be done after updating the state because otherwise if
+        * This has to be done after the data changes because otherwise if
         * there is an error while doing the database operations we won't be
         * able to rollback dropped slot.
         */
@@ -441,8 +466,6 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
             */
            if (current_lsn >= rstate->lsn)
            {
-               char        originname[NAMEDATALEN];
-
                rstate->state = SUBREL_STATE_READY;
                rstate->lsn = current_lsn;
                if (!started_tx)
@@ -452,26 +475,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
                }
 
                /*
-                * Remove the tablesync origin tracking if exists.
-                *
-                * The normal case origin drop is done here instead of in the
-                * process_syncing_tables_for_sync function because we don't
-                * allow to drop the origin till the process owning the origin
-                * is alive.
-                *
-                * There is a chance that the user is concurrently performing
-                * refresh for the subscription where we remove the table
-                * state and its origin and by this time the origin might be
-                * already removed. So passing missing_ok = true.
-                */
-               ReplicationOriginNameForTablesync(MyLogicalRepWorker->subid,
-                                                 rstate->relid,
-                                                 originname,
-                                                 sizeof(originname));
-               replorigin_drop_by_name(originname, true, false);
-
-               /*
-                * Update the state to READY only after the origin cleanup.
+                * Update the state to READY.
                 */
                UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
                                           rstate->relid, rstate->state,