diff options
68 files changed, 2591 insertions, 438 deletions
diff --git a/contrib/basic_archive/basic_archive.c b/contrib/basic_archive/basic_archive.c index 4a8b8c7ac29..8fc633d2cbf 100644 --- a/contrib/basic_archive/basic_archive.c +++ b/contrib/basic_archive/basic_archive.c @@ -65,7 +65,7 @@ void _PG_init(void) { DefineCustomStringVariable("basic_archive.archive_directory", - gettext_noop("Archive file destination directory."), + "Archive file destination directory.", NULL, &archive_directory, "", diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c index c459a842fa9..de5bed282f3 100644 --- a/contrib/dblink/dblink.c +++ b/contrib/dblink/dblink.c @@ -242,7 +242,7 @@ dblink_get_conn(char *conname_or_str, } PQsetNoticeReceiver(conn, libpqsrv_notice_receiver, - gettext_noop("received message via remote connection")); + "received message via remote connection"); dblink_security_check(conn, NULL, connstr); if (PQclientEncoding(conn) != GetDatabaseEncoding()) @@ -343,7 +343,7 @@ dblink_connect(PG_FUNCTION_ARGS) } PQsetNoticeReceiver(conn, libpqsrv_notice_receiver, - gettext_noop("received message via remote connection")); + "received message via remote connection"); /* check password actually used if not superuser */ dblink_security_check(conn, connname, connstr); diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c index e41d47c3bbd..c1ce6f33436 100644 --- a/contrib/postgres_fdw/connection.c +++ b/contrib/postgres_fdw/connection.c @@ -626,7 +626,7 @@ connect_pg_server(ForeignServer *server, UserMapping *user) errdetail_internal("%s", pchomp(PQerrorMessage(conn))))); PQsetNoticeReceiver(conn, libpqsrv_notice_receiver, - gettext_noop("received message via remote connection")); + "received message via remote connection"); /* Perform post-connection security checks. 
*/ pgfdw_security_check(keywords, values, user, conn); diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml index 0d23bc1b122..97f547b3cc4 100644 --- a/doc/src/sgml/catalogs.sgml +++ b/doc/src/sgml/catalogs.sgml @@ -8084,6 +8084,17 @@ SCRAM-SHA-256$<replaceable><iteration count></replaceable>:<replaceable>&l <row> <entry role="catalog_table_entry"><para role="column_definition"> + <structfield>subretaindeadtuples</structfield> <type>bool</type> + </para> + <para> + If true, the information (e.g., dead tuples, commit timestamps, and + origins) on the subscriber that is useful for conflict detection is + retained. + </para></entry> + </row> + + <row> + <entry role="catalog_table_entry"><para role="column_definition"> <structfield>subconninfo</structfield> <type>text</type> </para> <para> diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index c7acc0f182f..20ccb2d6b54 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -4965,6 +4965,8 @@ ANY <replaceable class="parameter">num_sync</replaceable> ( <replaceable class=" new setting. This setting has no effect if <varname>primary_conninfo</varname> is not set or the server is not in standby mode. + The name cannot be <literal>pg_conflict_detection</literal> as it is + reserved for the conflict detection slot. </para> </listitem> </varlistentry> diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml index f5a0e0954a1..de5b5929ee0 100644 --- a/doc/src/sgml/func.sgml +++ b/doc/src/sgml/func.sgml @@ -29592,7 +29592,9 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset </para> <para> Creates a new physical replication slot named - <parameter>slot_name</parameter>. The optional second parameter, + <parameter>slot_name</parameter>. The name cannot be + <literal>pg_conflict_detection</literal> as it is reserved for the + conflict detection slot. 
The optional second parameter, when <literal>true</literal>, specifies that the <acronym>LSN</acronym> for this replication slot be reserved immediately; otherwise the <acronym>LSN</acronym> is reserved on first connection from a streaming @@ -29636,7 +29638,9 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset <para> Creates a new logical (decoding) replication slot named <parameter>slot_name</parameter> using the output plugin - <parameter>plugin</parameter>. The optional third + <parameter>plugin</parameter>. The name cannot be + <literal>pg_conflict_detection</literal> as it is reserved for + the conflict detection slot. The optional third parameter, <parameter>temporary</parameter>, when set to true, specifies that the slot should not be permanently stored to disk and is only meant for use by the current session. Temporary slots are also @@ -29666,6 +29670,8 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset <para> Copies an existing physical replication slot named <parameter>src_slot_name</parameter> to a physical replication slot named <parameter>dst_slot_name</parameter>. + The new slot name cannot be <literal>pg_conflict_detection</literal>, + as it is reserved for the conflict detection. The copied physical slot starts to reserve WAL from the same <acronym>LSN</acronym> as the source slot. <parameter>temporary</parameter> is optional. If <parameter>temporary</parameter> @@ -29688,8 +29694,10 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset Copies an existing logical replication slot named <parameter>src_slot_name</parameter> to a logical replication slot named <parameter>dst_slot_name</parameter>, optionally changing - the output plugin and persistence. The copied logical slot starts - from the same <acronym>LSN</acronym> as the source logical slot. Both + the output plugin and persistence. 
The new slot name cannot be + <literal>pg_conflict_detection</literal> as it is reserved for + the conflict detection. The copied logical slot starts from the same + <acronym>LSN</acronym> as the source logical slot. Both <parameter>temporary</parameter> and <parameter>plugin</parameter> are optional; if they are omitted, the values of the source slot are used. The <literal>failover</literal> option of the source logical slot diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml index e26f7f59d4a..fcac55aefe6 100644 --- a/doc/src/sgml/logical-replication.sgml +++ b/doc/src/sgml/logical-replication.sgml @@ -1048,28 +1048,28 @@ HINT: To initiate replication, you must manually create the replication slot, e defined) for each publication. <programlisting><![CDATA[ /* pub # */ \dRp+ - Publication p1 - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root -----------+------------+---------+---------+---------+-----------+---------- - postgres | f | t | t | t | t | f + Publication p1 + Owner | All tables | Inserts | Updates | Deletes | Truncates | Generated columns | Via root +----------+------------+---------+---------+---------+-----------+-------------------+---------- + postgres | f | t | t | t | t | none | f Tables: - "public.t1" WHERE ((a > 5) AND (c = 'NSW'::text)) + "public.t1" WHERE ((a > 5) AND (c = 'NSW'::text)) - Publication p2 - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root -----------+------------+---------+---------+---------+-----------+---------- - postgres | f | t | t | t | t | f + Publication p2 + Owner | All tables | Inserts | Updates | Deletes | Truncates | Generated columns | Via root +----------+------------+---------+---------+---------+-----------+-------------------+---------- + postgres | f | t | t | t | t | none | f Tables: - "public.t1" - "public.t2" WHERE (e = 99) + "public.t1" + "public.t2" WHERE (e = 99) - Publication p3 - Owner | All tables | Inserts | Updates 
| Deletes | Truncates | Via root -----------+------------+---------+---------+---------+-----------+---------- - postgres | f | t | t | t | t | f + Publication p3 + Owner | All tables | Inserts | Updates | Deletes | Truncates | Generated columns | Via root +----------+------------+---------+---------+---------+-----------+-------------------+---------- + postgres | f | t | t | t | t | none | f Tables: - "public.t2" WHERE (d = 10) - "public.t3" WHERE (g = 10) + "public.t2" WHERE (d = 10) + "public.t3" WHERE (g = 10) ]]></programlisting></para> <para> @@ -1491,10 +1491,10 @@ Publications: for each publication. <programlisting> /* pub # */ \dRp+ - Publication p1 - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root -----------+------------+---------+---------+---------+-----------+---------- - postgres | f | t | t | t | t | f + Publication p1 + Owner | All tables | Inserts | Updates | Deletes | Truncates | Generated columns | Via root +----------+------------+---------+---------+---------+-----------+-------------------+---------- + postgres | f | t | t | t | t | none | f Tables: "public.t1" (id, a, b, d) </programlisting></para> @@ -2397,6 +2397,12 @@ CONTEXT: processing remote data for replication origin "pg_16395" during "INSER </para> <para> + <link linkend="guc-max-replication-slots"><varname>max_replication_slots</varname></link> + must be set to at least 1 when <link linkend="sql-createsubscription-params-with-retain-dead-tuples"><literal>retain_dead_tuples</literal></link> + is enabled for any subscription. 
+ </para> + + <para> <link linkend="guc-max-logical-replication-workers"><varname>max_logical_replication_workers</varname></link> must be set to at least the number of subscriptions (for leader apply workers), plus some reserve for the table synchronization workers and @@ -2532,6 +2538,22 @@ CONTEXT: processing remote data for replication origin "pg_16395" during "INSER dependencies on clusters before version 17.0 will silently be ignored. </para> + <note> + <para> + Commit timestamps and origin data are not preserved during the upgrade. + As a result, even if + <link linkend="sql-createsubscription-params-with-retain-dead-tuples"><literal>retain_dead_tuples</literal></link> + is enabled, the upgraded subscriber may be unable to detect conflicts or + log relevant commit timestamps and origins when applying changes from the + publisher that occurred before the upgrade. Additionally, immediately after the + upgrade, the vacuum may remove the deleted rows that are required for + conflict detection. This can affect the changes that were not replicated + before the upgrade. To ensure consistent conflict tracking, users should + ensure that all potentially conflicting changes are replicated to the + subscriber before initiating the upgrade. + </para> + </note> + <para> There are some prerequisites for <application>pg_upgrade</application> to be able to upgrade the subscriptions. If these are not met an error @@ -2563,6 +2585,16 @@ CONTEXT: processing remote data for replication origin "pg_16395" during "INSER subscriptions present in the old cluster. </para> </listitem> + <listitem> + <para> + If there are subscriptions with retain_dead_tuples enabled, the reserved + replication slot <quote><literal>pg_conflict_detection</literal></quote> + must not exist on the new cluster. Additionally, the + <link linkend="guc-wal-level"><varname>wal_level</varname></link> on the + new cluster must be set to <literal>replica</literal> or + <literal>logical</literal>.
+ </para> + </listitem> </itemizedlist> </sect2> diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml index e74b5be1eff..b115884acb3 100644 --- a/doc/src/sgml/protocol.sgml +++ b/doc/src/sgml/protocol.sgml @@ -2235,6 +2235,8 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;" <para> The name of the slot to create. Must be a valid replication slot name (see <xref linkend="streaming-replication-slots-manipulation"/>). + The name cannot be <literal>pg_conflict_detection</literal> as it + is reserved for the conflict detection slot. </para> </listitem> </varlistentry> @@ -2653,6 +2655,65 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;" </variablelist> </listitem> </varlistentry> + + <varlistentry id="protocol-replication-primary-status-update"> + <term>Primary status update (B)</term> + <listitem> + <variablelist> + <varlistentry> + <term>Byte1('s')</term> + <listitem> + <para> + Identifies the message as a primary status update. + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term>Int64</term> + <listitem> + <para> + The latest WAL write position on the server. + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term>Int64</term> + <listitem> + <para> + The oldest transaction ID that is currently in the commit phase on + the server, along with its epoch. The most significant 32 bits are + the epoch. The least significant 32 bits are the transaction ID. + If no transactions are active on the server, this number will be + the next transaction ID to be assigned. + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term>Int64</term> + <listitem> + <para> + The next transaction ID to be assigned on the server, along with + its epoch. The most significant 32 bits are the epoch. The least + significant 32 bits are the transaction ID.
+ </para> + </listitem> + </varlistentry> + + <varlistentry> + <term>Int64</term> + <listitem> + <para> + The server's system clock at the time of transmission, as + microseconds since midnight on 2000-01-01. + </para> + </listitem> + </varlistentry> + </variablelist> + </listitem> + </varlistentry> </variablelist> <para> @@ -2797,6 +2858,33 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;" </variablelist> </listitem> </varlistentry> + + <varlistentry id="protocol-replication-standby-wal-status-request"> + <term>Request primary status update (F)</term> + <listitem> + <variablelist> + <varlistentry> + <term>Byte1('p')</term> + <listitem> + <para> + Identifies the message as a request for a primary status update. + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term>Int64</term> + <listitem> + <para> + The client's system clock at the time of transmission, as + microseconds since midnight on 2000-01-01. + </para> + </listitem> + </varlistentry> + </variablelist> + </listitem> + </varlistentry> + </variablelist> </listitem> </varlistentry> diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml index fdc648d007f..d48cdc76bd3 100644 --- a/doc/src/sgml/ref/alter_subscription.sgml +++ b/doc/src/sgml/ref/alter_subscription.sgml @@ -235,8 +235,9 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO < <link linkend="sql-createsubscription-params-with-password-required"><literal>password_required</literal></link>, <link linkend="sql-createsubscription-params-with-run-as-owner"><literal>run_as_owner</literal></link>, <link linkend="sql-createsubscription-params-with-origin"><literal>origin</literal></link>, - <link linkend="sql-createsubscription-params-with-failover"><literal>failover</literal></link>, and - <link linkend="sql-createsubscription-params-with-two-phase"><literal>two_phase</literal></link>. 
+ <link linkend="sql-createsubscription-params-with-failover"><literal>failover</literal></link>, + <link linkend="sql-createsubscription-params-with-two-phase"><literal>two_phase</literal></link>, and + <link linkend="sql-createsubscription-params-with-retain-dead-tuples"><literal>retain_dead_tuples</literal></link>. Only a superuser can set <literal>password_required = false</literal>. </para> @@ -261,8 +262,9 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO < </para> <para> - The <link linkend="sql-createsubscription-params-with-failover"><literal>failover</literal></link> - and <link linkend="sql-createsubscription-params-with-two-phase"><literal>two_phase</literal></link> + The <link linkend="sql-createsubscription-params-with-failover"><literal>failover</literal></link>, + <link linkend="sql-createsubscription-params-with-two-phase"><literal>two_phase</literal></link>, and + <link linkend="sql-createsubscription-params-with-retain-dead-tuples"><literal>retain_dead_tuples</literal></link> parameters can only be altered when the subscription is disabled. </para> @@ -285,6 +287,14 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO < option is changed from <literal>true</literal> to <literal>false</literal>, the publisher will replicate the transactions again when they are committed. </para> + + <para> + If the <link linkend="sql-createsubscription-params-with-retain-dead-tuples"><literal>retain_dead_tuples</literal></link> + option is altered to <literal>false</literal> and no other subscription + has this option enabled, the replication slot named + <quote><literal>pg_conflict_detection</literal></quote>, created to retain + dead tuples for conflict detection, will be dropped. 
+ </para> </listitem> </varlistentry> diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml index 57dec28a5df..b8cd15f3280 100644 --- a/doc/src/sgml/ref/create_subscription.sgml +++ b/doc/src/sgml/ref/create_subscription.sgml @@ -169,7 +169,9 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl <listitem> <para> Name of the publisher's replication slot to use. The default is - to use the name of the subscription for the slot name. + to use the name of the subscription for the slot name. The name cannot + be <literal>pg_conflict_detection</literal> as it is reserved for the + conflict detection. </para> <para> @@ -435,6 +437,89 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl </para> </listitem> </varlistentry> + + <varlistentry id="sql-createsubscription-params-with-retain-dead-tuples"> + <term><literal>retain_dead_tuples</literal> (<type>boolean</type>)</term> + <listitem> + <para> + Specifies whether the information (e.g., dead tuples, commit + timestamps, and origins) required for conflict detection on the + subscriber is retained. The default is <literal>false</literal>. + If set to <literal>true</literal>, a physical replication slot named + <quote><literal>pg_conflict_detection</literal></quote> will be + created on the subscriber to prevent the conflict information from + being removed. + </para> + + <para> + Note that the information useful for conflict detection is retained + only after the creation of the slot. You can verify the existence of + this slot by querying <link linkend="view-pg-replication-slots">pg_replication_slots</link>. + And even if multiple subscriptions on one node enable this option, + only one replication slot will be created. Also, + <varname>wal_level</varname> must be set to <literal>replica</literal> + or higher to allow the replication slot to be used. 
+ </para> + + <caution> + <para> + Note that the information for conflict detection cannot be purged if + the subscription is disabled; thus, the information will accumulate + until the subscription is enabled. To prevent excessive accumulation, + it is recommended to disable <literal>retain_dead_tuples</literal> + if the subscription will be inactive for an extended period. + </para> + + <para> + Additionally, when enabling <literal>retain_dead_tuples</literal> for + conflict detection in logical replication, it is important to design the + replication topology to balance data retention requirements with + overall system performance. This option incurs minimal performance + overhead when applied appropriately. The following scenarios illustrate + effective usage patterns when enabling this option. + </para> + + <para> + a. Large Tables with Bidirectional Writes: + For large tables subject to concurrent writes on both publisher and + subscriber nodes, publishers can define row filters when creating + publications to segment data. This allows multiple subscriptions + to replicate exclusive subsets of the table in parallel, optimizing + the throughput. + </para> + + <para> + b. Write-Enabled Subscribers: + If a subscriber node is expected to perform write operations, replication + can be structured using multiple publications and subscriptions. By + distributing tables across these publications, the workload is spread among + several apply workers, improving concurrency and reducing contention. + </para> + + <para> + c. Read-Only Subscribers: + In configurations involving single or multiple publisher nodes + performing concurrent write operations, read-only subscriber nodes may + replicate changes without seeing a performance impact if they use index + scans.
However, if the subscriber is impacted due to replication lag or + scan performance (say due to sequential scans), it needs to follow one + of the two previous strategies to distribute the workload on the + subscriber. + </para> + </caution> + + <para> + This option cannot be enabled if the publisher is a physical standby. + </para> + + <para> + Enabling this option ensures retention of information useful for + conflict detection solely for changes occurring locally on the + publisher. For the changes originating from different origins, + reliable conflict detection cannot be guaranteed. + </para> + </listitem> + </varlistentry> </variablelist></para> </listitem> diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index 85cbe397cb2..7918176fc58 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -1183,7 +1183,11 @@ EndPrepare(GlobalTransaction gxact) * starting immediately after the WAL record is inserted could complete * without fsync'ing our state file. (This is essentially the same kind * of race condition as the COMMIT-to-clog-write case that - * RecordTransactionCommit uses DELAY_CHKPT_START for; see notes there.) + * RecordTransactionCommit uses DELAY_CHKPT_IN_COMMIT for; see notes + * there.) Note that DELAY_CHKPT_IN_COMMIT is used to find transactions in + * the critical commit section. We need to know about such transactions + * for conflict detection in logical replication. See + * GetOldestActiveTransactionId(true, false) and its use. * * We save the PREPARE record's location in the gxact for later use by * CheckPointTwoPhase. @@ -2298,7 +2302,7 @@ ProcessTwoPhaseBuffer(FullTransactionId fxid, * RecordTransactionCommitPrepared * * This is basically the same as RecordTransactionCommit (q.v. 
if you change - * this function): in particular, we must set DELAY_CHKPT_START to avoid a + * this function): in particular, we must set DELAY_CHKPT_IN_COMMIT to avoid a * race condition. * * We know the transaction made at least one XLOG entry (its PREPARE), @@ -2318,7 +2322,7 @@ RecordTransactionCommitPrepared(TransactionId xid, const char *gid) { XLogRecPtr recptr; - TimestampTz committs = GetCurrentTimestamp(); + TimestampTz committs; bool replorigin; /* @@ -2331,8 +2335,24 @@ RecordTransactionCommitPrepared(TransactionId xid, START_CRIT_SECTION(); /* See notes in RecordTransactionCommit */ - Assert((MyProc->delayChkptFlags & DELAY_CHKPT_START) == 0); - MyProc->delayChkptFlags |= DELAY_CHKPT_START; + Assert((MyProc->delayChkptFlags & DELAY_CHKPT_IN_COMMIT) == 0); + MyProc->delayChkptFlags |= DELAY_CHKPT_IN_COMMIT; + + /* + * Ensures the DELAY_CHKPT_IN_COMMIT flag write is globally visible before + * commit time is written. + */ + pg_write_barrier(); + + /* + * Note it is important to set committs value after marking ourselves as + * in the commit critical section (DELAY_CHKPT_IN_COMMIT). This is because + * we want to ensure all transactions that have acquired commit timestamp + * are finished before we allow the logical replication client to advance + * its xid which is used to hold back dead rows for conflict detection. + * See comments atop worker.c. + */ + committs = GetCurrentTimestamp(); /* * Emit the XLOG commit record. 
Note that we mark 2PC commits as @@ -2381,7 +2401,7 @@ RecordTransactionCommitPrepared(TransactionId xid, TransactionIdCommitTree(xid, nchildren, children); /* Checkpoint can proceed now */ - MyProc->delayChkptFlags &= ~DELAY_CHKPT_START; + MyProc->delayChkptFlags &= ~DELAY_CHKPT_IN_COMMIT; END_CRIT_SECTION(); diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 41601fcb280..b46e7e9c2a6 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -1431,10 +1431,22 @@ RecordTransactionCommit(void) * without holding the ProcArrayLock, since we're the only one * modifying it. This makes checkpoint's determination of which xacts * are delaying the checkpoint a bit fuzzy, but it doesn't matter. + * + * Note, it is important to get the commit timestamp after marking the + * transaction in the commit critical section. See + * RecordTransactionCommitPrepared. */ - Assert((MyProc->delayChkptFlags & DELAY_CHKPT_START) == 0); + Assert((MyProc->delayChkptFlags & DELAY_CHKPT_IN_COMMIT) == 0); START_CRIT_SECTION(); - MyProc->delayChkptFlags |= DELAY_CHKPT_START; + MyProc->delayChkptFlags |= DELAY_CHKPT_IN_COMMIT; + + Assert(xactStopTimestamp == 0); + + /* + * Ensures the DELAY_CHKPT_IN_COMMIT flag write is globally visible + * before commit time is written. + */ + pg_write_barrier(); /* * Insert the commit XLOG record. @@ -1537,7 +1549,7 @@ RecordTransactionCommit(void) */ if (markXidCommitted) { - MyProc->delayChkptFlags &= ~DELAY_CHKPT_START; + MyProc->delayChkptFlags &= ~DELAY_CHKPT_IN_COMMIT; END_CRIT_SECTION(); } diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 8e7827c6ed9..eefffc4277a 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -7121,7 +7121,7 @@ CreateCheckPoint(int flags) * starting snapshot of locks and transactions. 
*/ if (!shutdown && XLogStandbyInfoActive()) - checkPoint.oldestActiveXid = GetOldestActiveTransactionId(); + checkPoint.oldestActiveXid = GetOldestActiveTransactionId(false, true); else checkPoint.oldestActiveXid = InvalidTransactionId; diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c index 23878b2dd91..e8f3ba00caa 100644 --- a/src/backend/access/transam/xlogrecovery.c +++ b/src/backend/access/transam/xlogrecovery.c @@ -4760,7 +4760,7 @@ bool check_primary_slot_name(char **newval, void **extra, GucSource source) { if (*newval && strcmp(*newval, "") != 0 && - !ReplicationSlotValidateName(*newval, WARNING)) + !ReplicationSlotValidateName(*newval, false, WARNING)) return false; return true; diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index 1395032413e..63c2992d19f 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -103,6 +103,7 @@ GetSubscription(Oid subid, bool missing_ok) sub->passwordrequired = subform->subpasswordrequired; sub->runasowner = subform->subrunasowner; sub->failover = subform->subfailover; + sub->retaindeadtuples = subform->subretaindeadtuples; /* Get conninfo */ datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index b2d5332effc..f6eca09ee15 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1386,7 +1386,8 @@ REVOKE ALL ON pg_subscription FROM public; GRANT SELECT (oid, subdbid, subskiplsn, subname, subowner, subenabled, subbinary, substream, subtwophasestate, subdisableonerr, subpasswordrequired, subrunasowner, subfailover, - subslotname, subsynccommit, subpublications, suborigin) + subretaindeadtuples, subslotname, subsynccommit, + subpublications, suborigin) ON pg_subscription TO public; CREATE VIEW pg_stat_subscription_stats AS diff --git 
a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index e23b0de7242..cd6c3684482 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -14,6 +14,7 @@ #include "postgres.h" +#include "access/commit_ts.h" #include "access/htup_details.h" #include "access/table.h" #include "access/twophase.h" @@ -71,8 +72,9 @@ #define SUBOPT_PASSWORD_REQUIRED 0x00000800 #define SUBOPT_RUN_AS_OWNER 0x00001000 #define SUBOPT_FAILOVER 0x00002000 -#define SUBOPT_LSN 0x00004000 -#define SUBOPT_ORIGIN 0x00008000 +#define SUBOPT_RETAIN_DEAD_TUPLES 0x00004000 +#define SUBOPT_LSN 0x00008000 +#define SUBOPT_ORIGIN 0x00010000 /* check if the 'val' has 'bits' set */ #define IsSet(val, bits) (((val) & (bits)) == (bits)) @@ -98,6 +100,7 @@ typedef struct SubOpts bool passwordrequired; bool runasowner; bool failover; + bool retaindeadtuples; char *origin; XLogRecPtr lsn; } SubOpts; @@ -105,8 +108,10 @@ typedef struct SubOpts static List *fetch_table_list(WalReceiverConn *wrconn, List *publications); static void check_publications_origin(WalReceiverConn *wrconn, List *publications, bool copydata, - char *origin, Oid *subrel_local_oids, - int subrel_count, char *subname); + bool retain_dead_tuples, char *origin, + Oid *subrel_local_oids, int subrel_count, + char *subname); +static void check_pub_dead_tuple_retention(WalReceiverConn *wrconn); static void check_duplicates_in_publist(List *publist, Datum *datums); static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname); static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err); @@ -162,6 +167,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->runasowner = false; if (IsSet(supported_opts, SUBOPT_FAILOVER)) opts->failover = false; + if (IsSet(supported_opts, SUBOPT_RETAIN_DEAD_TUPLES)) + opts->retaindeadtuples = false; if (IsSet(supported_opts, SUBOPT_ORIGIN)) 
opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY); @@ -210,7 +217,7 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, if (strcmp(opts->slot_name, "none") == 0) opts->slot_name = NULL; else - ReplicationSlotValidateName(opts->slot_name, ERROR); + ReplicationSlotValidateName(opts->slot_name, false, ERROR); } else if (IsSet(supported_opts, SUBOPT_COPY_DATA) && strcmp(defel->defname, "copy_data") == 0) @@ -307,6 +314,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->specified_opts |= SUBOPT_FAILOVER; opts->failover = defGetBoolean(defel); } + else if (IsSet(supported_opts, SUBOPT_RETAIN_DEAD_TUPLES) && + strcmp(defel->defname, "retain_dead_tuples") == 0) + { + if (IsSet(opts->specified_opts, SUBOPT_RETAIN_DEAD_TUPLES)) + errorConflictingDefElem(defel, pstate); + + opts->specified_opts |= SUBOPT_RETAIN_DEAD_TUPLES; + opts->retaindeadtuples = defGetBoolean(defel); + } else if (IsSet(supported_opts, SUBOPT_ORIGIN) && strcmp(defel->defname, "origin") == 0) { @@ -563,7 +579,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY | SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT | SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED | - SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER | SUBOPT_ORIGIN); + SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER | + SUBOPT_RETAIN_DEAD_TUPLES | SUBOPT_ORIGIN); parse_subscription_options(pstate, stmt->options, supported_opts, &opts); /* @@ -630,6 +647,10 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, stmt->subname))); } + /* Ensure that we can enable retain_dead_tuples */ + if (opts.retaindeadtuples) + CheckSubDeadTupleRetention(true, !opts.enabled, WARNING); + if (!IsSet(opts.specified_opts, SUBOPT_SLOT_NAME) && opts.slot_name == NULL) opts.slot_name = stmt->subname; @@ -670,6 +691,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, values[Anum_pg_subscription_subpasswordrequired - 1] = 
BoolGetDatum(opts.passwordrequired); values[Anum_pg_subscription_subrunasowner - 1] = BoolGetDatum(opts.runasowner); values[Anum_pg_subscription_subfailover - 1] = BoolGetDatum(opts.failover); + values[Anum_pg_subscription_subretaindeadtuples - 1] = + BoolGetDatum(opts.retaindeadtuples); values[Anum_pg_subscription_subconninfo - 1] = CStringGetTextDatum(conninfo); if (opts.slot_name) @@ -722,7 +745,11 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, { check_publications(wrconn, publications); check_publications_origin(wrconn, publications, opts.copy_data, - opts.origin, NULL, 0, stmt->subname); + opts.retaindeadtuples, opts.origin, + NULL, 0, stmt->subname); + + if (opts.retaindeadtuples) + check_pub_dead_tuple_retention(wrconn); /* * Set sync state based on if we were asked to do data copy or @@ -881,8 +908,8 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, sizeof(Oid), oid_cmp); check_publications_origin(wrconn, sub->publications, copy_data, - sub->origin, subrel_local_oids, - subrel_count, sub->name); + sub->retaindeadtuples, sub->origin, + subrel_local_oids, subrel_count, sub->name); /* * Rels that we want to remove from subscription and drop any slots @@ -1040,18 +1067,22 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, } /* - * Common checks for altering failover and two_phase options. + * Common checks for altering failover, two_phase, and retain_dead_tuples + * options. */ static void CheckAlterSubOption(Subscription *sub, const char *option, bool slot_needs_update, bool isTopLevel) { + Assert(strcmp(option, "failover") == 0 || + strcmp(option, "two_phase") == 0 || + strcmp(option, "retain_dead_tuples") == 0); + /* - * The checks in this function are required only for failover and - * two_phase options. + * Altering the retain_dead_tuples option does not update the slot on the + * publisher. 
*/ - Assert(strcmp(option, "failover") == 0 || - strcmp(option, "two_phase") == 0); + Assert(!slot_needs_update || strcmp(option, "retain_dead_tuples") != 0); /* * Do not allow changing the option if the subscription is enabled. This @@ -1063,6 +1094,39 @@ CheckAlterSubOption(Subscription *sub, const char *option, * the publisher by the existing walsender, so we could have allowed that * even when the subscription is enabled. But we kept this restriction for * the sake of consistency and simplicity. + * + * Additionally, do not allow changing the retain_dead_tuples option when + * the subscription is enabled to prevent race conditions arising from the + * new option value being acknowledged asynchronously by the launcher and + * apply workers. + * + * Without the restriction, a race condition may arise when a user + * disables and immediately re-enables the retain_dead_tuples option. In + * this case, the launcher might drop the slot upon noticing the disabled + * action, while the apply worker may keep maintaining + * oldest_nonremovable_xid without noticing the option change. During this + * period, a transaction ID wraparound could falsely make this ID appear + * as if it originates from the future w.r.t the transaction ID stored in + * the slot maintained by launcher. + * + * Similarly, if the user enables retain_dead_tuples concurrently with the + * launcher starting the worker, the apply worker may start calculating + * oldest_nonremovable_xid before the launcher notices the enable action. + * Consequently, the launcher may update slot.xmin to a newer value than + * that maintained by the worker. In subsequent cycles, upon integrating + * the worker's oldest_nonremovable_xid, the launcher might detect a + * retreat in the calculated xmin, necessitating additional handling. + * + * XXX To address the above race conditions, we can define + * oldest_nonremovable_xid as FullTransactionID and adds the check to + * disallow retreating the conflict slot's xmin. 
For now, we kept the + * implementation simple by disallowing change to the retain_dead_tuples, + * but in the future we can change this after some more analysis. + * + * Note that we could restrict only the enabling of retain_dead_tuples to + * avoid the race conditions described above, but we maintain the + * restriction for both enable and disable operations for the sake of + * consistency. */ if (sub->enabled) ereport(ERROR, @@ -1110,6 +1174,9 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, bool update_tuple = false; bool update_failover = false; bool update_two_phase = false; + bool check_pub_rdt = false; + bool retain_dead_tuples; + char *origin; Subscription *sub; Form_pg_subscription form; bits32 supported_opts; @@ -1137,6 +1204,9 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, sub = GetSubscription(subid, false); + retain_dead_tuples = sub->retaindeadtuples; + origin = sub->origin; + /* * Don't allow non-superuser modification of a subscription with * password_required=false. @@ -1165,7 +1235,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED | SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER | - SUBOPT_ORIGIN); + SUBOPT_RETAIN_DEAD_TUPLES | SUBOPT_ORIGIN); parse_subscription_options(pstate, stmt->options, supported_opts, &opts); @@ -1325,11 +1395,62 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, replaces[Anum_pg_subscription_subfailover - 1] = true; } + if (IsSet(opts.specified_opts, SUBOPT_RETAIN_DEAD_TUPLES)) + { + values[Anum_pg_subscription_subretaindeadtuples - 1] = + BoolGetDatum(opts.retaindeadtuples); + replaces[Anum_pg_subscription_subretaindeadtuples - 1] = true; + + CheckAlterSubOption(sub, "retain_dead_tuples", false, isTopLevel); + + /* + * Workers may continue running even after the + * subscription has been disabled. 
+ * + * To prevent race conditions (as described in + * CheckAlterSubOption()), ensure that all worker + * processes have already exited before proceeding. + */ + if (logicalrep_workers_find(subid, true, true)) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot alter retain_dead_tuples when logical replication worker is still running"), + errhint("Try again after some time."))); + + /* + * Remind the user that enabling subscription will prevent + * the accumulation of dead tuples. + */ + if (opts.retaindeadtuples) + CheckSubDeadTupleRetention(true, !sub->enabled, NOTICE); + + /* + * Notify the launcher to manage the replication slot for + * conflict detection. This ensures that replication slot + * is efficiently handled (created, updated, or dropped) + * in response to any configuration changes. + */ + ApplyLauncherWakeupAtCommit(); + + check_pub_rdt = opts.retaindeadtuples; + retain_dead_tuples = opts.retaindeadtuples; + } + if (IsSet(opts.specified_opts, SUBOPT_ORIGIN)) { values[Anum_pg_subscription_suborigin - 1] = CStringGetTextDatum(opts.origin); replaces[Anum_pg_subscription_suborigin - 1] = true; + + /* + * Check if changes from different origins may be received + * from the publisher when the origin is changed to ANY + * and retain_dead_tuples is enabled. + */ + check_pub_rdt = retain_dead_tuples && + pg_strcasecmp(opts.origin, LOGICALREP_ORIGIN_ANY) == 0; + + origin = opts.origin; } update_tuple = true; @@ -1347,6 +1468,15 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("cannot enable subscription that does not have a slot name"))); + /* + * Check track_commit_timestamp only when enabling the + * subscription in case it was disabled after creation. See + * comments atop CheckSubDeadTupleRetention() for details. 
+ */ + if (sub->retaindeadtuples) + CheckSubDeadTupleRetention(opts.enabled, !opts.enabled, + WARNING); + values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(opts.enabled); replaces[Anum_pg_subscription_subenabled - 1] = true; @@ -1355,6 +1485,14 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, ApplyLauncherWakeupAtCommit(); update_tuple = true; + + /* + * The subscription might be initially created with + * connect=false and retain_dead_tuples=true, meaning the + * remote server's status may not be checked. Ensure this + * check is conducted now. + */ + check_pub_rdt = sub->retaindeadtuples && opts.enabled; break; } @@ -1369,6 +1507,13 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, CStringGetTextDatum(stmt->conninfo); replaces[Anum_pg_subscription_subconninfo - 1] = true; update_tuple = true; + + /* + * Since the remote server configuration might have changed, + * perform a check to ensure it permits enabling + * retain_dead_tuples. + */ + check_pub_rdt = sub->retaindeadtuples; break; case ALTER_SUBSCRIPTION_SET_PUBLICATION: @@ -1568,14 +1713,15 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, } /* - * Try to acquire the connection necessary for altering the slot, if - * needed. + * Try to acquire the connection necessary either for modifying the slot + * or for checking if the remote server permits enabling + * retain_dead_tuples. * * This has to be at the end because otherwise if there is an error while * doing the database operations we won't be able to rollback altered * slot. */ - if (update_failover || update_two_phase) + if (update_failover || update_two_phase || check_pub_rdt) { bool must_use_password; char *err; @@ -1584,10 +1730,14 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, /* Load the library providing us libpq calls. */ load_file("libpqwalreceiver", false); - /* Try to connect to the publisher. 
*/ + /* + * Try to connect to the publisher, using the new connection string if + * available. + */ must_use_password = sub->passwordrequired && !sub->ownersuperuser; - wrconn = walrcv_connect(sub->conninfo, true, true, must_use_password, - sub->name, &err); + wrconn = walrcv_connect(stmt->conninfo ? stmt->conninfo : sub->conninfo, + true, true, must_use_password, sub->name, + &err); if (!wrconn) ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), @@ -1596,9 +1746,17 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, PG_TRY(); { - walrcv_alter_slot(wrconn, sub->slotname, - update_failover ? &opts.failover : NULL, - update_two_phase ? &opts.twophase : NULL); + if (retain_dead_tuples) + check_pub_dead_tuple_retention(wrconn); + + check_publications_origin(wrconn, sub->publications, false, + retain_dead_tuples, origin, NULL, 0, + sub->name); + + if (update_failover || update_two_phase) + walrcv_alter_slot(wrconn, sub->slotname, + update_failover ? &opts.failover : NULL, + update_two_phase ? &opts.twophase : NULL); } PG_FINALLY(); { @@ -2086,20 +2244,29 @@ AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId) * Check and log a warning if the publisher has subscribed to the same table, * its partition ancestors (if it's a partition), or its partition children (if * it's a partitioned table), from some other publishers. This check is - * required only if "copy_data = true" and "origin = none" for CREATE - * SUBSCRIPTION and ALTER SUBSCRIPTION ... REFRESH statements to notify the - * user that data having origin might have been copied. + * required in the following scenarios: * - * This check need not be performed on the tables that are already added - * because incremental sync for those tables will happen through WAL and the - * origin of the data can be identified from the WAL records. + * 1) For CREATE SUBSCRIPTION and ALTER SUBSCRIPTION ... 
REFRESH statements + * with "copy_data = true" and "origin = none": + * - Warn the user that data with an origin might have been copied. + * - This check is skipped for tables already added, as incremental sync via + * WAL allows origin tracking. The list of such tables is in + * subrel_local_oids. * - * subrel_local_oids contains the list of relation oids that are already - * present on the subscriber. + * 2) For CREATE SUBSCRIPTION and ALTER SUBSCRIPTION ... REFRESH statements + * with "retain_dead_tuples = true" and "origin = any", and for ALTER + * SUBSCRIPTION statements that modify retain_dead_tuples or origin, or + * when the publisher's status changes (e.g., due to a connection string + * update): + * - Warn the user that only conflict detection info for local changes on + * the publisher is retained. Data from other origins may lack sufficient + * details for reliable conflict detection. + * - See comments atop worker.c for more details. */ static void check_publications_origin(WalReceiverConn *wrconn, List *publications, - bool copydata, char *origin, Oid *subrel_local_oids, + bool copydata, bool retain_dead_tuples, + char *origin, Oid *subrel_local_oids, int subrel_count, char *subname) { WalRcvExecResult *res; @@ -2108,9 +2275,29 @@ check_publications_origin(WalReceiverConn *wrconn, List *publications, Oid tableRow[1] = {TEXTOID}; List *publist = NIL; int i; + bool check_rdt; + bool check_table_sync; + bool origin_none = origin && + pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) == 0; + + /* + * Enable retain_dead_tuples checks only when origin is set to 'any', + * since with origin='none' only local changes are replicated to the + * subscriber. + */ + check_rdt = retain_dead_tuples && !origin_none; + + /* + * Enable table synchronization checks only when origin is 'none', to + * ensure that data from other origins is not inadvertently copied. 
+ */ + check_table_sync = copydata && origin_none; - if (!copydata || !origin || - (pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) != 0)) + /* retain_dead_tuples and table sync checks occur separately */ + Assert(!(check_rdt && check_table_sync)); + + /* Return if no checks are required */ + if (!check_rdt && !check_table_sync) return; initStringInfo(&cmd); @@ -2129,16 +2316,23 @@ check_publications_origin(WalReceiverConn *wrconn, List *publications, /* * In case of ALTER SUBSCRIPTION ... REFRESH, subrel_local_oids contains * the list of relation oids that are already present on the subscriber. - * This check should be skipped for these tables. + * This check should be skipped for these tables if checking for table + * sync scenario. However, when handling the retain_dead_tuples scenario, + * ensure all tables are checked, as some existing tables may now include + * changes from other origins due to newly created subscriptions on the + * publisher. */ - for (i = 0; i < subrel_count; i++) + if (check_table_sync) { - Oid relid = subrel_local_oids[i]; - char *schemaname = get_namespace_name(get_rel_namespace(relid)); - char *tablename = get_rel_name(relid); + for (i = 0; i < subrel_count; i++) + { + Oid relid = subrel_local_oids[i]; + char *schemaname = get_namespace_name(get_rel_namespace(relid)); + char *tablename = get_rel_name(relid); - appendStringInfo(&cmd, "AND NOT (N.nspname = '%s' AND C.relname = '%s')\n", - schemaname, tablename); + appendStringInfo(&cmd, "AND NOT (N.nspname = '%s' AND C.relname = '%s')\n", + schemaname, tablename); + } } res = walrcv_exec(wrconn, cmd.data, 1, tableRow); @@ -2173,22 +2367,37 @@ check_publications_origin(WalReceiverConn *wrconn, List *publications, * XXX: For simplicity, we don't check whether the table has any data or * not. If the table doesn't have any data then we don't need to * distinguish between data having origin and data not having origin so we - * can avoid logging a warning in that case. 
+ * can avoid logging a warning for table sync scenario. */ if (publist) { StringInfo pubnames = makeStringInfo(); + StringInfo err_msg = makeStringInfo(); + StringInfo err_hint = makeStringInfo(); /* Prepare the list of publication(s) for warning message. */ GetPublicationsStr(publist, pubnames, false); + + if (check_table_sync) + { + appendStringInfo(err_msg, _("subscription \"%s\" requested copy_data with origin = NONE but might copy data that had a different origin"), + subname); + appendStringInfoString(err_hint, _("Verify that initial data copied from the publisher tables did not come from other origins.")); + } + else + { + appendStringInfo(err_msg, _("subscription \"%s\" enabled retain_dead_tuples but might not reliably detect conflicts for changes from different origins"), + subname); + appendStringInfoString(err_hint, _("Consider using origin = NONE or disabling retain_dead_tuples.")); + } + ereport(WARNING, errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("subscription \"%s\" requested copy_data with origin = NONE but might copy data that had a different origin", - subname), - errdetail_plural("The subscription being created subscribes to a publication (%s) that contains tables that are written to by other subscriptions.", - "The subscription being created subscribes to publications (%s) that contain tables that are written to by other subscriptions.", + errmsg_internal("%s", err_msg->data), + errdetail_plural("The subscription subscribes to a publication (%s) that contains tables that are written to by other subscriptions.", + "The subscription subscribes to publications (%s) that contain tables that are written to by other subscriptions.", list_length(publist), pubnames->data), - errhint("Verify that initial data copied from the publisher tables did not come from other origins.")); + errhint_internal("%s", err_hint->data)); } ExecDropSingleTupleTableSlot(slot); @@ -2197,6 +2406,101 @@ check_publications_origin(WalReceiverConn *wrconn, List 
*publications, } /* + * Determine whether the retain_dead_tuples can be enabled based on the + * publisher's status. + * + * This option is disallowed if the publisher is running a version earlier + * than PostgreSQL 19, or if the publisher is in recovery (i.e., it is a standby + * server). + * + * See comments atop worker.c for a detailed explanation. + */ +static void +check_pub_dead_tuple_retention(WalReceiverConn *wrconn) +{ + WalRcvExecResult *res; + Oid RecoveryRow[1] = {BOOLOID}; + TupleTableSlot *slot; + bool isnull; + bool remote_in_recovery; + + if (walrcv_server_version(wrconn) < 190000) + ereport(ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot enable retain_dead_tuples if the publisher is running a version earlier than PostgreSQL 19")); + + res = walrcv_exec(wrconn, "SELECT pg_is_in_recovery()", 1, RecoveryRow); + + if (res->status != WALRCV_OK_TUPLES) + ereport(ERROR, + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not obtain recovery progress from the publisher: %s", + res->err))); + + slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); + if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot)) + elog(ERROR, "failed to fetch tuple for the recovery progress"); + + remote_in_recovery = DatumGetBool(slot_getattr(slot, 1, &isnull)); + + if (remote_in_recovery) + ereport(ERROR, + errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot enable retain_dead_tuples if the publisher is in recovery")); + + ExecDropSingleTupleTableSlot(slot); + + walrcv_clear_result(res); +} + +/* + * Check if the subscriber's configuration is adequate to enable the + * retain_dead_tuples option. + * + * Issue an ERROR if the wal_level does not support the use of replication + * slots when check_guc is set to true. + * + * Issue a WARNING if track_commit_timestamp is not enabled when check_guc is + * set to true.
This is only to highlight the importance of enabling + * track_commit_timestamp instead of catching all the misconfigurations, as + * this setting can be adjusted after subscription creation. Without it, the + * apply worker will simply skip conflict detection. + * + * Issue a WARNING or NOTICE if the subscription is disabled. Do not raise an + * ERROR since users can only modify retain_dead_tuples for disabled + * subscriptions. And as long as the subscription is enabled promptly, it will + * not pose issues. + */ +void +CheckSubDeadTupleRetention(bool check_guc, bool sub_disabled, + int elevel_for_sub_disabled) +{ + Assert(elevel_for_sub_disabled == NOTICE || + elevel_for_sub_disabled == WARNING); + + if (check_guc && wal_level < WAL_LEVEL_REPLICA) + ereport(ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("\"wal_level\" is insufficient to create the replication slot required by retain_dead_tuples"), + errhint("\"wal_level\" must be set to \"replica\" or \"logical\" at server start.")); + + if (check_guc && !track_commit_timestamp) + ereport(WARNING, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("commit timestamp and origin data required for detecting conflicts won't be retained"), + errhint("Consider setting \"%s\" to true.", + "track_commit_timestamp")); + + if (sub_disabled) + ereport(elevel_for_sub_disabled, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("deleted rows to detect conflicts would not be removed until the subscription is enabled"), + (elevel_for_sub_disabled > NOTICE) + ? errhint("Consider setting %s to false.", + "retain_dead_tuples") : 0); +} + +/* * Get the list of tables which belong to specified publications on the * publisher connection. 
* diff --git a/src/backend/libpq/auth.c b/src/backend/libpq/auth.c index 9f4d05ffbd4..4da46666439 100644 --- a/src/backend/libpq/auth.c +++ b/src/backend/libpq/auth.c @@ -94,8 +94,16 @@ static int auth_peer(hbaPort *port); #define PGSQL_PAM_SERVICE "postgresql" /* Service name passed to PAM */ +/* Work around original Solaris' lack of "const" in the conv_proc signature */ +#ifdef _PAM_LEGACY_NONCONST +#define PG_PAM_CONST +#else +#define PG_PAM_CONST const +#endif + static int CheckPAMAuth(Port *port, const char *user, const char *password); -static int pam_passwd_conv_proc(int num_msg, const struct pam_message **msg, +static int pam_passwd_conv_proc(int num_msg, + PG_PAM_CONST struct pam_message **msg, struct pam_response **resp, void *appdata_ptr); static struct pam_conv pam_passw_conv = { @@ -1917,7 +1925,7 @@ auth_peer(hbaPort *port) */ static int -pam_passwd_conv_proc(int num_msg, const struct pam_message **msg, +pam_passwd_conv_proc(int num_msg, PG_PAM_CONST struct pam_message **msg, struct pam_response **resp, void *appdata_ptr) { const char *passwd; diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 0c75fe064d5..886d99951dd 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -233,7 +233,7 @@ libpqrcv_connect(const char *conninfo, bool replication, bool logical, } PQsetNoticeReceiver(conn->streamConn, libpqsrv_notice_receiver, - gettext_noop("received message via replication")); + "received message via replication"); /* * Set always-secure search path for the cases where the connection is diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c index d25085d3515..1fa931a7422 100644 --- a/src/backend/replication/logical/applyparallelworker.c +++ b/src/backend/replication/logical/applyparallelworker.c @@ -441,7 +441,8 
@@ pa_launch_parallel_worker(void) MySubscription->name, MyLogicalRepWorker->userid, InvalidOid, - dsm_segment_handle(winfo->dsm_seg)); + dsm_segment_handle(winfo->dsm_seg), + false); if (launched) { diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 4aed0dfcebb..742d9ba68e9 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -32,6 +32,7 @@ #include "postmaster/interrupt.h" #include "replication/logicallauncher.h" #include "replication/origin.h" +#include "replication/slot.h" #include "replication/walreceiver.h" #include "replication/worker_internal.h" #include "storage/ipc.h" @@ -91,7 +92,6 @@ static dshash_table *last_start_times = NULL; static bool on_commit_launcher_wakeup = false; -static void ApplyLauncherWakeup(void); static void logicalrep_launcher_onexit(int code, Datum arg); static void logicalrep_worker_onexit(int code, Datum arg); static void logicalrep_worker_detach(void); @@ -100,6 +100,9 @@ static int logicalrep_pa_worker_count(Oid subid); static void logicalrep_launcher_attach_dshmem(void); static void ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time); static TimestampTz ApplyLauncherGetWorkerStartTime(Oid subid); +static void compute_min_nonremovable_xid(LogicalRepWorker *worker, TransactionId *xmin); +static bool acquire_conflict_slot_if_exists(void); +static void advance_conflict_slot_xmin(TransactionId new_xmin); /* @@ -148,6 +151,7 @@ get_subscription_list(void) sub->owner = subform->subowner; sub->enabled = subform->subenabled; sub->name = pstrdup(NameStr(subform->subname)); + sub->retaindeadtuples = subform->subretaindeadtuples; /* We don't fill fields we are not interested in. 
*/ res = lappend(res, sub); @@ -309,7 +313,8 @@ logicalrep_workers_find(Oid subid, bool only_running, bool acquire_lock) bool logicalrep_worker_launch(LogicalRepWorkerType wtype, Oid dbid, Oid subid, const char *subname, Oid userid, - Oid relid, dsm_handle subworker_dsm) + Oid relid, dsm_handle subworker_dsm, + bool retain_dead_tuples) { BackgroundWorker bgw; BackgroundWorkerHandle *bgw_handle; @@ -328,10 +333,13 @@ logicalrep_worker_launch(LogicalRepWorkerType wtype, * - must be valid worker type * - tablesync workers are only ones to have relid * - parallel apply worker is the only kind of subworker + * - The replication slot used in conflict detection is created when + * retain_dead_tuples is enabled */ Assert(wtype != WORKERTYPE_UNKNOWN); Assert(is_tablesync_worker == OidIsValid(relid)); Assert(is_parallel_apply_worker == (subworker_dsm != DSM_HANDLE_INVALID)); + Assert(!retain_dead_tuples || MyReplicationSlot); ereport(DEBUG1, (errmsg_internal("starting logical replication worker for subscription \"%s\"", @@ -454,6 +462,9 @@ retry: worker->stream_fileset = NULL; worker->leader_pid = is_parallel_apply_worker ? MyProcPid : InvalidPid; worker->parallel_apply = is_parallel_apply_worker; + worker->oldest_nonremovable_xid = retain_dead_tuples + ? MyReplicationSlot->data.xmin + : InvalidTransactionId; worker->last_lsn = InvalidXLogRecPtr; TIMESTAMP_NOBEGIN(worker->last_send_time); TIMESTAMP_NOBEGIN(worker->last_recv_time); @@ -1118,7 +1129,10 @@ ApplyLauncherWakeupAtCommit(void) on_commit_launcher_wakeup = true; } -static void +/* + * Wakeup the launcher immediately. + */ +void ApplyLauncherWakeup(void) { if (LogicalRepCtx->launcher_pid != 0) @@ -1150,6 +1164,12 @@ ApplyLauncherMain(Datum main_arg) */ BackgroundWorkerInitializeConnection(NULL, NULL, 0); + /* + * Acquire the conflict detection slot at startup to ensure it can be + * dropped if no longer needed after a restart. 
+ */ + acquire_conflict_slot_if_exists(); + /* Enter main loop */ for (;;) { @@ -1159,6 +1179,9 @@ ApplyLauncherMain(Datum main_arg) MemoryContext subctx; MemoryContext oldctx; long wait_time = DEFAULT_NAPTIME_PER_CYCLE; + bool can_advance_xmin = true; + bool retain_dead_tuples = false; + TransactionId xmin = InvalidTransactionId; CHECK_FOR_INTERRUPTS(); @@ -1168,7 +1191,14 @@ ApplyLauncherMain(Datum main_arg) ALLOCSET_DEFAULT_SIZES); oldctx = MemoryContextSwitchTo(subctx); - /* Start any missing workers for enabled subscriptions. */ + /* + * Start any missing workers for enabled subscriptions. + * + * Also, during the iteration through all subscriptions, we compute + * the minimum XID required to protect deleted tuples for conflict + * detection if one of the subscription enables retain_dead_tuples + * option. + */ sublist = get_subscription_list(); foreach(lc, sublist) { @@ -1178,6 +1208,38 @@ ApplyLauncherMain(Datum main_arg) TimestampTz now; long elapsed; + if (sub->retaindeadtuples) + { + retain_dead_tuples = true; + + /* + * Can't advance xmin of the slot unless all the subscriptions + * with retain_dead_tuples are enabled. This is required to + * ensure that we don't advance the xmin of + * CONFLICT_DETECTION_SLOT if one of the subscriptions is not + * enabled. Otherwise, we won't be able to detect conflicts + * reliably for such a subscription even though it has set the + * retain_dead_tuples option. + */ + can_advance_xmin &= sub->enabled; + + /* + * Create a replication slot to retain information necessary + * for conflict detection such as dead tuples, commit + * timestamps, and origins. + * + * The slot is created before starting the apply worker to + * prevent it from unnecessarily maintaining its + * oldest_nonremovable_xid. + * + * The slot is created even for a disabled subscription to + * ensure that conflict-related information is available when + * applying remote changes that occurred before the + * subscription was enabled. 
+ */ + CreateConflictDetectionSlot(); + } + if (!sub->enabled) continue; @@ -1186,7 +1248,27 @@ ApplyLauncherMain(Datum main_arg) LWLockRelease(LogicalRepWorkerLock); if (w != NULL) - continue; /* worker is running already */ + { + /* + * Compute the minimum xmin required to protect dead tuples + * required for conflict detection among all running apply + * workers that enables retain_dead_tuples. + */ + if (sub->retaindeadtuples && can_advance_xmin) + compute_min_nonremovable_xid(w, &xmin); + + /* worker is running already */ + continue; + } + + /* + * Can't advance xmin of the slot unless all the workers + * corresponding to subscriptions with retain_dead_tuples are + * running, disabling the further computation of the minimum + * nonremovable xid. + */ + if (sub->retaindeadtuples) + can_advance_xmin = false; /* * If the worker is eligible to start now, launch it. Otherwise, @@ -1210,7 +1292,8 @@ ApplyLauncherMain(Datum main_arg) if (!logicalrep_worker_launch(WORKERTYPE_APPLY, sub->dbid, sub->oid, sub->name, sub->owner, InvalidOid, - DSM_HANDLE_INVALID)) + DSM_HANDLE_INVALID, + sub->retaindeadtuples)) { /* * We get here either if we failed to launch a worker @@ -1230,6 +1313,20 @@ ApplyLauncherMain(Datum main_arg) } } + /* + * Drop the CONFLICT_DETECTION_SLOT slot if there is no subscription + * that requires us to retain dead tuples. Otherwise, if required, + * advance the slot's xmin to protect dead tuples required for the + * conflict detection. + */ + if (MyReplicationSlot) + { + if (!retain_dead_tuples) + ReplicationSlotDropAcquired(); + else if (can_advance_xmin) + advance_conflict_slot_xmin(xmin); + } + /* Switch back to original memory context. */ MemoryContextSwitchTo(oldctx); /* Clean the temporary memory. */ @@ -1258,6 +1355,125 @@ ApplyLauncherMain(Datum main_arg) } /* + * Determine the minimum non-removable transaction ID across all apply workers + * for subscriptions that have retain_dead_tuples enabled. Store the result + * in *xmin. 
+ */ +static void +compute_min_nonremovable_xid(LogicalRepWorker *worker, TransactionId *xmin) +{ + TransactionId nonremovable_xid; + + Assert(worker != NULL); + + /* + * The replication slot for conflict detection must be created before the + * worker starts. + */ + Assert(MyReplicationSlot); + + SpinLockAcquire(&worker->relmutex); + nonremovable_xid = worker->oldest_nonremovable_xid; + SpinLockRelease(&worker->relmutex); + + Assert(TransactionIdIsValid(nonremovable_xid)); + + if (!TransactionIdIsValid(*xmin) || + TransactionIdPrecedes(nonremovable_xid, *xmin)) + *xmin = nonremovable_xid; +} + +/* + * Acquire the replication slot used to retain information for conflict + * detection, if it exists. + * + * Return true if successfully acquired, otherwise return false. + */ +static bool +acquire_conflict_slot_if_exists(void) +{ + if (!SearchNamedReplicationSlot(CONFLICT_DETECTION_SLOT, true)) + return false; + + ReplicationSlotAcquire(CONFLICT_DETECTION_SLOT, true, false); + return true; +} + +/* + * Advance the xmin of the replication slot used to retain information required + * for conflict detection. + */ +static void +advance_conflict_slot_xmin(TransactionId new_xmin) +{ + Assert(MyReplicationSlot); + Assert(TransactionIdIsValid(new_xmin)); + Assert(TransactionIdPrecedesOrEquals(MyReplicationSlot->data.xmin, new_xmin)); + + /* Return if the xmin value of the slot cannot be advanced */ + if (TransactionIdEquals(MyReplicationSlot->data.xmin, new_xmin)) + return; + + SpinLockAcquire(&MyReplicationSlot->mutex); + MyReplicationSlot->effective_xmin = new_xmin; + MyReplicationSlot->data.xmin = new_xmin; + SpinLockRelease(&MyReplicationSlot->mutex); + + elog(DEBUG1, "updated xmin: %u", MyReplicationSlot->data.xmin); + + ReplicationSlotMarkDirty(); + ReplicationSlotsComputeRequiredXmin(false); + + /* + * Like PhysicalConfirmReceivedLocation(), do not save slot information + * each time.
This is acceptable because all concurrent transactions on + * the publisher that require the data preceding the slot's xmin should + * have already been applied and flushed on the subscriber before the xmin + * is advanced. So, even if the slot's xmin regresses after a restart, it + * will be advanced again in the next cycle. Therefore, no data required + * for conflict detection will be prematurely removed. + */ + return; +} + +/* + * Create and acquire the replication slot used to retain information for + * conflict detection, if not yet. + */ +void +CreateConflictDetectionSlot(void) +{ + TransactionId xmin_horizon; + + /* Exit early, if the replication slot is already created and acquired */ + if (MyReplicationSlot) + return; + + ereport(LOG, + errmsg("creating replication conflict detection slot")); + + ReplicationSlotCreate(CONFLICT_DETECTION_SLOT, false, RS_PERSISTENT, false, + false, false); + + LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); + + xmin_horizon = GetOldestSafeDecodingTransactionId(false); + + SpinLockAcquire(&MyReplicationSlot->mutex); + MyReplicationSlot->effective_xmin = xmin_horizon; + MyReplicationSlot->data.xmin = xmin_horizon; + SpinLockRelease(&MyReplicationSlot->mutex); + + ReplicationSlotsComputeRequiredXmin(true); + + LWLockRelease(ProcArrayLock); + + /* Write this slot to disk */ + ReplicationSlotMarkDirty(); + ReplicationSlotSave(); +} + +/* * Is current process the logical replication launcher? 
*/ bool diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 7b4e8629553..5febd154b6b 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -4917,7 +4917,7 @@ StartupReorderBuffer(void) continue; /* if it cannot be a slot, skip the directory */ - if (!ReplicationSlotValidateName(logical_de->d_name, DEBUG2)) + if (!ReplicationSlotValidateName(logical_de->d_name, true, DEBUG2)) continue; /* diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index e4fd6347fd1..3fea0a0206e 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -615,7 +615,8 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) MySubscription->name, MyLogicalRepWorker->userid, rstate->relid, - DSM_HANDLE_INVALID); + DSM_HANDLE_INVALID, + false); } } } diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index c5fb627aa56..b59221c4d06 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -132,6 +132,96 @@ * failover = true when creating the subscription. Enabling failover allows us * to smoothly transition to the promoted standby, ensuring that we can * subscribe to the new primary without losing any data. + * + * RETAIN DEAD TUPLES + * ---------------------- + * Each apply worker that enabled retain_dead_tuples option maintains a + * non-removable transaction ID (oldest_nonremovable_xid) in shared memory to + * prevent dead rows from being removed prematurely when the apply worker still + * needs them to detect conflicts reliably. This helps to retain the required + * commit_ts module information, which further helps to detect + * update_origin_differs and delete_origin_differs conflicts reliably, as + * otherwise, vacuum freeze could remove the required information. 
+ * + * The logical replication launcher manages an internal replication slot named + * "pg_conflict_detection". It asynchronously aggregates the non-removable + * transaction ID from all apply workers to determine the appropriate xmin for + * the slot, thereby retaining necessary tuples. + * + * The non-removable transaction ID in the apply worker is advanced to the + * oldest running transaction ID once all concurrent transactions on the + * publisher have been applied and flushed locally. The process involves: + * + * - RDT_GET_CANDIDATE_XID: + * Call GetOldestActiveTransactionId() to take oldestRunningXid as the + * candidate xid. + * + * - RDT_REQUEST_PUBLISHER_STATUS: + * Send a message to the walsender requesting the publisher status, which + * includes the latest WAL write position and information about transactions + * that are in the commit phase. + * + * - RDT_WAIT_FOR_PUBLISHER_STATUS: + * Wait for the status from the walsender. After receiving the first status, + * do not proceed if there are concurrent remote transactions that are still + * in the commit phase. These transactions might have been assigned an + * earlier commit timestamp but have not yet written the commit WAL record. + * Continue to request the publisher status (RDT_REQUEST_PUBLISHER_STATUS) + * until all these transactions have completed. + * + * - RDT_WAIT_FOR_LOCAL_FLUSH: + * Advance the non-removable transaction ID if the current flush location has + * reached or surpassed the last received WAL position. + * + * The overall state progression is: GET_CANDIDATE_XID -> + * REQUEST_PUBLISHER_STATUS -> WAIT_FOR_PUBLISHER_STATUS -> (loop to + * REQUEST_PUBLISHER_STATUS till concurrent remote transactions end) -> + * WAIT_FOR_LOCAL_FLUSH -> loop back to GET_CANDIDATE_XID. 
+ * + * Retaining the dead tuples for this period is sufficient for ensuring + * eventual consistency using the last-update-wins strategy, as dead tuples are + * useful for detecting conflicts only during the application of concurrent + * transactions from remote nodes. After applying and flushing all remote + * transactions that occurred concurrently with the tuple DELETE, any + * subsequent UPDATE from a remote node should have a later timestamp. In such + * cases, it is acceptable to detect an update_missing scenario and convert the + * UPDATE to an INSERT when applying it. But, detecting concurrent remote + * transactions with earlier timestamps than the DELETE is necessary, as the + * UPDATEs in remote transactions should be ignored if their timestamp is + * earlier than that of the dead tuples. + * + * Note that advancing the non-removable transaction ID is not supported if the + * publisher is also a physical standby. This is because the logical walsender + * on the standby can only get the WAL replay position but there may be more + * WALs that are being replicated from the primary and those WALs could have + * earlier commit timestamps. + * + * Similarly, when the publisher has subscribed to another publisher, + * information necessary for conflict detection cannot be retained for + * changes from origins other than the publisher. This is because the publisher + * lacks the information on concurrent transactions of other publishers to + * which it subscribes. As the information on concurrent transactions is + * unavailable beyond the subscriber's immediate publishers, the non-removable + * transaction ID might be advanced prematurely before changes from other + * origins have been fully applied. + * + * XXX Retaining information for changes from other origins might be possible + * by requesting the subscription on that origin to enable retain_dead_tuples + * and fetching the conflict detection slot.xmin along with the publisher's + * status.
In the RDT_WAIT_FOR_PUBLISHER_STATUS phase, the apply worker could + * wait for the remote slot's xmin to reach the oldest active transaction ID, + * ensuring that all transactions from other origins have been applied on the + * publisher, thereby getting the latest WAL position that includes all + * concurrent changes. However, this approach may impact performance, so it + * might not be worth the effort. + * + * XXX It seems feasible to get the latest commit's WAL location from the + * publisher and wait till that is applied. However, we can't do that + * because commit timestamps can regress as a commit with a later LSN is not + * guaranteed to have a later timestamp than those with earlier LSNs. Having + * said that, even if that is possible, it won't improve performance much as + * the apply always lags and moves slowly as compared with the transactions + * on the publisher. *------------------------------------------------------------------------- */ @@ -140,6 +230,7 @@ #include <sys/stat.h> #include <unistd.h> +#include "access/commit_ts.h" #include "access/table.h" #include "access/tableam.h" #include "access/twophase.h" @@ -148,6 +239,7 @@ #include "catalog/pg_inherits.h" #include "catalog/pg_subscription.h" #include "catalog/pg_subscription_rel.h" +#include "commands/subscriptioncmds.h" #include "commands/tablecmds.h" #include "commands/trigger.h" #include "executor/executor.h" @@ -166,12 +258,14 @@ #include "replication/logicalrelation.h" #include "replication/logicalworker.h" #include "replication/origin.h" +#include "replication/slot.h" #include "replication/walreceiver.h" #include "replication/worker_internal.h" #include "rewrite/rewriteHandler.h" #include "storage/buffile.h" #include "storage/ipc.h" #include "storage/lmgr.h" +#include "storage/procarray.h" #include "tcop/tcopprot.h" #include "utils/acl.h" #include "utils/dynahash.h" @@ -268,6 +362,78 @@ typedef enum TRANS_PARALLEL_APPLY, } TransApplyAction; +/* + * The phases involved in advancing the
non-removable transaction ID. + * + * See comments atop worker.c for details of the transition between these + * phases. + */ +typedef enum +{ + RDT_GET_CANDIDATE_XID, + RDT_REQUEST_PUBLISHER_STATUS, + RDT_WAIT_FOR_PUBLISHER_STATUS, + RDT_WAIT_FOR_LOCAL_FLUSH +} RetainDeadTuplesPhase; + +/* + * Critical information for managing phase transitions within the + * RetainDeadTuplesPhase. + */ +typedef struct RetainDeadTuplesData +{ + RetainDeadTuplesPhase phase; /* current phase */ + XLogRecPtr remote_lsn; /* WAL write position on the publisher */ + + /* + * Oldest transaction ID that was in the commit phase on the publisher. + * Use FullTransactionId to prevent issues with transaction ID wraparound, + * where a new remote_oldestxid could falsely appear to originate from the + * past and block advancement. + */ + FullTransactionId remote_oldestxid; + + /* + * Next transaction ID to be assigned on the publisher. Use + * FullTransactionId for consistency and to allow straightforward + * comparisons with remote_oldestxid. + */ + FullTransactionId remote_nextxid; + + TimestampTz reply_time; /* when the publisher responds with status */ + + /* + * Publisher transaction ID that must be awaited to complete before + * entering the final phase (RDT_WAIT_FOR_LOCAL_FLUSH). Use + * FullTransactionId for the same reason as remote_nextxid. + */ + FullTransactionId remote_wait_for; + + TransactionId candidate_xid; /* candidate for the non-removable + * transaction ID */ + TimestampTz flushpos_update_time; /* when the remote flush position was + * updated in final phase + * (RDT_WAIT_FOR_LOCAL_FLUSH) */ + + /* + * The following fields are used to determine the timing for the next + * round of transaction ID advancement. 
+ */ + TimestampTz last_recv_time; /* when the last message was received */ + TimestampTz candidate_xid_time; /* when the candidate_xid is decided */ + int xid_advance_interval; /* how much time (ms) to wait before + * attempting to advance the + * non-removable transaction ID */ +} RetainDeadTuplesData; + +/* + * The minimum (100ms) and maximum (3 minutes) intervals for advancing + * non-removable transaction IDs. The maximum interval is a bit arbitrary but + * is sufficient to not cause any undue network traffic. + */ +#define MIN_XID_ADVANCE_INTERVAL 100 +#define MAX_XID_ADVANCE_INTERVAL 180000 + /* errcontext tracker */ static ApplyErrorCallbackArg apply_error_callback_arg = { @@ -332,6 +498,13 @@ static XLogRecPtr skip_xact_finish_lsn = InvalidXLogRecPtr; /* BufFile handle of the current streaming file */ static BufFile *stream_fd = NULL; +/* + * The remote WAL position that has been applied and flushed locally. We record + * and use this information both while sending feedback to the server and + * advancing oldest_nonremovable_xid. 
+ */ +static XLogRecPtr last_flushpos = InvalidXLogRecPtr; + typedef struct SubXactInfo { TransactionId xid; /* XID of the subxact */ @@ -372,6 +545,19 @@ static void stream_close_file(void); static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply); +static void maybe_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data, + bool status_received); +static bool can_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data); +static void process_rdt_phase_transition(RetainDeadTuplesData *rdt_data, + bool status_received); +static void get_candidate_xid(RetainDeadTuplesData *rdt_data); +static void request_publisher_status(RetainDeadTuplesData *rdt_data); +static void wait_for_publisher_status(RetainDeadTuplesData *rdt_data, + bool status_received); +static void wait_for_local_flush(RetainDeadTuplesData *rdt_data); +static void adjust_xid_advance_interval(RetainDeadTuplesData *rdt_data, + bool new_xid_found); + static void apply_handle_commit_internal(LogicalRepCommitData *commit_data); static void apply_handle_insert_internal(ApplyExecutionData *edata, ResultRelInfo *relinfo, @@ -3577,6 +3763,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received) bool ping_sent = false; TimeLineID tli; ErrorContextCallback errcallback; + RetainDeadTuplesData rdt_data = {0}; /* * Init the ApplyMessageContext which we clean up after each replication @@ -3655,6 +3842,8 @@ LogicalRepApplyLoop(XLogRecPtr last_received) last_recv_timestamp = GetCurrentTimestamp(); ping_sent = false; + rdt_data.last_recv_time = last_recv_timestamp; + /* Ensure we are reading the data into our memory context. 
*/ MemoryContextSwitchTo(ApplyMessageContext); @@ -3681,6 +3870,8 @@ LogicalRepApplyLoop(XLogRecPtr last_received) UpdateWorkerStats(last_received, send_time, false); apply_dispatch(&s); + + maybe_advance_nonremovable_xid(&rdt_data, false); } else if (c == 'k') { @@ -3696,8 +3887,31 @@ LogicalRepApplyLoop(XLogRecPtr last_received) last_received = end_lsn; send_feedback(last_received, reply_requested, false); + + maybe_advance_nonremovable_xid(&rdt_data, false); + UpdateWorkerStats(last_received, timestamp, true); } + else if (c == 's') /* Primary status update */ + { + rdt_data.remote_lsn = pq_getmsgint64(&s); + rdt_data.remote_oldestxid = FullTransactionIdFromU64((uint64) pq_getmsgint64(&s)); + rdt_data.remote_nextxid = FullTransactionIdFromU64((uint64) pq_getmsgint64(&s)); + rdt_data.reply_time = pq_getmsgint64(&s); + + /* + * This should never happen, see + * ProcessStandbyPSRequestMessage. But if it happens + * due to a bug, we don't want to proceed as it can + * incorrectly advance oldest_nonremovable_xid. + */ + if (XLogRecPtrIsInvalid(rdt_data.remote_lsn)) + elog(ERROR, "cannot get the latest WAL position from the publisher"); + + maybe_advance_nonremovable_xid(&rdt_data, true); + + UpdateWorkerStats(last_received, rdt_data.reply_time, false); + } /* other message types are purposefully ignored */ MemoryContextReset(ApplyMessageContext); @@ -3710,6 +3924,11 @@ LogicalRepApplyLoop(XLogRecPtr last_received) /* confirm all writes so far */ send_feedback(last_received, false, false); + /* Reset the timestamp if no message was received */ + rdt_data.last_recv_time = 0; + + maybe_advance_nonremovable_xid(&rdt_data, false); + if (!in_remote_transaction && !in_streamed_transaction) { /* @@ -3744,6 +3963,14 @@ LogicalRepApplyLoop(XLogRecPtr last_received) else wait_time = NAPTIME_PER_CYCLE; + /* + * Ensure to wake up when it's possible to advance the non-removable + * transaction ID. 
+ */ + if (rdt_data.phase == RDT_GET_CANDIDATE_XID && + rdt_data.xid_advance_interval) + wait_time = Min(wait_time, rdt_data.xid_advance_interval); + rc = WaitLatchOrSocket(MyLatch, WL_SOCKET_READABLE | WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, @@ -3807,6 +4034,8 @@ LogicalRepApplyLoop(XLogRecPtr last_received) send_feedback(last_received, requestReply, requestReply); + maybe_advance_nonremovable_xid(&rdt_data, false); + /* * Force reporting to ensure long idle periods don't lead to * arbitrarily delayed stats. Stats can only be reported outside @@ -3842,7 +4071,6 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply) static XLogRecPtr last_recvpos = InvalidXLogRecPtr; static XLogRecPtr last_writepos = InvalidXLogRecPtr; - static XLogRecPtr last_flushpos = InvalidXLogRecPtr; XLogRecPtr writepos; XLogRecPtr flushpos; @@ -3921,6 +4149,367 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply) } /* + * Attempt to advance the non-removable transaction ID. + * + * See comments atop worker.c for details. + */ +static void +maybe_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data, + bool status_received) +{ + if (!can_advance_nonremovable_xid(rdt_data)) + return; + + process_rdt_phase_transition(rdt_data, status_received); +} + +/* + * Preliminary check to determine if advancing the non-removable transaction ID + * is allowed. + */ +static bool +can_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data) +{ + /* + * It is sufficient to manage non-removable transaction ID for a + * subscription by the main apply worker to detect conflicts reliably even + * for table sync or parallel apply workers. + */ + if (!am_leader_apply_worker()) + return false; + + /* No need to advance if retaining dead tuples is not required */ + if (!MySubscription->retaindeadtuples) + return false; + + return true; +} + +/* + * Process phase transitions during the non-removable transaction ID + * advancement. 
See comments atop worker.c for details of the transition. + */ +static void +process_rdt_phase_transition(RetainDeadTuplesData *rdt_data, + bool status_received) +{ + switch (rdt_data->phase) + { + case RDT_GET_CANDIDATE_XID: + get_candidate_xid(rdt_data); + break; + case RDT_REQUEST_PUBLISHER_STATUS: + request_publisher_status(rdt_data); + break; + case RDT_WAIT_FOR_PUBLISHER_STATUS: + wait_for_publisher_status(rdt_data, status_received); + break; + case RDT_WAIT_FOR_LOCAL_FLUSH: + wait_for_local_flush(rdt_data); + break; + } +} + +/* + * Workhorse for the RDT_GET_CANDIDATE_XID phase. + */ +static void +get_candidate_xid(RetainDeadTuplesData *rdt_data) +{ + TransactionId oldest_running_xid; + TimestampTz now; + + /* + * Use last_recv_time when applying changes in the loop to avoid + * unnecessary system time retrieval. If last_recv_time is not available, + * obtain the current timestamp. + */ + now = rdt_data->last_recv_time ? rdt_data->last_recv_time : GetCurrentTimestamp(); + + /* + * Compute the candidate_xid and request the publisher status at most once + * per xid_advance_interval. Refer to adjust_xid_advance_interval() for + * details on how this value is dynamically adjusted. This is to avoid + * using CPU and network resources without making much progress. + */ + if (!TimestampDifferenceExceeds(rdt_data->candidate_xid_time, now, + rdt_data->xid_advance_interval)) + return; + + /* + * Immediately update the timer, even if the function returns later + * without setting candidate_xid due to inactivity on the subscriber. This + * avoids frequent calls to GetOldestActiveTransactionId. + */ + rdt_data->candidate_xid_time = now; + + /* + * Consider transactions in the current database, as only dead tuples from + * this database are required for conflict detection. + */ + oldest_running_xid = GetOldestActiveTransactionId(false, false); + + /* + * Oldest active transaction ID (oldest_running_xid) can't be behind any + * of its previously computed values.
+ */ + Assert(TransactionIdPrecedesOrEquals(MyLogicalRepWorker->oldest_nonremovable_xid, + oldest_running_xid)); + + /* Return if the oldest_nonremovable_xid cannot be advanced */ + if (TransactionIdEquals(MyLogicalRepWorker->oldest_nonremovable_xid, + oldest_running_xid)) + { + adjust_xid_advance_interval(rdt_data, false); + return; + } + + adjust_xid_advance_interval(rdt_data, true); + + rdt_data->candidate_xid = oldest_running_xid; + rdt_data->phase = RDT_REQUEST_PUBLISHER_STATUS; + + /* process the next phase */ + process_rdt_phase_transition(rdt_data, false); +} + +/* + * Workhorse for the RDT_REQUEST_PUBLISHER_STATUS phase. + */ +static void +request_publisher_status(RetainDeadTuplesData *rdt_data) +{ + static StringInfo request_message = NULL; + + if (!request_message) + { + MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext); + + request_message = makeStringInfo(); + MemoryContextSwitchTo(oldctx); + } + else + resetStringInfo(request_message); + + /* + * Send the current time to update the remote walsender's latest reply + * message received time. + */ + pq_sendbyte(request_message, 'p'); + pq_sendint64(request_message, GetCurrentTimestamp()); + + elog(DEBUG2, "sending publisher status request message"); + + /* Send a request for the publisher status */ + walrcv_send(LogRepWorkerWalRcvConn, + request_message->data, request_message->len); + + rdt_data->phase = RDT_WAIT_FOR_PUBLISHER_STATUS; + + /* + * Skip calling maybe_advance_nonremovable_xid() since further transition + * is possible only once we receive the publisher status message. + */ +} + +/* + * Workhorse for the RDT_WAIT_FOR_PUBLISHER_STATUS phase. + */ +static void +wait_for_publisher_status(RetainDeadTuplesData *rdt_data, + bool status_received) +{ + /* + * Return if we have requested but not yet received the publisher status. 
+ */ + if (!status_received) + return; + + if (!FullTransactionIdIsValid(rdt_data->remote_wait_for)) + rdt_data->remote_wait_for = rdt_data->remote_nextxid; + + /* + * Check if all remote concurrent transactions that were active at the + * first status request have now completed. If completed, proceed to the + * next phase; otherwise, continue checking the publisher status until + * these transactions finish. + * + * It's possible that transactions in the commit phase during the last + * cycle have now finished committing, but remote_oldestxid remains older + * than remote_wait_for. This can happen if some old transaction came in + * the commit phase when we requested status in this cycle. We do not + * handle this case explicitly as it's rare and the benefit doesn't + * justify the required complexity. Tracking would require either caching + * all xids at the publisher or sending them to subscribers. The condition + * will resolve naturally once the remaining transactions are finished. + * + * Directly advancing the non-removable transaction ID is possible if + * there are no activities on the publisher since the last advancement + * cycle. However, it requires maintaining two fields, last_remote_nextxid + * and last_remote_lsn, within the structure for comparison with the + * current cycle's values. Considering the minimal cost of continuing in + * RDT_WAIT_FOR_LOCAL_FLUSH without awaiting changes, we opted not to + * advance the transaction ID here. + */ + if (FullTransactionIdPrecedesOrEquals(rdt_data->remote_wait_for, + rdt_data->remote_oldestxid)) + rdt_data->phase = RDT_WAIT_FOR_LOCAL_FLUSH; + else + rdt_data->phase = RDT_REQUEST_PUBLISHER_STATUS; + + /* process the next phase */ + process_rdt_phase_transition(rdt_data, false); +} + +/* + * Workhorse for the RDT_WAIT_FOR_LOCAL_FLUSH phase. 
+ */ +static void +wait_for_local_flush(RetainDeadTuplesData *rdt_data) +{ + Assert(!XLogRecPtrIsInvalid(rdt_data->remote_lsn) && + TransactionIdIsValid(rdt_data->candidate_xid)); + + /* + * We expect the publisher and subscriber clocks to be in sync using time + * sync service like NTP. Otherwise, we will advance this worker's + * oldest_nonremovable_xid prematurely, leading to the removal of rows + * required to detect conflicts reliably. This check primarily addresses + * scenarios where the publisher's clock falls behind; if the publisher's + * clock is ahead, subsequent transactions will naturally bear later + * commit timestamps, conforming to the design outlined atop worker.c. + * + * XXX Consider waiting for the publisher's clock to catch up with the + * subscriber's before proceeding to the next phase. + */ + if (TimestampDifferenceExceeds(rdt_data->reply_time, + rdt_data->candidate_xid_time, 0)) + ereport(ERROR, + errmsg_internal("oldest_nonremovable_xid transaction ID could be advanced prematurely"), + errdetail_internal("The clock on the publisher is behind that of the subscriber.")); + + /* + * Do not attempt to advance the non-removable transaction ID when table + * sync is in progress. During this time, changes from a single + * transaction may be applied by multiple table sync workers corresponding + * to the target tables. So, it's necessary for all table sync workers to + * apply and flush the corresponding changes before advancing the + * transaction ID, otherwise, dead tuples that are still needed for + * conflict detection in table sync workers could be removed prematurely. + * However, confirming the apply and flush progress across all table sync + * workers is complex and not worth the effort, so we simply return if not + * all tables are in the READY state. 
+ * + * It is safe to add new tables with initial states to the subscription + * after this check because any changes applied to these tables should + * have a WAL position greater than the rdt_data->remote_lsn. + */ + if (!AllTablesyncsReady()) + return; + + /* + * Update and check the remote flush position if we are applying changes + * in a loop. This is done at most once per WalWriterDelay to avoid + * performing costly operations in get_flush_position() too frequently + * during change application. + */ + if (last_flushpos < rdt_data->remote_lsn && rdt_data->last_recv_time && + TimestampDifferenceExceeds(rdt_data->flushpos_update_time, + rdt_data->last_recv_time, WalWriterDelay)) + { + XLogRecPtr writepos; + XLogRecPtr flushpos; + bool have_pending_txes; + + /* Fetch the latest remote flush position */ + get_flush_position(&writepos, &flushpos, &have_pending_txes); + + if (flushpos > last_flushpos) + last_flushpos = flushpos; + + rdt_data->flushpos_update_time = rdt_data->last_recv_time; + } + + /* Return to wait for the changes to be applied */ + if (last_flushpos < rdt_data->remote_lsn) + return; + + /* + * Reaching here means the remote WAL position has been received, and all + * transactions up to that position on the publisher have been applied and + * flushed locally. So, we can advance the non-removable transaction ID. + */ + SpinLockAcquire(&MyLogicalRepWorker->relmutex); + MyLogicalRepWorker->oldest_nonremovable_xid = rdt_data->candidate_xid; + SpinLockRelease(&MyLogicalRepWorker->relmutex); + + elog(DEBUG2, "confirmed flush up to remote lsn %X/%X: new oldest_nonremovable_xid %u", + LSN_FORMAT_ARGS(rdt_data->remote_lsn), + rdt_data->candidate_xid); + + /* Notify launcher to update the xmin of the conflict slot */ + ApplyLauncherWakeup(); + + /* + * Reset all data fields except those used to determine the timing for the + * next round of transaction ID advancement. 
We can even use + * flushpos_update_time in the next round to decide whether to get the + * latest flush position. + */ + rdt_data->phase = RDT_GET_CANDIDATE_XID; + rdt_data->remote_lsn = InvalidXLogRecPtr; + rdt_data->remote_oldestxid = InvalidFullTransactionId; + rdt_data->remote_nextxid = InvalidFullTransactionId; + rdt_data->reply_time = 0; + rdt_data->remote_wait_for = InvalidFullTransactionId; + rdt_data->candidate_xid = InvalidTransactionId; + + /* process the next phase */ + process_rdt_phase_transition(rdt_data, false); +} + +/* + * Adjust the interval for advancing non-removable transaction IDs. + * + * We double the interval to try advancing the non-removable transaction IDs + * if there is no activity on the node. The maximum value of the interval is + * capped by wal_receiver_status_interval if it is not zero, otherwise to + * 3 minutes which should be sufficient to avoid using CPU or network + * resources without much benefit. + * + * The interval is reset to a minimum value of 100ms once there is some + * activity on the node. + * + * XXX The use of wal_receiver_status_interval is a bit arbitrary so we can + * consider the other interval or a separate GUC if the need arises. + */ +static void +adjust_xid_advance_interval(RetainDeadTuplesData *rdt_data, bool new_xid_found) +{ + if (!new_xid_found && rdt_data->xid_advance_interval) + { + int max_interval = wal_receiver_status_interval + ? wal_receiver_status_interval * 1000 + : MAX_XID_ADVANCE_INTERVAL; + + /* + * No new transaction ID has been assigned since the last check, so + * double the interval, but not beyond the maximum allowable value. + */ + rdt_data->xid_advance_interval = Min(rdt_data->xid_advance_interval * 2, + max_interval); + } + else + { + /* + * A new transaction ID was found or the interval is not yet + * initialized, so set the interval to the minimum value.
+ */ + rdt_data->xid_advance_interval = MIN_XID_ADVANCE_INTERVAL; + } +} + +/* * Exit routine for apply workers due to subscription parameter changes. */ static void @@ -4708,6 +5297,30 @@ InitializeLogRepWorker(void) apply_worker_exit(); } + /* + * Restart the worker if retain_dead_tuples was enabled during startup. + * + * At this point, the replication slot used for conflict detection might + * not exist yet, or could be dropped soon if the launcher perceives + * retain_dead_tuples as disabled. To avoid unnecessary tracking of + * oldest_nonremovable_xid when the slot is absent or at risk of being + * dropped, a restart is initiated. + * + * The oldest_nonremovable_xid should be initialized only when the + * retain_dead_tuples is enabled before launching the worker. See + * logicalrep_worker_launch. + */ + if (am_leader_apply_worker() && + MySubscription->retaindeadtuples && + !TransactionIdIsValid(MyLogicalRepWorker->oldest_nonremovable_xid)) + { + ereport(LOG, + errmsg("logical replication worker for subscription \"%s\" will restart because the option %s was enabled during startup", + MySubscription->name, "retain_dead_tuples")); + + apply_worker_exit(); + } + /* Setup synchronous commit according to the user's wishes */ SetConfigOption("synchronous_commit", MySubscription->synccommit, PGC_BACKEND, PGC_S_OVERRIDE); @@ -4864,6 +5477,14 @@ DisableSubscriptionAndExit(void) errmsg("subscription \"%s\" has been disabled because of an error", MySubscription->name)); + /* + * Skip the track_commit_timestamp check when disabling the worker due to + * an error, as verifying commit timestamps is unnecessary in this + * context. 
+ */ + if (MySubscription->retaindeadtuples) + CheckSubDeadTupleRetention(false, true, WARNING); + proc_exit(0); } diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index e44ad576bc7..8605776ad86 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -47,6 +47,7 @@ #include "miscadmin.h" #include "pgstat.h" #include "postmaster/interrupt.h" +#include "replication/logicallauncher.h" #include "replication/slotsync.h" #include "replication/slot.h" #include "replication/walsender_private.h" @@ -172,6 +173,7 @@ static SyncStandbySlotsConfigData *synchronized_standby_slots_config; static XLogRecPtr ss_oldest_flush_lsn = InvalidXLogRecPtr; static void ReplicationSlotShmemExit(int code, Datum arg); +static bool IsSlotForConflictCheck(const char *name); static void ReplicationSlotDropPtr(ReplicationSlot *slot); /* internal persistency functions */ @@ -258,13 +260,17 @@ ReplicationSlotShmemExit(int code, Datum arg) /* * Check whether the passed slot name is valid and report errors at elevel. * + * An error will be reported for a reserved replication slot name if + * allow_reserved_name is set to false. + * * Slot names may consist out of [a-z0-9_]{1,NAMEDATALEN-1} which should allow * the name to be used as a directory name on every supported OS. * * Returns whether the directory name is valid or not if elevel < ERROR. 
*/ bool -ReplicationSlotValidateName(const char *name, int elevel) +ReplicationSlotValidateName(const char *name, bool allow_reserved_name, + int elevel) { const char *cp; @@ -300,10 +306,32 @@ ReplicationSlotValidateName(const char *name, int elevel) return false; } } + + if (!allow_reserved_name && IsSlotForConflictCheck(name)) + { + ereport(elevel, + errcode(ERRCODE_RESERVED_NAME), + errmsg("replication slot name \"%s\" is reserved", + name), + errdetail("The name \"%s\" is reserved for the conflict detection slot.", + CONFLICT_DETECTION_SLOT)); + + return false; + } + return true; } /* + * Return true if the replication slot name is "pg_conflict_detection". + */ +static bool +IsSlotForConflictCheck(const char *name) +{ + return (strcmp(name, CONFLICT_DETECTION_SLOT) == 0); +} + +/* * Create a new replication slot and mark it as used by this backend. * * name: Name of the slot @@ -330,7 +358,12 @@ ReplicationSlotCreate(const char *name, bool db_specific, Assert(MyReplicationSlot == NULL); - ReplicationSlotValidateName(name, ERROR); + /* + * The logical launcher or pg_upgrade may create or migrate an internal + * slot, so using a reserved name is allowed in these cases. + */ + ReplicationSlotValidateName(name, IsBinaryUpgrade || IsLogicalLauncher(), + ERROR); if (failover) { @@ -582,6 +615,17 @@ retry: } /* + * Do not allow users to acquire the reserved slot. This scenario may + * occur if the launcher that owns the slot has terminated unexpectedly + * due to an error, and a backend process attempts to reuse the slot. + */ + if (!IsLogicalLauncher() && IsSlotForConflictCheck(name)) + ereport(ERROR, + errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("cannot acquire replication slot \"%s\"", name), + errdetail("The slot is reserved for conflict detection and can only be acquired by logical replication launcher.")); + + /* * This is the slot we want; check if it's active under some other * process. In single user mode, we don't need this check. 
*/ diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 28b8591efa5..ee911394a23 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -65,6 +65,7 @@ #include "funcapi.h" #include "libpq/libpq.h" #include "libpq/pqformat.h" +#include "libpq/protocol.h" #include "miscadmin.h" #include "nodes/replnodes.h" #include "pgstat.h" @@ -84,6 +85,7 @@ #include "storage/ipc.h" #include "storage/pmsignal.h" #include "storage/proc.h" +#include "storage/procarray.h" #include "tcop/dest.h" #include "tcop/tcopprot.h" #include "utils/acl.h" @@ -258,6 +260,7 @@ static void StartLogicalReplication(StartReplicationCmd *cmd); static void ProcessStandbyMessage(void); static void ProcessStandbyReplyMessage(void); static void ProcessStandbyHSFeedbackMessage(void); +static void ProcessStandbyPSRequestMessage(void); static void ProcessRepliesIfAny(void); static void ProcessPendingWrites(void); static void WalSndKeepalive(bool requestReply, XLogRecPtr writePtr); @@ -733,13 +736,13 @@ HandleUploadManifestPacket(StringInfo buf, off_t *offset, switch (mtype) { - case 'd': /* CopyData */ + case PqMsg_CopyData: maxmsglen = PQ_LARGE_MESSAGE_LIMIT; break; - case 'c': /* CopyDone */ - case 'f': /* CopyFail */ - case 'H': /* Flush */ - case 'S': /* Sync */ + case PqMsg_CopyDone: + case PqMsg_CopyFail: + case PqMsg_Flush: + case PqMsg_Sync: maxmsglen = PQ_SMALL_MESSAGE_LIMIT; break; default: @@ -761,19 +764,19 @@ HandleUploadManifestPacket(StringInfo buf, off_t *offset, /* Process the message */ switch (mtype) { - case 'd': /* CopyData */ + case PqMsg_CopyData: AppendIncrementalManifestData(ib, buf->data, buf->len); return true; - case 'c': /* CopyDone */ + case PqMsg_CopyDone: return false; - case 'H': /* Sync */ - case 'S': /* Flush */ + case PqMsg_Sync: + case PqMsg_Flush: /* Ignore these while in CopyOut mode as we do elsewhere. 
*/ return true; - case 'f': + case PqMsg_CopyFail: ereport(ERROR, (errcode(ERRCODE_QUERY_CANCELED), errmsg("COPY from stdin failed: %s", @@ -1567,7 +1570,7 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, tmpbuf.data, sizeof(int64)); /* output previously gathered data in a CopyData packet */ - pq_putmessage_noblock('d', ctx->out->data, ctx->out->len); + pq_putmessage_noblock(PqMsg_CopyData, ctx->out->data, ctx->out->len); CHECK_FOR_INTERRUPTS(); @@ -2303,7 +2306,7 @@ ProcessRepliesIfAny(void) case PqMsg_CopyDone: if (!streamingDoneSending) { - pq_putmessage_noblock('c', NULL, 0); + pq_putmessage_noblock(PqMsg_CopyDone, NULL, 0); streamingDoneSending = true; } @@ -2355,6 +2358,10 @@ ProcessStandbyMessage(void) ProcessStandbyHSFeedbackMessage(); break; + case 'p': + ProcessStandbyPSRequestMessage(); + break; + default: ereport(COMMERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), @@ -2702,6 +2709,60 @@ ProcessStandbyHSFeedbackMessage(void) } /* + * Process the request for a primary status update message. + */ +static void +ProcessStandbyPSRequestMessage(void) +{ + XLogRecPtr lsn = InvalidXLogRecPtr; + TransactionId oldestXidInCommit; + FullTransactionId nextFullXid; + FullTransactionId fullOldestXidInCommit; + WalSnd *walsnd = MyWalSnd; + TimestampTz replyTime; + + /* + * This shouldn't happen because we don't support getting primary status + * message from standby. + */ + if (RecoveryInProgress()) + elog(ERROR, "the primary status is unavailable during recovery"); + + replyTime = pq_getmsgint64(&reply_message); + + /* + * Update shared state for this WalSender process based on reply data from + * standby. + */ + SpinLockAcquire(&walsnd->mutex); + walsnd->replyTime = replyTime; + SpinLockRelease(&walsnd->mutex); + + /* + * Consider transactions in the current database, as only these are the + * ones replicated. 
+ */ + oldestXidInCommit = GetOldestActiveTransactionId(true, false); + nextFullXid = ReadNextFullTransactionId(); + fullOldestXidInCommit = FullTransactionIdFromAllowableAt(nextFullXid, + oldestXidInCommit); + lsn = GetXLogWriteRecPtr(); + + elog(DEBUG2, "sending primary status"); + + /* construct the message... */ + resetStringInfo(&output_message); + pq_sendbyte(&output_message, 's'); + pq_sendint64(&output_message, lsn); + pq_sendint64(&output_message, (int64) U64FromFullTransactionId(fullOldestXidInCommit)); + pq_sendint64(&output_message, (int64) U64FromFullTransactionId(nextFullXid)); + pq_sendint64(&output_message, GetCurrentTimestamp()); + + /* ... and send it wrapped in CopyData */ + pq_putmessage_noblock(PqMsg_CopyData, output_message.data, output_message.len); +} + +/* * Compute how long send/receive loops should sleep. * * If wal_sender_timeout is enabled we want to wake up in time to send @@ -3246,7 +3307,7 @@ XLogSendPhysical(void) wal_segment_close(xlogreader); /* Send CopyDone */ - pq_putmessage_noblock('c', NULL, 0); + pq_putmessage_noblock(PqMsg_CopyDone, NULL, 0); streamingDoneSending = true; WalSndCaughtUp = true; @@ -3374,7 +3435,7 @@ retry: memcpy(&output_message.data[1 + sizeof(int64) + sizeof(int64)], tmpbuf.data, sizeof(int64)); - pq_putmessage_noblock('d', output_message.data, output_message.len); + pq_putmessage_noblock(PqMsg_CopyData, output_message.data, output_message.len); sentPtr = endptr; @@ -4080,7 +4141,7 @@ WalSndKeepalive(bool requestReply, XLogRecPtr writePtr) pq_sendbyte(&output_message, requestReply ? 1 : 0); /* ... 
and send it wrapped in CopyData */ - pq_putmessage_noblock('d', output_message.data, output_message.len); + pq_putmessage_noblock(PqMsg_CopyData, output_message.data, output_message.len); /* Set local flag */ if (requestReply) diff --git a/src/backend/storage/aio/README.md b/src/backend/storage/aio/README.md index f10b5c7e31e..72ae3b3737d 100644 --- a/src/backend/storage/aio/README.md +++ b/src/backend/storage/aio/README.md @@ -94,7 +94,7 @@ pgaio_io_register_callbacks(ioh, PGAIO_HCB_SHARED_BUFFER_READV, 0); * * In this example we're reading only a single buffer, hence the 1. */ -pgaio_io_set_handle_data_32(ioh, (uint32 *) buffer, 1); +pgaio_io_set_handle_data_32(ioh, (uint32 *) &buffer, 1); /* * Pass the AIO handle to lower-level function. When operating on the level of @@ -119,8 +119,9 @@ pgaio_io_set_handle_data_32(ioh, (uint32 *) buffer, 1); * e.g. due to reaching a limit on the number of unsubmitted IOs, and even * complete before smgrstartreadv() returns. */ +void *page = BufferGetBlock(buffer); smgrstartreadv(ioh, operation->smgr, forknum, blkno, - BufferGetBlock(buffer), 1); + &page, 1); /* * To benefit from AIO, it is beneficial to perform other work, including diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c index 2418967def6..bf987aed8d3 100644 --- a/src/backend/storage/ipc/procarray.c +++ b/src/backend/storage/ipc/procarray.c @@ -2814,8 +2814,10 @@ GetRunningTransactionData(void) * * Similar to GetSnapshotData but returns just oldestActiveXid. We include * all PGPROCs with an assigned TransactionId, even VACUUM processes. - * We look at all databases, though there is no need to include WALSender - * since this has no effect on hot standby conflicts. + * + * If allDbs is true, we look at all databases, though there is no need to + * include WALSender since this has no effect on hot standby conflicts. If + * allDbs is false, skip processes attached to other databases. 
* * This is never executed during recovery so there is no need to look at * KnownAssignedXids. @@ -2823,9 +2825,12 @@ GetRunningTransactionData(void) * We don't worry about updating other counters, we want to keep this as * simple as possible and leave GetSnapshotData() as the primary code for * that bookkeeping. + * + * inCommitOnly indicates getting the oldestActiveXid among the transactions + * in the commit critical section. */ TransactionId -GetOldestActiveTransactionId(void) +GetOldestActiveTransactionId(bool inCommitOnly, bool allDbs) { ProcArrayStruct *arrayP = procArray; TransactionId *other_xids = ProcGlobal->xids; @@ -2852,6 +2857,8 @@ GetOldestActiveTransactionId(void) for (index = 0; index < arrayP->numProcs; index++) { TransactionId xid; + int pgprocno = arrayP->pgprocnos[index]; + PGPROC *proc = &allProcs[pgprocno]; /* Fetch xid just once - see GetNewTransactionId */ xid = UINT32_ACCESS_ONCE(other_xids[index]); @@ -2859,6 +2866,13 @@ GetOldestActiveTransactionId(void) if (!TransactionIdIsNormal(xid)) continue; + if (inCommitOnly && + (proc->delayChkptFlags & DELAY_CHKPT_IN_COMMIT) == 0) + continue; + + if (!allDbs && proc->databaseId != MyDatabaseId) + continue; + if (TransactionIdPrecedes(xid, oldestRunningXid)) oldestRunningXid = xid; diff --git a/src/backend/storage/lmgr/generate-lwlocknames.pl b/src/backend/storage/lmgr/generate-lwlocknames.pl index c7a6720440d..cd3e43c448a 100644 --- a/src/backend/storage/lmgr/generate-lwlocknames.pl +++ b/src/backend/storage/lmgr/generate-lwlocknames.pl @@ -27,18 +27,24 @@ print $h "/* there is deliberately not an #ifndef LWLOCKNAMES_H here */\n\n"; # -# First, record the predefined LWLocks listed in wait_event_names.txt. We'll -# cross-check those with the ones in lwlocklist.h. +# First, record the predefined LWLocks and built-in tranches listed in +# wait_event_names.txt. We'll cross-check those with the ones in lwlocklist.h. 
# +my @wait_event_tranches; my @wait_event_lwlocks; my $record_lwlocks = 0; +my $in_tranches = 0; while (<$wait_event_names>) { chomp; # Check for end marker. - last if /^# END OF PREDEFINED LWLOCKS/; + if (/^# END OF PREDEFINED LWLOCKS/) + { + $in_tranches = 1; + next; + } # Skip comments and empty lines. next if /^#/; @@ -54,13 +60,29 @@ while (<$wait_event_names>) # Go to the next line if we are not yet recording LWLocks. next if not $record_lwlocks; + # Stop recording if we reach another section. + last if /^Section:/; + # Record the LWLock. (my $waiteventname, my $waitevendocsentence) = split(/\t/, $_); - push(@wait_event_lwlocks, $waiteventname); + + if ($in_tranches) + { + push(@wait_event_tranches, $waiteventname); + } + else + { + push(@wait_event_lwlocks, $waiteventname); + } } +# +# While gathering the list of predefined LWLocks, cross-check the lists in +# lwlocklist.h with the wait events we just recorded. +# my $in_comment = 0; -my $i = 0; +my $lwlock_count = 0; +my $tranche_count = 0; while (<$lwlocklist>) { chomp; @@ -81,38 +103,72 @@ while (<$lwlocklist>) next; } - die "unable to parse lwlocklist.h line \"$_\"" - unless /^PG_LWLOCK\((\d+),\s+(\w+)\)$/; + # + # Gather list of predefined LWLocks and cross-check with the wait events. + # + if (/^PG_LWLOCK\((\d+),\s+(\w+)\)$/) + { + my ($lockidx, $lockname) = ($1, $2); - (my $lockidx, my $lockname) = ($1, $2); + die "lwlocklist.h not in order" if $lockidx < $lastlockidx; + die "lwlocklist.h has duplicates" if $lockidx == $lastlockidx; - die "lwlocklist.h not in order" if $lockidx < $lastlockidx; - die "lwlocklist.h has duplicates" if $lockidx == $lastlockidx; + die "$lockname defined in lwlocklist.h but missing from " + . "wait_event_names.txt" + if $lwlock_count >= scalar @wait_event_lwlocks; + die "lists of predefined LWLocks do not match (first mismatch at " + . "$wait_event_lwlocks[$lwlock_count] in wait_event_names.txt and " + . 
"$lockname in lwlocklist.h)" + if $wait_event_lwlocks[$lwlock_count] ne $lockname; - die "$lockname defined in lwlocklist.h but missing from " - . "wait_event_names.txt" - if $i >= scalar @wait_event_lwlocks; - die "lists of predefined LWLocks do not match (first mismatch at " - . "$wait_event_lwlocks[$i] in wait_event_names.txt and $lockname in " - . "lwlocklist.h)" - if $wait_event_lwlocks[$i] ne $lockname; - $i++; + $lwlock_count++; - while ($lastlockidx < $lockidx - 1) + while ($lastlockidx < $lockidx - 1) + { + ++$lastlockidx; + } + $lastlockidx = $lockidx; + + # Add a "Lock" suffix to each lock name, as the C code depends on that. + printf $h "#define %-32s (&MainLWLockArray[$lockidx].lock)\n", + $lockname . "Lock"; + + next; + } + + # + # Cross-check the built-in LWLock tranches with the wait events. + # + if (/^PG_LWLOCKTRANCHE\((\w+),\s+(\w+)\)$/) { - ++$lastlockidx; + my ($tranche_id, $tranche_name) = ($1, $2); + + die "$tranche_name defined in lwlocklist.h but missing from " + . "wait_event_names.txt" + if $tranche_count >= scalar @wait_event_tranches; + die + "lists of built-in LWLock tranches do not match (first mismatch at " + . "$wait_event_tranches[$tranche_count] in wait_event_names.txt and " + . "$tranche_name in lwlocklist.h)" + if $wait_event_tranches[$tranche_count] ne $tranche_name; + + $tranche_count++; + + next; } - $lastlockidx = $lockidx; - # Add a "Lock" suffix to each lock name, as the C code depends on that - printf $h "#define %-32s (&MainLWLockArray[$lockidx].lock)\n", - $lockname . "Lock"; + die "unable to parse lwlocklist.h line \"$_\""; } die - "$wait_event_lwlocks[$i] defined in wait_event_names.txt but missing from " - . "lwlocklist.h" - if $i < scalar @wait_event_lwlocks; + "$wait_event_lwlocks[$lwlock_count] defined in wait_event_names.txt but " + . " missing from lwlocklist.h" + if $lwlock_count < scalar @wait_event_lwlocks; + +die + "$wait_event_tranches[$tranche_count] defined in wait_event_names.txt but " + . 
"missing from lwlocklist.h" + if $tranche_count < scalar @wait_event_tranches; print $h "\n"; printf $h "#define NUM_INDIVIDUAL_LWLOCKS %s\n", $lastlockidx + 1; diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c index 2d43bf2cc13..ec9c345ffdf 100644 --- a/src/backend/storage/lmgr/lwlock.c +++ b/src/backend/storage/lmgr/lwlock.c @@ -122,9 +122,8 @@ StaticAssertDecl((LW_VAL_EXCLUSIVE & LW_FLAG_MASK) == 0, * own tranche. We absorb the names of these tranches from there into * BuiltinTrancheNames here. * - * 2. There are some predefined tranches for built-in groups of locks. - * These are listed in enum BuiltinTrancheIds in lwlock.h, and their names - * appear in BuiltinTrancheNames[] below. + * 2. There are some predefined tranches for built-in groups of locks defined + * in lwlocklist.h. We absorb the names of these tranches, too. * * 3. Extensions can create new tranches, via either RequestNamedLWLockTranche * or LWLockRegisterTranche. The names of these that are known in the current @@ -135,49 +134,10 @@ StaticAssertDecl((LW_VAL_EXCLUSIVE & LW_FLAG_MASK) == 0, */ static const char *const BuiltinTrancheNames[] = { #define PG_LWLOCK(id, lockname) [id] = CppAsString(lockname), +#define PG_LWLOCKTRANCHE(id, lockname) [LWTRANCHE_##id] = CppAsString(lockname), #include "storage/lwlocklist.h" #undef PG_LWLOCK - [LWTRANCHE_XACT_BUFFER] = "XactBuffer", - [LWTRANCHE_COMMITTS_BUFFER] = "CommitTsBuffer", - [LWTRANCHE_SUBTRANS_BUFFER] = "SubtransBuffer", - [LWTRANCHE_MULTIXACTOFFSET_BUFFER] = "MultiXactOffsetBuffer", - [LWTRANCHE_MULTIXACTMEMBER_BUFFER] = "MultiXactMemberBuffer", - [LWTRANCHE_NOTIFY_BUFFER] = "NotifyBuffer", - [LWTRANCHE_SERIAL_BUFFER] = "SerialBuffer", - [LWTRANCHE_WAL_INSERT] = "WALInsert", - [LWTRANCHE_BUFFER_CONTENT] = "BufferContent", - [LWTRANCHE_REPLICATION_ORIGIN_STATE] = "ReplicationOriginState", - [LWTRANCHE_REPLICATION_SLOT_IO] = "ReplicationSlotIO", - [LWTRANCHE_LOCK_FASTPATH] = "LockFastPath", - 
[LWTRANCHE_BUFFER_MAPPING] = "BufferMapping", - [LWTRANCHE_LOCK_MANAGER] = "LockManager", - [LWTRANCHE_PREDICATE_LOCK_MANAGER] = "PredicateLockManager", - [LWTRANCHE_PARALLEL_HASH_JOIN] = "ParallelHashJoin", - [LWTRANCHE_PARALLEL_BTREE_SCAN] = "ParallelBtreeScan", - [LWTRANCHE_PARALLEL_QUERY_DSA] = "ParallelQueryDSA", - [LWTRANCHE_PER_SESSION_DSA] = "PerSessionDSA", - [LWTRANCHE_PER_SESSION_RECORD_TYPE] = "PerSessionRecordType", - [LWTRANCHE_PER_SESSION_RECORD_TYPMOD] = "PerSessionRecordTypmod", - [LWTRANCHE_SHARED_TUPLESTORE] = "SharedTupleStore", - [LWTRANCHE_SHARED_TIDBITMAP] = "SharedTidBitmap", - [LWTRANCHE_PARALLEL_APPEND] = "ParallelAppend", - [LWTRANCHE_PER_XACT_PREDICATE_LIST] = "PerXactPredicateList", - [LWTRANCHE_PGSTATS_DSA] = "PgStatsDSA", - [LWTRANCHE_PGSTATS_HASH] = "PgStatsHash", - [LWTRANCHE_PGSTATS_DATA] = "PgStatsData", - [LWTRANCHE_LAUNCHER_DSA] = "LogicalRepLauncherDSA", - [LWTRANCHE_LAUNCHER_HASH] = "LogicalRepLauncherHash", - [LWTRANCHE_DSM_REGISTRY_DSA] = "DSMRegistryDSA", - [LWTRANCHE_DSM_REGISTRY_HASH] = "DSMRegistryHash", - [LWTRANCHE_COMMITTS_SLRU] = "CommitTsSLRU", - [LWTRANCHE_MULTIXACTOFFSET_SLRU] = "MultiXactOffsetSLRU", - [LWTRANCHE_MULTIXACTMEMBER_SLRU] = "MultiXactMemberSLRU", - [LWTRANCHE_NOTIFY_SLRU] = "NotifySLRU", - [LWTRANCHE_SERIAL_SLRU] = "SerialSLRU", - [LWTRANCHE_SUBTRANS_SLRU] = "SubtransSLRU", - [LWTRANCHE_XACT_SLRU] = "XactSLRU", - [LWTRANCHE_PARALLEL_VACUUM_DSA] = "ParallelVacuumDSA", - [LWTRANCHE_AIO_URING_COMPLETION] = "AioUringCompletion", +#undef PG_LWLOCKTRANCHE }; StaticAssertDecl(lengthof(BuiltinTrancheNames) == diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt index 4da68312b5f..0be307d2ca0 100644 --- a/src/backend/utils/activity/wait_event_names.txt +++ b/src/backend/utils/activity/wait_event_names.txt @@ -356,9 +356,13 @@ AioWorkerSubmissionQueue "Waiting to access AIO worker submission queue." 
# # END OF PREDEFINED LWLOCKS (DO NOT CHANGE THIS LINE) # -# Predefined LWLocks (i.e., those declared in lwlocknames.h) must be listed -# in the section above and must be listed in the same order as in -# lwlocknames.h. Other LWLocks must be listed in the section below. +# Predefined LWLocks (i.e., those declared at the top of lwlocklist.h) must be +# listed in the section above and must be listed in the same order as in +# lwlocklist.h. +# +# Likewise, the built-in LWLock tranches (i.e., those declared at the bottom of +# lwlocklist.h) must be listed in the section below and must be listed in the +# same order as in lwlocklist.h. # XactBuffer "Waiting for I/O on a transaction status SLRU buffer." diff --git a/src/backend/utils/adt/pg_upgrade_support.c b/src/backend/utils/adt/pg_upgrade_support.c index d44f8c262ba..a4f8b4faa90 100644 --- a/src/backend/utils/adt/pg_upgrade_support.c +++ b/src/backend/utils/adt/pg_upgrade_support.c @@ -21,6 +21,7 @@ #include "commands/extension.h" #include "miscadmin.h" #include "replication/logical.h" +#include "replication/logicallauncher.h" #include "replication/origin.h" #include "replication/worker_internal.h" #include "storage/lmgr.h" @@ -410,3 +411,21 @@ binary_upgrade_replorigin_advance(PG_FUNCTION_ARGS) PG_RETURN_VOID(); } + +/* + * binary_upgrade_create_conflict_detection_slot + * + * Create a replication slot to retain information necessary for conflict + * detection such as dead tuples, commit timestamps, and origins. 
+ */ +Datum +binary_upgrade_create_conflict_detection_slot(PG_FUNCTION_ARGS) +{ + CHECK_IS_BINARY_UPGRADE; + + CreateConflictDetectionSlot(); + + ReplicationSlotRelease(); + + PG_RETURN_VOID(); +} diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index ede10e5291e..6298edb26b5 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -5028,6 +5028,7 @@ getSubscriptions(Archive *fout) int i_suboriginremotelsn; int i_subenabled; int i_subfailover; + int i_subretaindeadtuples; int i, ntups; @@ -5100,10 +5101,17 @@ getSubscriptions(Archive *fout) if (fout->remoteVersion >= 170000) appendPQExpBufferStr(query, - " s.subfailover\n"); + " s.subfailover,\n"); else appendPQExpBufferStr(query, - " false AS subfailover\n"); + " false AS subfailover,\n"); + + if (fout->remoteVersion >= 190000) + appendPQExpBufferStr(query, + " s.subretaindeadtuples\n"); + else + appendPQExpBufferStr(query, + " false AS subretaindeadtuples\n"); appendPQExpBufferStr(query, "FROM pg_subscription s\n"); @@ -5137,6 +5145,7 @@ getSubscriptions(Archive *fout) i_subpasswordrequired = PQfnumber(res, "subpasswordrequired"); i_subrunasowner = PQfnumber(res, "subrunasowner"); i_subfailover = PQfnumber(res, "subfailover"); + i_subretaindeadtuples = PQfnumber(res, "subretaindeadtuples"); i_subconninfo = PQfnumber(res, "subconninfo"); i_subslotname = PQfnumber(res, "subslotname"); i_subsynccommit = PQfnumber(res, "subsynccommit"); @@ -5170,6 +5179,8 @@ getSubscriptions(Archive *fout) (strcmp(PQgetvalue(res, i, i_subrunasowner), "t") == 0); subinfo[i].subfailover = (strcmp(PQgetvalue(res, i, i_subfailover), "t") == 0); + subinfo[i].subretaindeadtuples = + (strcmp(PQgetvalue(res, i, i_subretaindeadtuples), "t") == 0); subinfo[i].subconninfo = pg_strdup(PQgetvalue(res, i, i_subconninfo)); if (PQgetisnull(res, i, i_subslotname)) @@ -5428,6 +5439,9 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo) if (subinfo->subfailover) appendPQExpBufferStr(query, ", failover = 
true"); + if (subinfo->subretaindeadtuples) + appendPQExpBufferStr(query, ", retain_dead_tuples = true"); + if (strcmp(subinfo->subsynccommit, "off") != 0) appendPQExpBuffer(query, ", synchronous_commit = %s", fmtId(subinfo->subsynccommit)); diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h index 2370c98d192..93a4475d51b 100644 --- a/src/bin/pg_dump/pg_dump.h +++ b/src/bin/pg_dump/pg_dump.h @@ -711,6 +711,7 @@ typedef struct _SubscriptionInfo bool subpasswordrequired; bool subrunasowner; bool subfailover; + bool subretaindeadtuples; char *subconninfo; char *subslotname; char *subsynccommit; diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c index 30579ef2051..5e6403f0773 100644 --- a/src/bin/pg_upgrade/check.c +++ b/src/bin/pg_upgrade/check.c @@ -28,7 +28,7 @@ static void check_for_pg_role_prefix(ClusterInfo *cluster); static void check_for_new_tablespace_dir(void); static void check_for_user_defined_encoding_conversions(ClusterInfo *cluster); static void check_for_unicode_update(ClusterInfo *cluster); -static void check_new_cluster_logical_replication_slots(void); +static void check_new_cluster_replication_slots(void); static void check_new_cluster_subscription_configuration(void); static void check_old_cluster_for_valid_slots(void); static void check_old_cluster_subscription_state(void); @@ -631,7 +631,7 @@ check_and_dump_old_cluster(void) * Before that the logical slots are not upgraded, so we will not be * able to upgrade the logical replication clusters completely. 
*/ - get_subscription_count(&old_cluster); + get_subscription_info(&old_cluster); check_old_cluster_subscription_state(); } @@ -764,7 +764,7 @@ check_new_cluster(void) check_for_new_tablespace_dir(); - check_new_cluster_logical_replication_slots(); + check_new_cluster_replication_slots(); check_new_cluster_subscription_configuration(); } @@ -2040,48 +2040,80 @@ check_for_unicode_update(ClusterInfo *cluster) } /* - * check_new_cluster_logical_replication_slots() + * check_new_cluster_replication_slots() * - * Verify that there are no logical replication slots on the new cluster and - * that the parameter settings necessary for creating slots are sufficient. + * Validate the new cluster's readiness for migrating replication slots: + * - Ensures no existing logical replication slots on the new cluster when + * migrating logical slots. + * - Ensure conflict detection slot does not exist on the new cluster when + * migrating subscriptions with retain_dead_tuples enabled. + * - Ensure that the parameter settings on the new cluster necessary for + * creating slots are sufficient. */ static void -check_new_cluster_logical_replication_slots(void) +check_new_cluster_replication_slots(void) { PGresult *res; PGconn *conn; int nslots_on_old; int nslots_on_new; + int rdt_slot_on_new; int max_replication_slots; char *wal_level; + int i_nslots_on_new; + int i_rdt_slot_on_new; - /* Logical slots can be migrated since PG17. */ + /* + * Logical slots can be migrated since PG17 and a physical slot + * CONFLICT_DETECTION_SLOT can be migrated since PG19. + */ if (GET_MAJOR_VERSION(old_cluster.major_version) <= 1600) return; nslots_on_old = count_old_cluster_logical_slots(); - /* Quick return if there are no logical slots to be migrated. */ - if (nslots_on_old == 0) + /* + * Quick return if there are no slots to be migrated and no subscriptions + * have the retain_dead_tuples option enabled. 
+ */ + if (nslots_on_old == 0 && !old_cluster.sub_retain_dead_tuples) return; conn = connectToServer(&new_cluster, "template1"); - prep_status("Checking for new cluster logical replication slots"); + prep_status("Checking for new cluster replication slots"); - res = executeQueryOrDie(conn, "SELECT count(*) " - "FROM pg_catalog.pg_replication_slots " - "WHERE slot_type = 'logical' AND " - "temporary IS FALSE;"); + res = executeQueryOrDie(conn, "SELECT %s AS nslots_on_new, %s AS rdt_slot_on_new " + "FROM pg_catalog.pg_replication_slots", + nslots_on_old > 0 + ? "COUNT(*) FILTER (WHERE slot_type = 'logical' AND temporary IS FALSE)" + : "0", + old_cluster.sub_retain_dead_tuples + ? "COUNT(*) FILTER (WHERE slot_name = 'pg_conflict_detection')" + : "0"); if (PQntuples(res) != 1) - pg_fatal("could not count the number of logical replication slots"); + pg_fatal("could not count the number of replication slots"); - nslots_on_new = atoi(PQgetvalue(res, 0, 0)); + i_nslots_on_new = PQfnumber(res, "nslots_on_new"); + i_rdt_slot_on_new = PQfnumber(res, "rdt_slot_on_new"); + + nslots_on_new = atoi(PQgetvalue(res, 0, i_nslots_on_new)); if (nslots_on_new) + { + Assert(nslots_on_old); pg_fatal("expected 0 logical replication slots but found %d", nslots_on_new); + } + + rdt_slot_on_new = atoi(PQgetvalue(res, 0, i_rdt_slot_on_new)); + + if (rdt_slot_on_new) + { + Assert(old_cluster.sub_retain_dead_tuples); + pg_fatal("The replication slot \"pg_conflict_detection\" already exists on the new cluster"); + } PQclear(res); @@ -2094,12 +2126,24 @@ check_new_cluster_logical_replication_slots(void) wal_level = PQgetvalue(res, 0, 0); - if (strcmp(wal_level, "logical") != 0) + if (nslots_on_old > 0 && strcmp(wal_level, "logical") != 0) pg_fatal("\"wal_level\" must be \"logical\" but is set to \"%s\"", wal_level); + if (old_cluster.sub_retain_dead_tuples && + strcmp(wal_level, "minimal") == 0) + pg_fatal("\"wal_level\" must be \"replica\" or \"logical\" but is set to \"%s\"", + wal_level); + 
max_replication_slots = atoi(PQgetvalue(res, 1, 0)); + if (old_cluster.sub_retain_dead_tuples && + nslots_on_old + 1 > max_replication_slots) + pg_fatal("\"max_replication_slots\" (%d) must be greater than or equal to the number of " + "logical replication slots on the old cluster plus one additional slot required " + "for retaining conflict detection information (%d)", + max_replication_slots, nslots_on_old + 1); + if (nslots_on_old > max_replication_slots) pg_fatal("\"max_replication_slots\" (%d) must be greater than or equal to the number of " "logical replication slots (%d) on the old cluster", @@ -2211,6 +2255,22 @@ check_old_cluster_for_valid_slots(void) "The slot \"%s\" has not consumed the WAL yet\n", slot->slotname); } + + /* + * The name "pg_conflict_detection" (defined as + * CONFLICT_DETECTION_SLOT) has been reserved for logical + * replication conflict detection slot since PG19. + */ + if (strcmp(slot->slotname, "pg_conflict_detection") == 0) + { + if (script == NULL && + (script = fopen_priv(output_path, "w")) == NULL) + pg_fatal("could not open file \"%s\": %m", output_path); + + fprintf(script, + "The slot name \"%s\" is reserved\n", + slot->slotname); + } } } diff --git a/src/bin/pg_upgrade/info.c b/src/bin/pg_upgrade/info.c index 4b7a56f5b3b..a437067cdca 100644 --- a/src/bin/pg_upgrade/info.c +++ b/src/bin/pg_upgrade/info.c @@ -752,20 +752,33 @@ count_old_cluster_logical_slots(void) } /* - * get_subscription_count() + * get_subscription_info() * - * Gets the number of subscriptions in the cluster. + * Gets the information of subscriptions in the cluster. 
*/ void -get_subscription_count(ClusterInfo *cluster) +get_subscription_info(ClusterInfo *cluster) { PGconn *conn; PGresult *res; + int i_nsub; + int i_retain_dead_tuples; conn = connectToServer(cluster, "template1"); - res = executeQueryOrDie(conn, "SELECT count(*) " - "FROM pg_catalog.pg_subscription"); - cluster->nsubs = atoi(PQgetvalue(res, 0, 0)); + if (GET_MAJOR_VERSION(cluster->major_version) >= 1900) + res = executeQueryOrDie(conn, "SELECT count(*) AS nsub," + "COUNT(CASE WHEN subretaindeadtuples THEN 1 END) > 0 AS retain_dead_tuples " + "FROM pg_catalog.pg_subscription"); + else + res = executeQueryOrDie(conn, "SELECT count(*) AS nsub," + "'f' AS retain_dead_tuples " + "FROM pg_catalog.pg_subscription"); + + i_nsub = PQfnumber(res, "nsub"); + i_retain_dead_tuples = PQfnumber(res, "retain_dead_tuples"); + + cluster->nsubs = atoi(PQgetvalue(res, 0, i_nsub)); + cluster->sub_retain_dead_tuples = (strcmp(PQgetvalue(res, 0, i_retain_dead_tuples), "t") == 0); PQclear(res); PQfinish(conn); diff --git a/src/bin/pg_upgrade/pg_upgrade.c b/src/bin/pg_upgrade/pg_upgrade.c index 536e49d2616..d5cd5bf0b3a 100644 --- a/src/bin/pg_upgrade/pg_upgrade.c +++ b/src/bin/pg_upgrade/pg_upgrade.c @@ -67,6 +67,7 @@ static void set_frozenxids(bool minmxid_only); static void make_outputdirs(char *pgdata); static void setup(char *argv0); static void create_logical_replication_slots(void); +static void create_conflict_detection_slot(void); ClusterInfo old_cluster, new_cluster; @@ -88,6 +89,7 @@ int main(int argc, char **argv) { char *deletion_script_file_name = NULL; + bool migrate_logical_slots; /* * pg_upgrade doesn't currently use common/logging.c, but initialize it @@ -198,18 +200,39 @@ main(int argc, char **argv) new_cluster.pgdata); check_ok(); + migrate_logical_slots = count_old_cluster_logical_slots(); + /* - * Migrate the logical slots to the new cluster. 
Note that we need to do - this after resetting WAL because otherwise the required WAL would be - removed and slots would become unusable. There is a possibility that - background processes might generate some WAL before we could create the - slots in the new cluster but we can ignore that WAL as that won't be - required downstream. + * Migrate replication slots to the new cluster. + * + * Note that we must migrate logical slots after resetting WAL because + * otherwise the required WAL would be removed and slots would become + * unusable. There is a possibility that background processes might + * generate some WAL before we could create the slots in the new cluster + * but we can ignore that WAL as that won't be required downstream. + * + * The conflict detection slot is not affected by concerns related to WALs + * as it only retains the dead tuples. It is created here for consistency. + * Note that the new conflict detection slot uses the latest transaction + * ID as xmin, so it cannot protect dead tuples that existed before the + * upgrade. Additionally, commit timestamps and origin data are not + * preserved during the upgrade. So, even after creating the slot, the + * upgraded subscriber may be unable to detect conflicts or log relevant + * commit timestamps and origins when applying changes from the publisher + * that occurred before the upgrade, especially if those changes were not + * replicated. It can only protect tuples that might be deleted after the + * new cluster starts. 
*/ - if (count_old_cluster_logical_slots()) + if (migrate_logical_slots || old_cluster.sub_retain_dead_tuples) { start_postmaster(&new_cluster, true); - create_logical_replication_slots(); + + if (migrate_logical_slots) + create_logical_replication_slots(); + + if (old_cluster.sub_retain_dead_tuples) + create_conflict_detection_slot(); + stop_postmaster(false); } @@ -1025,3 +1048,24 @@ create_logical_replication_slots(void) return; } + +/* + * create_conflict_detection_slot() + * + * Create a replication slot to retain information necessary for conflict + * detection such as dead tuples, commit timestamps, and origins, for migrated + * subscriptions with retain_dead_tuples enabled. + */ +static void +create_conflict_detection_slot(void) +{ + PGconn *conn_new_template1; + + prep_status("Creating the replication conflict detection slot"); + + conn_new_template1 = connectToServer(&new_cluster, "template1"); + PQclear(executeQueryOrDie(conn_new_template1, "SELECT pg_catalog.binary_upgrade_create_conflict_detection_slot()")); + PQfinish(conn_new_template1); + + check_ok(); +} diff --git a/src/bin/pg_upgrade/pg_upgrade.h b/src/bin/pg_upgrade/pg_upgrade.h index 69c965bb7d0..e9401430e69 100644 --- a/src/bin/pg_upgrade/pg_upgrade.h +++ b/src/bin/pg_upgrade/pg_upgrade.h @@ -302,6 +302,8 @@ typedef struct uint32 bin_version; /* version returned from pg_ctl */ const char *tablespace_suffix; /* directory specification */ int nsubs; /* number of subscriptions */ + bool sub_retain_dead_tuples; /* whether a subscription enables + * retain_dead_tuples. 
*/ } ClusterInfo; @@ -441,7 +443,7 @@ FileNameMap *gen_db_file_maps(DbInfo *old_db, const char *new_pgdata); void get_db_rel_and_slot_infos(ClusterInfo *cluster); int count_old_cluster_logical_slots(void); -void get_subscription_count(ClusterInfo *cluster); +void get_subscription_info(ClusterInfo *cluster); /* option.c */ diff --git a/src/bin/pg_upgrade/t/004_subscription.pl b/src/bin/pg_upgrade/t/004_subscription.pl index e46f02c6cc6..77387be0f9d 100644 --- a/src/bin/pg_upgrade/t/004_subscription.pl +++ b/src/bin/pg_upgrade/t/004_subscription.pl @@ -22,13 +22,13 @@ $publisher->start; # Initialize the old subscriber node my $old_sub = PostgreSQL::Test::Cluster->new('old_sub'); -$old_sub->init; +$old_sub->init(allows_streaming => 'physical'); $old_sub->start; my $oldbindir = $old_sub->config_data('--bindir'); # Initialize the new subscriber my $new_sub = PostgreSQL::Test::Cluster->new('new_sub'); -$new_sub->init; +$new_sub->init(allows_streaming => 'physical'); my $newbindir = $new_sub->config_data('--bindir'); # In a VPATH build, we'll be started in the source directory, but we want @@ -90,6 +90,54 @@ $old_sub->start; $old_sub->safe_psql('postgres', "DROP SUBSCRIPTION regress_sub1;"); # ------------------------------------------------------ +# Check that pg_upgrade fails when max_replication_slots configured in the new +# cluster is less than the number of logical slots in the old cluster + 1 when +# subscription's retain_dead_tuples option is enabled. +# ------------------------------------------------------ +# It is sufficient to use disabled subscription to test upgrade failure. 
+ +$publisher->safe_psql('postgres', "CREATE PUBLICATION regress_pub1"); +$old_sub->safe_psql('postgres', + "CREATE SUBSCRIPTION regress_sub1 CONNECTION '$connstr' PUBLICATION regress_pub1 WITH (enabled = false, retain_dead_tuples = true)" +); + +$old_sub->stop; + +$new_sub->append_conf('postgresql.conf', 'max_replication_slots = 0'); + +# pg_upgrade will fail because the new cluster has insufficient +# max_replication_slots. +command_checks_all( + [ + 'pg_upgrade', + '--no-sync', + '--old-datadir' => $old_sub->data_dir, + '--new-datadir' => $new_sub->data_dir, + '--old-bindir' => $oldbindir, + '--new-bindir' => $newbindir, + '--socketdir' => $new_sub->host, + '--old-port' => $old_sub->port, + '--new-port' => $new_sub->port, + $mode, + '--check', + ], + 1, + [ + qr/"max_replication_slots" \(0\) must be greater than or equal to the number of logical replication slots on the old cluster plus one additional slot required for retaining conflict detection information \(1\)/ + ], + [qr//], + 'run of pg_upgrade where the new cluster has insufficient max_replication_slots' +); + +# Reset max_replication_slots +$new_sub->append_conf('postgresql.conf', 'max_replication_slots = 10'); + +# Cleanup +$publisher->safe_psql('postgres', "DROP PUBLICATION regress_pub1"); +$old_sub->start; +$old_sub->safe_psql('postgres', "DROP SUBSCRIPTION regress_sub1;"); + +# ------------------------------------------------------ # Check that pg_upgrade refuses to run if: # a) there's a subscription with tables in a state other than 'r' (ready) or # 'i' (init) and/or @@ -200,8 +248,9 @@ $old_sub->safe_psql( rmtree($new_sub->data_dir . "/pg_upgrade_output.d"); # Verify that the upgrade should be successful with tables in 'ready'/'init' -# state along with retaining the replication origin's remote lsn, subscription's -# running status, and failover option. 
+# state along with retaining the replication origin's remote lsn, +# subscription's running status, failover option, and retain_dead_tuples +# option. $publisher->safe_psql( 'postgres', qq[ CREATE TABLE tab_upgraded1(id int); @@ -211,7 +260,7 @@ $publisher->safe_psql( $old_sub->safe_psql( 'postgres', qq[ CREATE TABLE tab_upgraded1(id int); - CREATE SUBSCRIPTION regress_sub4 CONNECTION '$connstr' PUBLICATION regress_pub4 WITH (failover = true); + CREATE SUBSCRIPTION regress_sub4 CONNECTION '$connstr' PUBLICATION regress_pub4 WITH (failover = true, retain_dead_tuples = true); ]); # Wait till the table tab_upgraded1 reaches 'ready' state @@ -270,7 +319,8 @@ $new_sub->append_conf('postgresql.conf', # Check that pg_upgrade is successful when all tables are in ready or in # init state (tab_upgraded1 table is in ready state and tab_upgraded2 table is # in init state) along with retaining the replication origin's remote lsn, -# subscription's running status, and failover option. +# subscription's running status, failover option, and retain_dead_tuples +# option. # ------------------------------------------------------ command_ok( [ @@ -293,7 +343,8 @@ ok( !-d $new_sub->data_dir . "/pg_upgrade_output.d", # ------------------------------------------------------ # Check that the data inserted to the publisher when the new subscriber is down # will be replicated once it is started. Also check that the old subscription -# states and relations origins are all preserved. +# states and relations origins are all preserved, and that the conflict +# detection slot is created. # ------------------------------------------------------ $publisher->safe_psql( 'postgres', qq[ @@ -303,15 +354,16 @@ $publisher->safe_psql( $new_sub->start; -# The subscription's running status and failover option should be preserved -# in the upgraded instance. So regress_sub4 should still have subenabled and -# subfailover set to true, while regress_sub5 should have both set to false. 
+# The subscription's running status, failover option, and retain_dead_tuples +# option should be preserved in the upgraded instance. So regress_sub4 should +# still have subenabled, subfailover, and subretaindeadtuples set to true, +# while regress_sub5 should have all three set to false. $result = $new_sub->safe_psql('postgres', - "SELECT subname, subenabled, subfailover FROM pg_subscription ORDER BY subname" + "SELECT subname, subenabled, subfailover, subretaindeadtuples FROM pg_subscription ORDER BY subname" ); -is( $result, qq(regress_sub4|t|t -regress_sub5|f|f), - "check that the subscription's running status and failover are preserved" +is( $result, qq(regress_sub4|t|t|t +regress_sub5|f|f|f), + "check that the subscription's running status, failover, and retain_dead_tuples are preserved" ); # Subscription relations should be preserved @@ -330,6 +382,11 @@ $result = $new_sub->safe_psql('postgres', ); is($result, qq($remote_lsn), "remote_lsn should have been preserved"); +# The conflict detection slot should be created +$result = $new_sub->safe_psql('postgres', + "SELECT xmin IS NOT NULL from pg_replication_slots WHERE slot_name = 'pg_conflict_detection'"); +is($result, qq(t), "conflict detection slot exists"); + # Resume the initial sync and wait until all tables of subscription # 'regress_sub5' are synchronized $new_sub->append_conf('postgresql.conf', diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c index dd25d2fe7b8..7a06af48842 100644 --- a/src/bin/psql/describe.c +++ b/src/bin/psql/describe.c @@ -6746,7 +6746,7 @@ describeSubscriptions(const char *pattern, bool verbose) printQueryOpt myopt = pset.popt; static const bool translate_columns[] = {false, false, false, false, false, false, false, false, false, false, false, false, false, false, - false}; + false, false}; if (pset.sversion < 100000) { @@ -6814,6 +6814,10 @@ describeSubscriptions(const char *pattern, bool verbose) appendPQExpBuffer(&buf, ", subfailover AS \"%s\"\n", 
gettext_noop("Failover")); + if (pset.sversion >= 190000) + appendPQExpBuffer(&buf, + ", subretaindeadtuples AS \"%s\"\n", + gettext_noop("Retain dead tuples")); appendPQExpBuffer(&buf, ", subsynccommit AS \"%s\"\n" diff --git a/src/bin/psql/tab-complete.in.c b/src/bin/psql/tab-complete.in.c index 37524364290..dbc586c5bc3 100644 --- a/src/bin/psql/tab-complete.in.c +++ b/src/bin/psql/tab-complete.in.c @@ -2319,8 +2319,9 @@ match_previous_words(int pattern_id, /* ALTER SUBSCRIPTION <name> SET ( */ else if (Matches("ALTER", "SUBSCRIPTION", MatchAny, MatchAnyN, "SET", "(")) COMPLETE_WITH("binary", "disable_on_error", "failover", "origin", - "password_required", "run_as_owner", "slot_name", - "streaming", "synchronous_commit", "two_phase"); + "password_required", "retain_dead_tuples", + "run_as_owner", "slot_name", "streaming", + "synchronous_commit", "two_phase"); /* ALTER SUBSCRIPTION <name> SKIP ( */ else if (Matches("ALTER", "SUBSCRIPTION", MatchAny, MatchAnyN, "SKIP", "(")) COMPLETE_WITH("lsn"); @@ -3774,8 +3775,9 @@ match_previous_words(int pattern_id, else if (Matches("CREATE", "SUBSCRIPTION", MatchAnyN, "WITH", "(")) COMPLETE_WITH("binary", "connect", "copy_data", "create_slot", "disable_on_error", "enabled", "failover", "origin", - "password_required", "run_as_owner", "slot_name", - "streaming", "synchronous_commit", "two_phase"); + "password_required", "retain_dead_tuples", + "run_as_owner", "slot_name", "streaming", + "synchronous_commit", "two_phase"); /* CREATE TRIGGER --- is allowed inside CREATE SCHEMA, so use TailMatches */ diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h index 2cf8d55d706..cc06fc29ab2 100644 --- a/src/include/access/xlog_internal.h +++ b/src/include/access/xlog_internal.h @@ -316,16 +316,6 @@ typedef struct XLogRecData uint32 len; /* length of rmgr data to include */ } XLogRecData; -/* - * Recovery target action. 
- */ -typedef enum -{ - RECOVERY_TARGET_ACTION_PAUSE, - RECOVERY_TARGET_ACTION_PROMOTE, - RECOVERY_TARGET_ACTION_SHUTDOWN, -} RecoveryTargetAction; - struct LogicalDecodingContext; struct XLogRecordBuffer; diff --git a/src/include/access/xlogrecovery.h b/src/include/access/xlogrecovery.h index 91446303024..8e475e266d1 100644 --- a/src/include/access/xlogrecovery.h +++ b/src/include/access/xlogrecovery.h @@ -40,6 +40,16 @@ typedef enum RECOVERY_TARGET_TIMELINE_NUMERIC, } RecoveryTargetTimeLineGoal; +/* + * Recovery target action. + */ +typedef enum +{ + RECOVERY_TARGET_ACTION_PAUSE, + RECOVERY_TARGET_ACTION_PROMOTE, + RECOVERY_TARGET_ACTION_SHUTDOWN, +} RecoveryTargetAction; + /* Recovery pause states */ typedef enum RecoveryPauseState { diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h index a3f3315fed9..5173d422d46 100644 --- a/src/include/catalog/catversion.h +++ b/src/include/catalog/catversion.h @@ -57,6 +57,6 @@ */ /* yyyymmddN */ -#define CATALOG_VERSION_NO 202507091 +#define CATALOG_VERSION_NO 202507231 #endif diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 1fc19146f46..3ee8fed7e53 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -11801,6 +11801,10 @@ proname => 'binary_upgrade_replorigin_advance', proisstrict => 'f', provolatile => 'v', proparallel => 'u', prorettype => 'void', proargtypes => 'text pg_lsn', prosrc => 'binary_upgrade_replorigin_advance' }, +{ oid => '9159', descr => 'for use by pg_upgrade (conflict detection slot)', + proname => 'binary_upgrade_create_conflict_detection_slot', proisstrict => 'f', + provolatile => 'v', proparallel => 'u', prorettype => 'void', + proargtypes => '', prosrc => 'binary_upgrade_create_conflict_detection_slot' }, # conversion functions { oid => '4302', diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h index 20fc329992d..231ef84ec9a 100644 --- 
a/src/include/catalog/pg_subscription.h +++ b/src/include/catalog/pg_subscription.h @@ -78,6 +78,9 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW * slots) in the upstream database are enabled * to be synchronized to the standbys. */ + bool subretaindeadtuples; /* True if dead tuples useful for + * conflict detection are retained */ + #ifdef CATALOG_VARLEN /* variable-length fields start here */ /* Connection string to the publisher */ text subconninfo BKI_FORCE_NOT_NULL; @@ -131,6 +134,8 @@ typedef struct Subscription * (i.e. the main slot and the table sync * slots) in the upstream database are enabled * to be synchronized to the standbys. */ + bool retaindeadtuples; /* True if dead tuples useful for conflict + * detection are retained */ char *conninfo; /* Connection string to the publisher */ char *slotname; /* Name of the replication slot */ char *synccommit; /* Synchronous commit setting for worker */ diff --git a/src/include/commands/subscriptioncmds.h b/src/include/commands/subscriptioncmds.h index c2262e46a7f..9b288ad22a6 100644 --- a/src/include/commands/subscriptioncmds.h +++ b/src/include/commands/subscriptioncmds.h @@ -28,4 +28,9 @@ extern void AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId); extern char defGetStreamingMode(DefElem *def); +extern ObjectAddress AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, bool isTopLevel); + +extern void CheckSubDeadTupleRetention(bool check_guc, bool sub_disabled, + int elevel_for_sub_disabled); + #endif /* SUBSCRIPTIONCMDS_H */ diff --git a/src/include/libpq/libpq-be-fe-helpers.h b/src/include/libpq/libpq-be-fe-helpers.h index 49137a0a570..af13bd6bf3d 100644 --- a/src/include/libpq/libpq-be-fe-helpers.h +++ b/src/include/libpq/libpq-be-fe-helpers.h @@ -481,7 +481,7 @@ libpqsrv_notice_receiver(void *arg, const PGresult *res) len--; ereport(LOG, - errmsg_internal("%s: %.*s", _(prefix), len, message)); + errmsg_internal("%s: %.*s", prefix, len, message)); } 
#endif /* LIBPQ_BE_FE_HELPERS_H */ diff --git a/src/include/port/solaris.h b/src/include/port/solaris.h index e63a3bd824d..8ff40007c7f 100644 --- a/src/include/port/solaris.h +++ b/src/include/port/solaris.h @@ -24,3 +24,12 @@ #if defined(__i386__) #include <sys/isa_defs.h> #endif + +/* + * On original Solaris, PAM conversation procs lack a "const" in their + * declaration; but recent OpenIndiana versions put it there by default. + * The least messy way to deal with this is to define _PAM_LEGACY_NONCONST, + * which causes OpenIndiana to declare pam_conv per the Solaris tradition, + * and also use that symbol to control omitting the "const" in our own code. + */ +#define _PAM_LEGACY_NONCONST 1 diff --git a/src/include/replication/logicallauncher.h b/src/include/replication/logicallauncher.h index 82b202f3305..b29453e8e4f 100644 --- a/src/include/replication/logicallauncher.h +++ b/src/include/replication/logicallauncher.h @@ -25,8 +25,11 @@ extern void ApplyLauncherShmemInit(void); extern void ApplyLauncherForgetWorkerStartTime(Oid subid); extern void ApplyLauncherWakeupAtCommit(void); +extern void ApplyLauncherWakeup(void); extern void AtEOXact_ApplyLauncher(bool isCommit); +extern void CreateConflictDetectionSlot(void); + extern bool IsLogicalLauncher(void); extern pid_t GetLeaderApplyWorkerPid(pid_t pid); diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index 19b4e8b6a03..e8fc342d1a9 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -21,6 +21,13 @@ #define PG_REPLSLOT_DIR "pg_replslot" /* + * The reserved name for a replication slot used to retain dead tuples for + * conflict detection in logical replication. See + * maybe_advance_nonremovable_xid() for detail. + */ +#define CONFLICT_DETECTION_SLOT "pg_conflict_detection" + +/* * Behaviour of replication slots, upon release or crash. 
* * Slots marked as PERSISTENT are crash-safe and will not be dropped when @@ -311,7 +318,9 @@ extern void ReplicationSlotMarkDirty(void); /* misc stuff */ extern void ReplicationSlotInitialize(void); -extern bool ReplicationSlotValidateName(const char *name, int elevel); +extern bool ReplicationSlotValidateName(const char *name, + bool allow_reserved_name, + int elevel); extern void ReplicationSlotReserveWal(void); extern void ReplicationSlotsComputeRequiredXmin(bool already_locked); extern void ReplicationSlotsComputeRequiredLSN(void); diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 30b2775952c..0c7b8440a61 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -86,6 +86,16 @@ typedef struct LogicalRepWorker /* Indicates whether apply can be performed in parallel. */ bool parallel_apply; + /* + * The changes made by this and later transactions must be retained to + * ensure reliable conflict detection during the apply phase. + * + * The logical replication launcher manages an internal replication slot + * named "pg_conflict_detection". It asynchronously collects this ID to + * decide when to advance the xmin value of the slot. + */ + TransactionId oldest_nonremovable_xid; + /* Stats. 
*/ XLogRecPtr last_lsn; TimestampTz last_send_time; @@ -245,7 +255,8 @@ extern List *logicalrep_workers_find(Oid subid, bool only_running, extern bool logicalrep_worker_launch(LogicalRepWorkerType wtype, Oid dbid, Oid subid, const char *subname, Oid userid, Oid relid, - dsm_handle subworker_dsm); + dsm_handle subworker_dsm, + bool retain_dead_tuples); extern void logicalrep_worker_stop(Oid subid, Oid relid); extern void logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo); extern void logicalrep_worker_wakeup(Oid subid, Oid relid); diff --git a/src/include/storage/aio.h b/src/include/storage/aio.h index e7a0a234b6c..2933eea0649 100644 --- a/src/include/storage/aio.h +++ b/src/include/storage/aio.h @@ -201,7 +201,7 @@ typedef enum PgAioHandleCallbackID } PgAioHandleCallbackID; #define PGAIO_HCB_MAX PGAIO_HCB_LOCAL_BUFFER_READV -StaticAssertDecl(PGAIO_HCB_MAX <= (1 << PGAIO_RESULT_ID_BITS), +StaticAssertDecl(PGAIO_HCB_MAX < (1 << PGAIO_RESULT_ID_BITS), "PGAIO_HCB_MAX is too big for PGAIO_RESULT_ID_BITS"); diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h index 08a72569ae5..5e717765764 100644 --- a/src/include/storage/lwlock.h +++ b/src/include/storage/lwlock.h @@ -176,51 +176,23 @@ extern void LWLockInitialize(LWLock *lock, int tranche_id); * Every tranche ID less than NUM_INDIVIDUAL_LWLOCKS is reserved; also, * we reserve additional tranche IDs for builtin tranches not included in * the set of individual LWLocks. A call to LWLockNewTrancheId will never - * return a value less than LWTRANCHE_FIRST_USER_DEFINED. + * return a value less than LWTRANCHE_FIRST_USER_DEFINED. The actual list of + * built-in tranches is kept in lwlocklist.h. 
*/ typedef enum BuiltinTrancheIds { - LWTRANCHE_XACT_BUFFER = NUM_INDIVIDUAL_LWLOCKS, - LWTRANCHE_COMMITTS_BUFFER, - LWTRANCHE_SUBTRANS_BUFFER, - LWTRANCHE_MULTIXACTOFFSET_BUFFER, - LWTRANCHE_MULTIXACTMEMBER_BUFFER, - LWTRANCHE_NOTIFY_BUFFER, - LWTRANCHE_SERIAL_BUFFER, - LWTRANCHE_WAL_INSERT, - LWTRANCHE_BUFFER_CONTENT, - LWTRANCHE_REPLICATION_ORIGIN_STATE, - LWTRANCHE_REPLICATION_SLOT_IO, - LWTRANCHE_LOCK_FASTPATH, - LWTRANCHE_BUFFER_MAPPING, - LWTRANCHE_LOCK_MANAGER, - LWTRANCHE_PREDICATE_LOCK_MANAGER, - LWTRANCHE_PARALLEL_HASH_JOIN, - LWTRANCHE_PARALLEL_BTREE_SCAN, - LWTRANCHE_PARALLEL_QUERY_DSA, - LWTRANCHE_PER_SESSION_DSA, - LWTRANCHE_PER_SESSION_RECORD_TYPE, - LWTRANCHE_PER_SESSION_RECORD_TYPMOD, - LWTRANCHE_SHARED_TUPLESTORE, - LWTRANCHE_SHARED_TIDBITMAP, - LWTRANCHE_PARALLEL_APPEND, - LWTRANCHE_PER_XACT_PREDICATE_LIST, - LWTRANCHE_PGSTATS_DSA, - LWTRANCHE_PGSTATS_HASH, - LWTRANCHE_PGSTATS_DATA, - LWTRANCHE_LAUNCHER_DSA, - LWTRANCHE_LAUNCHER_HASH, - LWTRANCHE_DSM_REGISTRY_DSA, - LWTRANCHE_DSM_REGISTRY_HASH, - LWTRANCHE_COMMITTS_SLRU, - LWTRANCHE_MULTIXACTMEMBER_SLRU, - LWTRANCHE_MULTIXACTOFFSET_SLRU, - LWTRANCHE_NOTIFY_SLRU, - LWTRANCHE_SERIAL_SLRU, - LWTRANCHE_SUBTRANS_SLRU, - LWTRANCHE_XACT_SLRU, - LWTRANCHE_PARALLEL_VACUUM_DSA, - LWTRANCHE_AIO_URING_COMPLETION, + /* + * LWTRANCHE_INVALID is an unused value that only exists to initialize the + * rest of the tranches to appropriate values. 
+ */ + LWTRANCHE_INVALID = NUM_INDIVIDUAL_LWLOCKS - 1, + +#define PG_LWLOCK(id, name) +#define PG_LWLOCKTRANCHE(id, name) LWTRANCHE_##id, +#include "storage/lwlocklist.h" +#undef PG_LWLOCK +#undef PG_LWLOCKTRANCHE + LWTRANCHE_FIRST_USER_DEFINED, } BuiltinTrancheIds; diff --git a/src/include/storage/lwlocklist.h b/src/include/storage/lwlocklist.h index a9681738146..208d2e3a8ed 100644 --- a/src/include/storage/lwlocklist.h +++ b/src/include/storage/lwlocklist.h @@ -2,9 +2,10 @@ * * lwlocklist.h * - * The predefined LWLock list is kept in its own source file for use by - * automatic tools. The exact representation of a keyword is determined by - * the PG_LWLOCK macro, which is not defined in this file; it can be + * The list of predefined LWLocks and built-in LWLock tranches is kept in + * its own source file for use by automatic tools. The exact + * representation of a keyword is determined by the PG_LWLOCK and + * PG_LWLOCKTRANCHE macros, which are not defined in this file; they can be * defined by the caller for special purposes. * * Also, generate-lwlocknames.pl processes this file to create lwlocknames.h. @@ -84,3 +85,53 @@ PG_LWLOCK(50, DSMRegistry) PG_LWLOCK(51, InjectionPoint) PG_LWLOCK(52, SerialControl) PG_LWLOCK(53, AioWorkerSubmissionQueue) + +/* + * There also exist several built-in LWLock tranches. As with the predefined + * LWLocks, be sure to update the WaitEventLWLock section of + * src/backend/utils/activity/wait_event_names.txt when modifying this list. + * + * Note that the IDs here (the first value) don't include the LWTRANCHE_ + * prefix. It's added elsewhere. 
+ */ +PG_LWLOCKTRANCHE(XACT_BUFFER, XactBuffer) +PG_LWLOCKTRANCHE(COMMITTS_BUFFER, CommitTsBuffer) +PG_LWLOCKTRANCHE(SUBTRANS_BUFFER, SubtransBuffer) +PG_LWLOCKTRANCHE(MULTIXACTOFFSET_BUFFER, MultiXactOffsetBuffer) +PG_LWLOCKTRANCHE(MULTIXACTMEMBER_BUFFER, MultiXactMemberBuffer) +PG_LWLOCKTRANCHE(NOTIFY_BUFFER, NotifyBuffer) +PG_LWLOCKTRANCHE(SERIAL_BUFFER, SerialBuffer) +PG_LWLOCKTRANCHE(WAL_INSERT, WALInsert) +PG_LWLOCKTRANCHE(BUFFER_CONTENT, BufferContent) +PG_LWLOCKTRANCHE(REPLICATION_ORIGIN_STATE, ReplicationOriginState) +PG_LWLOCKTRANCHE(REPLICATION_SLOT_IO, ReplicationSlotIO) +PG_LWLOCKTRANCHE(LOCK_FASTPATH, LockFastPath) +PG_LWLOCKTRANCHE(BUFFER_MAPPING, BufferMapping) +PG_LWLOCKTRANCHE(LOCK_MANAGER, LockManager) +PG_LWLOCKTRANCHE(PREDICATE_LOCK_MANAGER, PredicateLockManager) +PG_LWLOCKTRANCHE(PARALLEL_HASH_JOIN, ParallelHashJoin) +PG_LWLOCKTRANCHE(PARALLEL_BTREE_SCAN, ParallelBtreeScan) +PG_LWLOCKTRANCHE(PARALLEL_QUERY_DSA, ParallelQueryDSA) +PG_LWLOCKTRANCHE(PER_SESSION_DSA, PerSessionDSA) +PG_LWLOCKTRANCHE(PER_SESSION_RECORD_TYPE, PerSessionRecordType) +PG_LWLOCKTRANCHE(PER_SESSION_RECORD_TYPMOD, PerSessionRecordTypmod) +PG_LWLOCKTRANCHE(SHARED_TUPLESTORE, SharedTupleStore) +PG_LWLOCKTRANCHE(SHARED_TIDBITMAP, SharedTidBitmap) +PG_LWLOCKTRANCHE(PARALLEL_APPEND, ParallelAppend) +PG_LWLOCKTRANCHE(PER_XACT_PREDICATE_LIST, PerXactPredicateList) +PG_LWLOCKTRANCHE(PGSTATS_DSA, PgStatsDSA) +PG_LWLOCKTRANCHE(PGSTATS_HASH, PgStatsHash) +PG_LWLOCKTRANCHE(PGSTATS_DATA, PgStatsData) +PG_LWLOCKTRANCHE(LAUNCHER_DSA, LogicalRepLauncherDSA) +PG_LWLOCKTRANCHE(LAUNCHER_HASH, LogicalRepLauncherHash) +PG_LWLOCKTRANCHE(DSM_REGISTRY_DSA, DSMRegistryDSA) +PG_LWLOCKTRANCHE(DSM_REGISTRY_HASH, DSMRegistryHash) +PG_LWLOCKTRANCHE(COMMITTS_SLRU, CommitTsSLRU) +PG_LWLOCKTRANCHE(MULTIXACTOFFSET_SLRU, MultiXactOffsetSLRU) +PG_LWLOCKTRANCHE(MULTIXACTMEMBER_SLRU, MultiXactMemberSLRU) +PG_LWLOCKTRANCHE(NOTIFY_SLRU, NotifySLRU) +PG_LWLOCKTRANCHE(SERIAL_SLRU, SerialSLRU) 
+PG_LWLOCKTRANCHE(SUBTRANS_SLRU, SubtransSLRU) +PG_LWLOCKTRANCHE(XACT_SLRU, XactSLRU) +PG_LWLOCKTRANCHE(PARALLEL_VACUUM_DSA, ParallelVacuumDSA) +PG_LWLOCKTRANCHE(AIO_URING_COMPLETION, AioUringCompletion) diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h index 9f9b3fcfbf1..c6f5ebceefd 100644 --- a/src/include/storage/proc.h +++ b/src/include/storage/proc.h @@ -130,9 +130,17 @@ extern PGDLLIMPORT int FastPathLockGroupsPerBackend; * the checkpoint are actually destroyed on disk. Replay can cope with a file * or block that doesn't exist, but not with a block that has the wrong * contents. + * + * Setting DELAY_CHKPT_IN_COMMIT is similar to setting DELAY_CHKPT_START, but + * it explicitly indicates that the reason for delaying the checkpoint is due + * to a transaction being within a critical commit section. We need this new + * flag to ensure all the transactions that have acquired commit timestamp are + * finished before we allow the logical replication client to advance its xid + * which is used to hold back dead rows for conflict detection. 
*/ #define DELAY_CHKPT_START (1<<0) #define DELAY_CHKPT_COMPLETE (1<<1) +#define DELAY_CHKPT_IN_COMMIT (DELAY_CHKPT_START | 1<<2) typedef enum { diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h index e4877d88e8f..2f4ae06c279 100644 --- a/src/include/storage/procarray.h +++ b/src/include/storage/procarray.h @@ -55,7 +55,8 @@ extern RunningTransactions GetRunningTransactionData(void); extern bool TransactionIdIsInProgress(TransactionId xid); extern TransactionId GetOldestNonRemovableTransactionId(Relation rel); extern TransactionId GetOldestTransactionIdConsideredRunning(void); -extern TransactionId GetOldestActiveTransactionId(void); +extern TransactionId GetOldestActiveTransactionId(bool inCommitOnly, + bool allDbs); extern TransactionId GetOldestSafeDecodingTransactionId(bool catalogOnly); extern void GetReplicationHorizons(TransactionId *xmin, TransactionId *catalog_xmin); diff --git a/src/interfaces/ecpg/ecpglib/connect.c b/src/interfaces/ecpg/ecpglib/connect.c index 713cbbf6360..78de9f298ba 100644 --- a/src/interfaces/ecpg/ecpglib/connect.c +++ b/src/interfaces/ecpg/ecpglib/connect.c @@ -264,7 +264,8 @@ ECPGconnect(int lineno, int c, const char *name, const char *user, const char *p struct connection *this; int i, connect_params = 0; - char *dbname = name ? ecpg_strdup(name, lineno) : NULL, + bool alloc_failed = (sqlca == NULL); + char *dbname = name ? 
ecpg_strdup(name, lineno, &alloc_failed) : NULL, *host = NULL, *tmp, *port = NULL, @@ -273,11 +274,12 @@ ECPGconnect(int lineno, int c, const char *name, const char *user, const char *p const char **conn_keywords; const char **conn_values; - if (sqlca == NULL) + if (alloc_failed) { ecpg_raise(lineno, ECPG_OUT_OF_MEMORY, ECPG_SQLSTATE_ECPG_OUT_OF_MEMORY, NULL); - ecpg_free(dbname); + if (dbname) + ecpg_free(dbname); return false; } @@ -302,7 +304,7 @@ ECPGconnect(int lineno, int c, const char *name, const char *user, const char *p if (envname) { ecpg_free(dbname); - dbname = ecpg_strdup(envname, lineno); + dbname = ecpg_strdup(envname, lineno, &alloc_failed); } } @@ -354,7 +356,7 @@ ECPGconnect(int lineno, int c, const char *name, const char *user, const char *p tmp = strrchr(dbname + offset, '?'); if (tmp != NULL) /* options given */ { - options = ecpg_strdup(tmp + 1, lineno); + options = ecpg_strdup(tmp + 1, lineno, &alloc_failed); *tmp = '\0'; } @@ -363,7 +365,7 @@ ECPGconnect(int lineno, int c, const char *name, const char *user, const char *p { if (tmp[1] != '\0') /* non-empty database name */ { - realname = ecpg_strdup(tmp + 1, lineno); + realname = ecpg_strdup(tmp + 1, lineno, &alloc_failed); connect_params++; } *tmp = '\0'; @@ -373,7 +375,7 @@ ECPGconnect(int lineno, int c, const char *name, const char *user, const char *p if (tmp != NULL) /* port number given */ { *tmp = '\0'; - port = ecpg_strdup(tmp + 1, lineno); + port = ecpg_strdup(tmp + 1, lineno, &alloc_failed); connect_params++; } @@ -407,7 +409,7 @@ ECPGconnect(int lineno, int c, const char *name, const char *user, const char *p { if (*(dbname + offset) != '\0') { - host = ecpg_strdup(dbname + offset, lineno); + host = ecpg_strdup(dbname + offset, lineno, &alloc_failed); connect_params++; } } @@ -419,7 +421,7 @@ ECPGconnect(int lineno, int c, const char *name, const char *user, const char *p tmp = strrchr(dbname, ':'); if (tmp != NULL) /* port number given */ { - port = ecpg_strdup(tmp + 1, lineno); 
+ port = ecpg_strdup(tmp + 1, lineno, &alloc_failed); connect_params++; *tmp = '\0'; } @@ -427,14 +429,14 @@ ECPGconnect(int lineno, int c, const char *name, const char *user, const char *p tmp = strrchr(dbname, '@'); if (tmp != NULL) /* host name given */ { - host = ecpg_strdup(tmp + 1, lineno); + host = ecpg_strdup(tmp + 1, lineno, &alloc_failed); connect_params++; *tmp = '\0'; } if (strlen(dbname) > 0) { - realname = ecpg_strdup(dbname, lineno); + realname = ecpg_strdup(dbname, lineno, &alloc_failed); connect_params++; } else @@ -465,7 +467,18 @@ ECPGconnect(int lineno, int c, const char *name, const char *user, const char *p */ conn_keywords = (const char **) ecpg_alloc((connect_params + 1) * sizeof(char *), lineno); conn_values = (const char **) ecpg_alloc(connect_params * sizeof(char *), lineno); - if (conn_keywords == NULL || conn_values == NULL) + + /* Decide on a connection name */ + if (connection_name != NULL || realname != NULL) + { + this->name = ecpg_strdup(connection_name ? 
connection_name : realname, + lineno, &alloc_failed); + } + else + this->name = NULL; + + /* Deal with any failed allocations above */ + if (conn_keywords == NULL || conn_values == NULL || alloc_failed) { if (host) ecpg_free(host); @@ -481,6 +494,8 @@ ECPGconnect(int lineno, int c, const char *name, const char *user, const char *p ecpg_free(conn_keywords); if (conn_values) ecpg_free(conn_values); + if (this->name) + ecpg_free(this->name); free(this); return false; } @@ -515,17 +530,14 @@ ECPGconnect(int lineno, int c, const char *name, const char *user, const char *p ecpg_free(conn_keywords); if (conn_values) ecpg_free(conn_values); + if (this->name) + ecpg_free(this->name); free(this); return false; } } #endif - if (connection_name != NULL) - this->name = ecpg_strdup(connection_name, lineno); - else - this->name = ecpg_strdup(realname, lineno); - this->cache_head = NULL; this->prep_stmts = NULL; diff --git a/src/interfaces/ecpg/ecpglib/descriptor.c b/src/interfaces/ecpg/ecpglib/descriptor.c index 651d5c8b2ed..466428edfeb 100644 --- a/src/interfaces/ecpg/ecpglib/descriptor.c +++ b/src/interfaces/ecpg/ecpglib/descriptor.c @@ -240,8 +240,9 @@ ECPGget_desc(int lineno, const char *desc_name, int index,...) act_tuple; struct variable data_var; struct sqlca_t *sqlca = ECPGget_sqlca(); + bool alloc_failed = (sqlca == NULL); - if (sqlca == NULL) + if (alloc_failed) { ecpg_raise(lineno, ECPG_OUT_OF_MEMORY, ECPG_SQLSTATE_ECPG_OUT_OF_MEMORY, NULL); @@ -493,7 +494,14 @@ ECPGget_desc(int lineno, const char *desc_name, int index,...) 
#ifdef WIN32 stmt.oldthreadlocale = _configthreadlocale(_ENABLE_PER_THREAD_LOCALE); #endif - stmt.oldlocale = ecpg_strdup(setlocale(LC_NUMERIC, NULL), lineno); + stmt.oldlocale = ecpg_strdup(setlocale(LC_NUMERIC, NULL), + lineno, &alloc_failed); + if (alloc_failed) + { + va_end(args); + return false; + } + setlocale(LC_NUMERIC, "C"); #endif diff --git a/src/interfaces/ecpg/ecpglib/ecpglib_extern.h b/src/interfaces/ecpg/ecpglib/ecpglib_extern.h index 75cc68275bd..949ff66cefc 100644 --- a/src/interfaces/ecpg/ecpglib/ecpglib_extern.h +++ b/src/interfaces/ecpg/ecpglib/ecpglib_extern.h @@ -175,7 +175,7 @@ void ecpg_free(void *ptr); bool ecpg_init(const struct connection *con, const char *connection_name, const int lineno); -char *ecpg_strdup(const char *string, int lineno); +char *ecpg_strdup(const char *string, int lineno, bool *alloc_failed); const char *ecpg_type_name(enum ECPGttype typ); int ecpg_dynamic_type(Oid type); int sqlda_dynamic_type(Oid type, enum COMPAT_MODE compat); diff --git a/src/interfaces/ecpg/ecpglib/execute.c b/src/interfaces/ecpg/ecpglib/execute.c index f52da06de9a..84a4a9fc578 100644 --- a/src/interfaces/ecpg/ecpglib/execute.c +++ b/src/interfaces/ecpg/ecpglib/execute.c @@ -860,9 +860,9 @@ ecpg_store_input(const int lineno, const bool force_indicator, const struct vari numeric *nval; if (var->arrsize > 1) - mallocedval = ecpg_strdup("{", lineno); + mallocedval = ecpg_strdup("{", lineno, NULL); else - mallocedval = ecpg_strdup("", lineno); + mallocedval = ecpg_strdup("", lineno, NULL); if (!mallocedval) return false; @@ -923,9 +923,9 @@ ecpg_store_input(const int lineno, const bool force_indicator, const struct vari int slen; if (var->arrsize > 1) - mallocedval = ecpg_strdup("{", lineno); + mallocedval = ecpg_strdup("{", lineno, NULL); else - mallocedval = ecpg_strdup("", lineno); + mallocedval = ecpg_strdup("", lineno, NULL); if (!mallocedval) return false; @@ -970,9 +970,9 @@ ecpg_store_input(const int lineno, const bool force_indicator, const 
struct vari int slen; if (var->arrsize > 1) - mallocedval = ecpg_strdup("{", lineno); + mallocedval = ecpg_strdup("{", lineno, NULL); else - mallocedval = ecpg_strdup("", lineno); + mallocedval = ecpg_strdup("", lineno, NULL); if (!mallocedval) return false; @@ -1017,9 +1017,9 @@ ecpg_store_input(const int lineno, const bool force_indicator, const struct vari int slen; if (var->arrsize > 1) - mallocedval = ecpg_strdup("{", lineno); + mallocedval = ecpg_strdup("{", lineno, NULL); else - mallocedval = ecpg_strdup("", lineno); + mallocedval = ecpg_strdup("", lineno, NULL); if (!mallocedval) return false; @@ -2001,7 +2001,8 @@ ecpg_do_prologue(int lineno, const int compat, const int force_indicator, return false; } #endif - stmt->oldlocale = ecpg_strdup(setlocale(LC_NUMERIC, NULL), lineno); + stmt->oldlocale = ecpg_strdup(setlocale(LC_NUMERIC, NULL), lineno, + NULL); if (stmt->oldlocale == NULL) { ecpg_do_epilogue(stmt); @@ -2030,7 +2031,14 @@ ecpg_do_prologue(int lineno, const int compat, const int force_indicator, statement_type = ECPGst_execute; } else - stmt->command = ecpg_strdup(query, lineno); + { + stmt->command = ecpg_strdup(query, lineno, NULL); + if (!stmt->command) + { + ecpg_do_epilogue(stmt); + return false; + } + } stmt->name = NULL; @@ -2042,7 +2050,12 @@ ecpg_do_prologue(int lineno, const int compat, const int force_indicator, if (command) { stmt->name = stmt->command; - stmt->command = ecpg_strdup(command, lineno); + stmt->command = ecpg_strdup(command, lineno, NULL); + if (!stmt->command) + { + ecpg_do_epilogue(stmt); + return false; + } } else { @@ -2175,7 +2188,12 @@ ecpg_do_prologue(int lineno, const int compat, const int force_indicator, if (!is_prepared_name_set && stmt->statement_type == ECPGst_prepare) { - stmt->name = ecpg_strdup(var->value, lineno); + stmt->name = ecpg_strdup(var->value, lineno, NULL); + if (!stmt->name) + { + ecpg_do_epilogue(stmt); + return false; + } is_prepared_name_set = true; } } diff --git 
a/src/interfaces/ecpg/ecpglib/memory.c b/src/interfaces/ecpg/ecpglib/memory.c index 6979be2c988..2112e55b6e4 100644 --- a/src/interfaces/ecpg/ecpglib/memory.c +++ b/src/interfaces/ecpg/ecpglib/memory.c @@ -43,8 +43,15 @@ ecpg_realloc(void *ptr, long size, int lineno) return new; } +/* + * Wrapper for strdup(), with NULL in input treated as a correct case. + * + * "alloc_failed" can be optionally specified by the caller to check for + * allocation failures. The caller is responsible for its initialization, + * as ecpg_strdup() may be called repeatedly across multiple allocations. + */ char * -ecpg_strdup(const char *string, int lineno) +ecpg_strdup(const char *string, int lineno, bool *alloc_failed) { char *new; @@ -54,6 +61,8 @@ ecpg_strdup(const char *string, int lineno) new = strdup(string); if (!new) { + if (alloc_failed) + *alloc_failed = true; ecpg_raise(lineno, ECPG_OUT_OF_MEMORY, ECPG_SQLSTATE_ECPG_OUT_OF_MEMORY, NULL); return NULL; } diff --git a/src/interfaces/ecpg/ecpglib/prepare.c b/src/interfaces/ecpg/ecpglib/prepare.c index ea1146f520f..dd6fd1fe7f4 100644 --- a/src/interfaces/ecpg/ecpglib/prepare.c +++ b/src/interfaces/ecpg/ecpglib/prepare.c @@ -85,9 +85,22 @@ ecpg_register_prepared_stmt(struct statement *stmt) /* create statement */ prep_stmt->lineno = lineno; prep_stmt->connection = con; - prep_stmt->command = ecpg_strdup(stmt->command, lineno); + prep_stmt->command = ecpg_strdup(stmt->command, lineno, NULL); + if (!prep_stmt->command) + { + ecpg_free(prep_stmt); + ecpg_free(this); + return false; + } prep_stmt->inlist = prep_stmt->outlist = NULL; - this->name = ecpg_strdup(stmt->name, lineno); + this->name = ecpg_strdup(stmt->name, lineno, NULL); + if (!this->name) + { + ecpg_free(prep_stmt->command); + ecpg_free(prep_stmt); + ecpg_free(this); + return false; + } this->stmt = prep_stmt; this->prepared = true; @@ -177,14 +190,27 @@ prepare_common(int lineno, struct connection *con, const char *name, const char /* create statement */ stmt->lineno = 
lineno; stmt->connection = con; - stmt->command = ecpg_strdup(variable, lineno); + stmt->command = ecpg_strdup(variable, lineno, NULL); + if (!stmt->command) + { + ecpg_free(stmt); + ecpg_free(this); + return false; + } stmt->inlist = stmt->outlist = NULL; /* if we have C variables in our statement replace them with '?' */ replace_variables(&(stmt->command), lineno); /* add prepared statement to our list */ - this->name = ecpg_strdup(name, lineno); + this->name = ecpg_strdup(name, lineno, NULL); + if (!this->name) + { + ecpg_free(stmt->command); + ecpg_free(stmt); + ecpg_free(this); + return false; + } this->stmt = stmt; /* and finally really prepare the statement */ @@ -540,7 +566,9 @@ AddStmtToCache(int lineno, /* line # of statement */ /* add the query to the entry */ entry = &stmtCacheEntries[entNo]; entry->lineno = lineno; - entry->ecpgQuery = ecpg_strdup(ecpgQuery, lineno); + entry->ecpgQuery = ecpg_strdup(ecpgQuery, lineno, NULL); + if (!entry->ecpgQuery) + return -1; entry->connection = connection; entry->execs = 0; memcpy(entry->stmtID, stmtID, sizeof(entry->stmtID)); @@ -567,6 +595,9 @@ ecpg_auto_prepare(int lineno, const char *connection_name, const int compat, cha ecpg_log("ecpg_auto_prepare on line %d: statement found in cache; entry %d\n", lineno, entNo); stmtID = stmtCacheEntries[entNo].stmtID; + *name = ecpg_strdup(stmtID, lineno, NULL); + if (*name == NULL) + return false; con = ecpg_get_connection(connection_name); prep = ecpg_find_prepared_statement(stmtID, con, NULL); @@ -574,7 +605,6 @@ ecpg_auto_prepare(int lineno, const char *connection_name, const int compat, cha if (!prep && !prepare_common(lineno, con, stmtID, query)) return false; - *name = ecpg_strdup(stmtID, lineno); } else { @@ -584,6 +614,9 @@ ecpg_auto_prepare(int lineno, const char *connection_name, const int compat, cha /* generate a statement ID */ sprintf(stmtID, "ecpg%d", nextStmtID++); + *name = ecpg_strdup(stmtID, lineno, NULL); + if (*name == NULL) + return false; if 
(!ECPGprepare(lineno, connection_name, 0, stmtID, query)) return false; @@ -591,8 +624,6 @@ ecpg_auto_prepare(int lineno, const char *connection_name, const int compat, cha entNo = AddStmtToCache(lineno, stmtID, connection_name, compat, query); if (entNo < 0) return false; - - *name = ecpg_strdup(stmtID, lineno); } /* increase usage counter */ diff --git a/src/port/pgmkdirp.c b/src/port/pgmkdirp.c index d943559760d..7d7cea4dd0e 100644 --- a/src/port/pgmkdirp.c +++ b/src/port/pgmkdirp.c @@ -73,7 +73,7 @@ pg_mkdir_p(char *path, int omode) if (p[0] == '/' && p[1] == '/') { /* network drive */ - p = strstr(p + 2, "/"); + p = strchr(p + 2, '/'); if (p == NULL) { errno = EINVAL; diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out index 529b2241731..a98c97f7616 100644 --- a/src/test/regress/expected/subscription.out +++ b/src/test/regress/expected/subscription.out @@ -116,18 +116,18 @@ CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PU WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ regress_testsub4 - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? 
| Failover | Synchronous commit | Conninfo | Skip LSN -------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+------------ - regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | none | t | f | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN +------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------ + regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | none | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000 (1 row) ALTER SUBSCRIPTION regress_testsub4 SET (origin = any); \dRs+ regress_testsub4 - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? 
| Failover | Synchronous commit | Conninfo | Skip LSN -------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+------------ - regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN +------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------ + regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000 (1 row) DROP SUBSCRIPTION regress_testsub3; @@ -145,10 +145,10 @@ ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar'; ERROR: invalid connection string syntax: missing "=" after "foobar" in connection info string \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? 
| Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000 (1 row) ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false); @@ -157,10 +157,10 @@ ALTER SUBSCRIPTION regress_testsub SET (slot_name = 'newname'); ALTER SUBSCRIPTION regress_testsub SET (password_required = false); ALTER SUBSCRIPTION regress_testsub SET (run_as_owner = true); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? 
| Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | f | t | f | off | dbname=regress_doesnotexist2 | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+------------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | f | t | f | f | off | dbname=regress_doesnotexist2 | 0/00000000 (1 row) ALTER SUBSCRIPTION regress_testsub SET (password_required = true); @@ -176,10 +176,10 @@ ERROR: unrecognized subscription parameter: "create_slot" -- ok ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/12345'); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? 
| Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | off | dbname=regress_doesnotexist2 | 0/00012345 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+------------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist2 | 0/00012345 (1 row) -- ok - with lsn = NONE @@ -188,10 +188,10 @@ ALTER SUBSCRIPTION regress_testsub SKIP (lsn = NONE); ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/0'); ERROR: invalid WAL location (LSN): 0/0 \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? 
| Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | off | dbname=regress_doesnotexist2 | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+------------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist2 | 0/00000000 (1 row) BEGIN; @@ -223,10 +223,10 @@ ALTER SUBSCRIPTION regress_testsub_foo SET (synchronous_commit = foobar); ERROR: invalid value for parameter "synchronous_commit": "foobar" HINT: Available values: local, remote_write, remote_apply, on, off. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? 
| Failover | Synchronous commit | Conninfo | Skip LSN ----------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+------------ - regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | local | dbname=regress_doesnotexist2 | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN +---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+------------------------------+------------ + regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | local | dbname=regress_doesnotexist2 | 0/00000000 (1 row) -- rename back to keep the rest simple @@ -255,19 +255,19 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? 
| Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | t | parallel | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub} | t | parallel | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000 (1 row) ALTER SUBSCRIPTION regress_testsub SET (binary = false); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? 
| Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000 (1 row) DROP SUBSCRIPTION regress_testsub; @@ -279,27 +279,27 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? 
| Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | on | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub} | f | on | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000 (1 row) ALTER SUBSCRIPTION regress_testsub SET (streaming = parallel); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? 
| Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000 (1 row) ALTER SUBSCRIPTION regress_testsub SET (streaming = false); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? 
| Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000 (1 row) -- fail - publication already exists @@ -314,10 +314,10 @@ ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refr ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false); ERROR: publication "testpub1" is already in subscription "regress_testsub" \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? 
| Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | off | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000 (1 row) -- fail - publication used more than once @@ -332,10 +332,10 @@ ERROR: publication "testpub3" is not in subscription "regress_testsub" -- ok - delete publications ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (refresh = false); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? 
| Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000 (1 row) DROP SUBSCRIPTION regress_testsub; @@ -371,19 +371,19 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | p | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? 
| Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | p | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000 (1 row) -- we can alter streaming when two_phase enabled ALTER SUBSCRIPTION regress_testsub SET (streaming = true); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? 
| Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000 (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); @@ -393,10 +393,10 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? 
| Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000 (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); @@ -409,18 +409,34 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? 
| Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000 (1 row) ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | t | any | t | f | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? 
| Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | t | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000 +(1 row) + +ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); +DROP SUBSCRIPTION regress_testsub; +-- fail - retain_dead_tuples must be boolean +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, retain_dead_tuples = foo); +ERROR: retain_dead_tuples requires a Boolean value +-- ok +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, retain_dead_tuples = false); +WARNING: subscription was created, but is not connected +HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. +\dRs+ + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? 
| Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000 (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql index 007c9e70374..f0f714fe747 100644 --- a/src/test/regress/sql/subscription.sql +++ b/src/test/regress/sql/subscription.sql @@ -287,6 +287,17 @@ ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); DROP SUBSCRIPTION regress_testsub; +-- fail - retain_dead_tuples must be boolean +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, retain_dead_tuples = foo); + +-- ok +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, retain_dead_tuples = false); + +\dRs+ + +ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); +DROP SUBSCRIPTION regress_testsub; + -- let's do some tests with pg_create_subscription rather than superuser SET SESSION AUTHORIZATION regress_subscription_user3; diff --git a/src/test/subscription/t/035_conflicts.pl b/src/test/subscription/t/035_conflicts.pl index d78a6bac16a..7458d7fba7e 100644 --- a/src/test/subscription/t/035_conflicts.pl +++ b/src/test/subscription/t/035_conflicts.pl @@ -1,6 +1,6 @@ # Copyright (c) 2025, PostgreSQL Global Development Group -# Test the conflict detection of conflict type 'multiple_unique_conflicts'. 
+# Test conflicts in logical replication use strict; use warnings FATAL => 'all'; use PostgreSQL::Test::Cluster; @@ -18,7 +18,7 @@ $node_publisher->start; # Create a subscriber node my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber'); -$node_subscriber->init; +$node_subscriber->init(allows_streaming => 'logical'); $node_subscriber->start; # Create a table on publisher @@ -146,4 +146,195 @@ $node_subscriber->wait_for_log( pass('multiple_unique_conflicts detected on a leaf partition during insert'); +############################################################################### +# Setup a bidirectional logical replication between node_A & node_B +############################################################################### + +# Initialize nodes. + +# node_A. Increase the log_min_messages setting to DEBUG2 to debug test +# failures. Disable autovacuum to avoid generating xid that could affect the +# replication slot's xmin value. +my $node_A = $node_publisher; +$node_A->append_conf( + 'postgresql.conf', + qq{autovacuum = off + log_min_messages = 'debug2'}); +$node_A->restart; + +# node_B +my $node_B = $node_subscriber; +$node_B->append_conf('postgresql.conf', "track_commit_timestamp = on"); +$node_B->restart; + +# Create table on node_A +$node_A->safe_psql('postgres', "CREATE TABLE tab (a int PRIMARY KEY, b int)"); + +# Create the same table on node_B +$node_B->safe_psql('postgres', "CREATE TABLE tab (a int PRIMARY KEY, b int)"); + +my $subname_AB = 'tap_sub_a_b'; +my $subname_BA = 'tap_sub_b_a'; + +# Setup logical replication +# node_A (pub) -> node_B (sub) +my $node_A_connstr = $node_A->connstr . 
' dbname=postgres'; +$node_A->safe_psql('postgres', "CREATE PUBLICATION tap_pub_A FOR TABLE tab"); +$node_B->safe_psql( + 'postgres', " + CREATE SUBSCRIPTION $subname_BA + CONNECTION '$node_A_connstr application_name=$subname_BA' + PUBLICATION tap_pub_A + WITH (origin = none, retain_dead_tuples = true)"); + +# node_B (pub) -> node_A (sub) +my $node_B_connstr = $node_B->connstr . ' dbname=postgres'; +$node_B->safe_psql('postgres', "CREATE PUBLICATION tap_pub_B FOR TABLE tab"); +$node_A->safe_psql( + 'postgres', " + CREATE SUBSCRIPTION $subname_AB + CONNECTION '$node_B_connstr application_name=$subname_AB' + PUBLICATION tap_pub_B + WITH (origin = none, copy_data = off)"); + +# Wait for initial table sync to finish +$node_A->wait_for_subscription_sync($node_B, $subname_AB); +$node_B->wait_for_subscription_sync($node_A, $subname_BA); + +is(1, 1, 'Bidirectional replication setup is complete'); + +# Confirm that the conflict detection slot is created on Node B and the xmin +# value is valid. +ok( $node_B->poll_query_until( + 'postgres', + "SELECT xmin IS NOT NULL from pg_replication_slots WHERE slot_name = 'pg_conflict_detection'" + ), + "the xmin value of slot 'pg_conflict_detection' is valid on Node B"); + +################################################## +# Check that the retain_dead_tuples option can be enabled only for disabled +# subscriptions. Validate the NOTICE message during the subscription DDL, and +# ensure the conflict detection slot is created upon enabling the +# retain_dead_tuples option. 
+################################################## + +# Alter retain_dead_tuples for enabled subscription +my ($cmdret, $stdout, $stderr) = $node_A->psql('postgres', + "ALTER SUBSCRIPTION $subname_AB SET (retain_dead_tuples = true)"); +ok( $stderr =~ + /ERROR: cannot set option \"retain_dead_tuples\" for enabled subscription/, + "altering retain_dead_tuples is not allowed for enabled subscription"); + +# Disable the subscription +$node_A->psql('postgres', "ALTER SUBSCRIPTION $subname_AB DISABLE;"); + +# Enable retain_dead_tuples for disabled subscription +($cmdret, $stdout, $stderr) = $node_A->psql('postgres', + "ALTER SUBSCRIPTION $subname_AB SET (retain_dead_tuples = true);"); +ok( $stderr =~ + /NOTICE: deleted rows to detect conflicts would not be removed until the subscription is enabled/, + "altering retain_dead_tuples is allowed for disabled subscription"); + +# Re-enable the subscription +$node_A->safe_psql('postgres', "ALTER SUBSCRIPTION $subname_AB ENABLE;"); + +# Confirm that the conflict detection slot is created on Node A and the xmin +# value is valid. +ok( $node_A->poll_query_until( + 'postgres', + "SELECT xmin IS NOT NULL from pg_replication_slots WHERE slot_name = 'pg_conflict_detection'" + ), + "the xmin value of slot 'pg_conflict_detection' is valid on Node A"); + +################################################## +# Check the WARNING when changing the origin to ANY, if retain_dead_tuples is +# enabled. This warns of the possibility of receiving changes from origins +# other than the publisher. 
+################################################## + +($cmdret, $stdout, $stderr) = $node_A->psql('postgres', + "ALTER SUBSCRIPTION $subname_AB SET (origin = any);"); +ok( $stderr =~ + /WARNING: subscription "tap_sub_a_b" enabled retain_dead_tuples but might not reliably detect conflicts for changes from different origins/, + "warn of the possibility of receiving changes from origins other than the publisher"); + +# Reset the origin to none +$node_A->psql('postgres', + "ALTER SUBSCRIPTION $subname_AB SET (origin = none);"); + +############################################################################### +# Check that dead tuples on node A cannot be cleaned by VACUUM until the +# concurrent transactions on Node B have been applied and flushed on Node A. +############################################################################### + +# Insert a record +$node_A->safe_psql('postgres', "INSERT INTO tab VALUES (1, 1), (2, 2);"); +$node_A->wait_for_catchup($subname_BA); + +my $result = $node_B->safe_psql('postgres', "SELECT * FROM tab;"); +is($result, qq(1|1 +2|2), 'check replicated insert on node B'); + +# Disable the logical replication from node B to node A +$node_A->safe_psql('postgres', "ALTER SUBSCRIPTION $subname_AB DISABLE"); + +$node_B->safe_psql('postgres', "UPDATE tab SET b = 3 WHERE a = 1;"); +$node_A->safe_psql('postgres', "DELETE FROM tab WHERE a = 1;"); + +($cmdret, $stdout, $stderr) = $node_A->psql( + 'postgres', qq(VACUUM (verbose) public.tab;) +); + +ok( $stderr =~ + qr/1 are dead but not yet removable/, + 'the deleted column is non-removable'); + +$node_A->safe_psql( + 'postgres', "ALTER SUBSCRIPTION $subname_AB ENABLE;"); +$node_B->wait_for_catchup($subname_AB); + +# Remember the next transaction ID to be assigned +my $next_xid = $node_A->safe_psql('postgres', "SELECT txid_current() + 1;"); + +# Confirm that the xmin value is advanced to the latest nextXid. 
If no +# transactions are running, the apply worker selects nextXid as the candidate +# for the non-removable xid. See GetOldestActiveTransactionId(). +ok( $node_A->poll_query_until( + 'postgres', + "SELECT xmin = $next_xid from pg_replication_slots WHERE slot_name = 'pg_conflict_detection'" + ), + "the xmin value of slot 'pg_conflict_detection' is updated on Node A"); + +# Confirm that the dead tuple can be removed now +($cmdret, $stdout, $stderr) = $node_A->psql( + 'postgres', qq(VACUUM (verbose) public.tab;) +); + +ok( $stderr =~ + qr/1 removed, 1 remain, 0 are dead but not yet removable/, + 'the deleted column is removed'); + +############################################################################### +# Check that the replication slot pg_conflict_detection is dropped after +# removing all the subscriptions. +############################################################################### + +$node_B->safe_psql( + 'postgres', "DROP SUBSCRIPTION $subname_BA"); + +ok( $node_B->poll_query_until( + 'postgres', + "SELECT count(*) = 0 FROM pg_replication_slots WHERE slot_name = 'pg_conflict_detection'" + ), + "the slot 'pg_conflict_detection' has been dropped on Node B"); + +$node_A->safe_psql( + 'postgres', "DROP SUBSCRIPTION $subname_AB"); + +ok( $node_A->poll_query_until( + 'postgres', + "SELECT count(*) = 0 FROM pg_replication_slots WHERE slot_name = 'pg_conflict_detection'" + ), + "the slot 'pg_conflict_detection' has been dropped on Node A"); + done_testing(); diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index cd897467088..a8656419cb6 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -2566,6 +2566,8 @@ RestrictInfo Result ResultRelInfo ResultState +RetainDeadTuplesData +RetainDeadTuplesPhase ReturnSetInfo ReturnStmt ReturningClause |