summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAmit Kapila2022-08-30 03:21:41 +0000
committerAmit Kapila2022-08-30 03:21:41 +0000
commitf6c5edb8abcac04eb3eac6da356e59d399b2bcef (patch)
tree059817b92102142963428da9f31de02a15d4e349
parent865424627db638acdbe4b5d0384d0b9cd34838a5 (diff)
Drop replication origin slots before tablesync worker exits.
Currently, the replication origin tracking of the tablesync worker is dropped by the apply worker. So, there will be a small lag between the tablesync worker's exit and the removal of its origin tracking. In the meantime, new tablesync workers can be launched and will try to set up new origin tracking. This can cause the system to reach the configured limit (max_replication_slots) even if the user has set that limit considering the number of tablesync workers required in the system. We decided not to back-patch as this can occur only in very narrow circumstances and users have the option to raise the configured limit by increasing max_replication_slots. Reported-by: Hubert Depesz Lubaczewski Author: Ajin Cherian Reviewed-by: Masahiko Sawada, Peter Smith, Hou Zhijie, Amit Kapila Discussion: https://postgr.es/m/20220714115155.GA5439@depesz.com
-rw-r--r--src/backend/commands/subscriptioncmds.c25
-rw-r--r--src/backend/replication/logical/tablesync.c50
2 files changed, 41 insertions, 34 deletions
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 670b219c8d..f87796e5af 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -919,10 +919,10 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
logicalrep_worker_stop(sub->oid, relid);
/*
- * For READY state, we would have already dropped the
- * tablesync origin.
+ * For READY state and SYNCDONE state, we would have already
+ * dropped the tablesync origin.
*/
- if (state != SUBREL_STATE_READY)
+ if (state != SUBREL_STATE_READY && state != SUBREL_STATE_SYNCDONE)
{
char originname[NAMEDATALEN];
@@ -930,11 +930,8 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
* Drop the tablesync's origin tracking if exists.
*
* It is possible that the origin is not yet created for
- * tablesync worker, this can happen for the states before
- * SUBREL_STATE_FINISHEDCOPY. The apply worker can also
- * concurrently try to drop the origin and by this time
- * the origin might be already removed. For these reasons,
- * passing missing_ok = true.
+ * tablesync worker so passing missing_ok = true. This can
+ * happen for the states before SUBREL_STATE_FINISHEDCOPY.
*/
ReplicationOriginNameForTablesync(sub->oid, relid, originname,
sizeof(originname));
@@ -1507,13 +1504,19 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
/*
* Drop the tablesync's origin tracking if exists.
*
+ * For SYNCDONE/READY states, the tablesync origin tracking is known
+ * to have already been dropped by the tablesync worker.
+ *
* It is possible that the origin is not yet created for tablesync
* worker so passing missing_ok = true. This can happen for the states
* before SUBREL_STATE_FINISHEDCOPY.
*/
- ReplicationOriginNameForTablesync(subid, relid, originname,
- sizeof(originname));
- replorigin_drop_by_name(originname, true, false);
+ if (rstate->state != SUBREL_STATE_SYNCDONE)
+ {
+ ReplicationOriginNameForTablesync(subid, relid, originname,
+ sizeof(originname));
+ replorigin_drop_by_name(originname, true, false);
+ }
}
/* Clean up dependencies */
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index d37d8a0d74..91ba49a14b 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -291,6 +291,7 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
{
TimeLineID tli;
char syncslotname[NAMEDATALEN] = {0};
+ char originname[NAMEDATALEN] = {0};
MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCDONE;
MyLogicalRepWorker->relstate_lsn = current_lsn;
@@ -310,6 +311,30 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
MyLogicalRepWorker->relstate_lsn);
/*
+ * Cleanup the tablesync origin tracking.
+ *
+ * Resetting the origin session removes the ownership of the slot.
+ * This is needed to allow the origin to be dropped.
+ */
+ ReplicationOriginNameForTablesync(MyLogicalRepWorker->subid,
+ MyLogicalRepWorker->relid,
+ originname,
+ sizeof(originname));
+ replorigin_session_reset();
+ replorigin_session_origin = InvalidRepOriginId;
+ replorigin_session_origin_lsn = InvalidXLogRecPtr;
+ replorigin_session_origin_timestamp = 0;
+
+ /*
+ * We expect that the origin must be present. Concurrent operations
+ * that remove the origin, such as a refresh of the subscription, take
+ * an access exclusive lock on pg_subscription, which prevents the
+ * preceding update of the rel state to SUBREL_STATE_SYNCDONE from
+ * succeeding.
+ */
+ replorigin_drop_by_name(originname, false, false);
+
+ /*
* End streaming so that LogRepWorkerWalRcvConn can be used to drop
* the slot.
*/
@@ -318,7 +343,7 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
/*
* Cleanup the tablesync slot.
*
- * This has to be done after updating the state because otherwise if
+ * This has to be done after the data changes because otherwise if
* there is an error while doing the database operations we won't be
* able to rollback dropped slot.
*/
@@ -441,8 +466,6 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
*/
if (current_lsn >= rstate->lsn)
{
- char originname[NAMEDATALEN];
-
rstate->state = SUBREL_STATE_READY;
rstate->lsn = current_lsn;
if (!started_tx)
@@ -452,26 +475,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
}
/*
- * Remove the tablesync origin tracking if exists.
- *
- * The normal case origin drop is done here instead of in the
- * process_syncing_tables_for_sync function because we don't
- * allow to drop the origin till the process owning the origin
- * is alive.
- *
- * There is a chance that the user is concurrently performing
- * refresh for the subscription where we remove the table
- * state and its origin and by this time the origin might be
- * already removed. So passing missing_ok = true.
- */
- ReplicationOriginNameForTablesync(MyLogicalRepWorker->subid,
- rstate->relid,
- originname,
- sizeof(originname));
- replorigin_drop_by_name(originname, true, false);
-
- /*
- * Update the state to READY only after the origin cleanup.
+ * Update the state to READY.
*/
UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
rstate->relid, rstate->state,