summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--contrib/basic_archive/basic_archive.c2
-rw-r--r--contrib/dblink/dblink.c4
-rw-r--r--contrib/postgres_fdw/connection.c2
-rw-r--r--doc/src/sgml/catalogs.sgml11
-rw-r--r--doc/src/sgml/config.sgml2
-rw-r--r--doc/src/sgml/func.sgml16
-rw-r--r--doc/src/sgml/logical-replication.sgml74
-rw-r--r--doc/src/sgml/protocol.sgml88
-rw-r--r--doc/src/sgml/ref/alter_subscription.sgml18
-rw-r--r--doc/src/sgml/ref/create_subscription.sgml87
-rw-r--r--src/backend/access/transam/twophase.c32
-rw-r--r--src/backend/access/transam/xact.c18
-rw-r--r--src/backend/access/transam/xlog.c2
-rw-r--r--src/backend/access/transam/xlogrecovery.c2
-rw-r--r--src/backend/catalog/pg_subscription.c1
-rw-r--r--src/backend/catalog/system_views.sql3
-rw-r--r--src/backend/commands/subscriptioncmds.c400
-rw-r--r--src/backend/libpq/auth.c12
-rw-r--r--src/backend/replication/libpqwalreceiver/libpqwalreceiver.c2
-rw-r--r--src/backend/replication/logical/applyparallelworker.c3
-rw-r--r--src/backend/replication/logical/launcher.c228
-rw-r--r--src/backend/replication/logical/reorderbuffer.c2
-rw-r--r--src/backend/replication/logical/tablesync.c3
-rw-r--r--src/backend/replication/logical/worker.c623
-rw-r--r--src/backend/replication/slot.c48
-rw-r--r--src/backend/replication/walsender.c91
-rw-r--r--src/backend/storage/aio/README.md5
-rw-r--r--src/backend/storage/ipc/procarray.c20
-rw-r--r--src/backend/storage/lmgr/generate-lwlocknames.pl110
-rw-r--r--src/backend/storage/lmgr/lwlock.c48
-rw-r--r--src/backend/utils/activity/wait_event_names.txt10
-rw-r--r--src/backend/utils/adt/pg_upgrade_support.c19
-rw-r--r--src/bin/pg_dump/pg_dump.c18
-rw-r--r--src/bin/pg_dump/pg_dump.h1
-rw-r--r--src/bin/pg_upgrade/check.c96
-rw-r--r--src/bin/pg_upgrade/info.c25
-rw-r--r--src/bin/pg_upgrade/pg_upgrade.c60
-rw-r--r--src/bin/pg_upgrade/pg_upgrade.h4
-rw-r--r--src/bin/pg_upgrade/t/004_subscription.pl85
-rw-r--r--src/bin/psql/describe.c6
-rw-r--r--src/bin/psql/tab-complete.in.c10
-rw-r--r--src/include/access/xlog_internal.h10
-rw-r--r--src/include/access/xlogrecovery.h10
-rw-r--r--src/include/catalog/catversion.h2
-rw-r--r--src/include/catalog/pg_proc.dat4
-rw-r--r--src/include/catalog/pg_subscription.h5
-rw-r--r--src/include/commands/subscriptioncmds.h5
-rw-r--r--src/include/libpq/libpq-be-fe-helpers.h2
-rw-r--r--src/include/port/solaris.h9
-rw-r--r--src/include/replication/logicallauncher.h3
-rw-r--r--src/include/replication/slot.h11
-rw-r--r--src/include/replication/worker_internal.h13
-rw-r--r--src/include/storage/aio.h2
-rw-r--r--src/include/storage/lwlock.h56
-rw-r--r--src/include/storage/lwlocklist.h57
-rw-r--r--src/include/storage/proc.h8
-rw-r--r--src/include/storage/procarray.h3
-rw-r--r--src/interfaces/ecpg/ecpglib/connect.c46
-rw-r--r--src/interfaces/ecpg/ecpglib/descriptor.c12
-rw-r--r--src/interfaces/ecpg/ecpglib/ecpglib_extern.h2
-rw-r--r--src/interfaces/ecpg/ecpglib/execute.c42
-rw-r--r--src/interfaces/ecpg/ecpglib/memory.c11
-rw-r--r--src/interfaces/ecpg/ecpglib/prepare.c47
-rw-r--r--src/port/pgmkdirp.c2
-rw-r--r--src/test/regress/expected/subscription.out168
-rw-r--r--src/test/regress/sql/subscription.sql11
-rw-r--r--src/test/subscription/t/035_conflicts.pl195
-rw-r--r--src/tools/pgindent/typedefs.list2
68 files changed, 2591 insertions, 438 deletions
diff --git a/contrib/basic_archive/basic_archive.c b/contrib/basic_archive/basic_archive.c
index 4a8b8c7ac29..8fc633d2cbf 100644
--- a/contrib/basic_archive/basic_archive.c
+++ b/contrib/basic_archive/basic_archive.c
@@ -65,7 +65,7 @@ void
_PG_init(void)
{
DefineCustomStringVariable("basic_archive.archive_directory",
- gettext_noop("Archive file destination directory."),
+ "Archive file destination directory.",
NULL,
&archive_directory,
"",
diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c
index c459a842fa9..de5bed282f3 100644
--- a/contrib/dblink/dblink.c
+++ b/contrib/dblink/dblink.c
@@ -242,7 +242,7 @@ dblink_get_conn(char *conname_or_str,
}
PQsetNoticeReceiver(conn, libpqsrv_notice_receiver,
- gettext_noop("received message via remote connection"));
+ "received message via remote connection");
dblink_security_check(conn, NULL, connstr);
if (PQclientEncoding(conn) != GetDatabaseEncoding())
@@ -343,7 +343,7 @@ dblink_connect(PG_FUNCTION_ARGS)
}
PQsetNoticeReceiver(conn, libpqsrv_notice_receiver,
- gettext_noop("received message via remote connection"));
+ "received message via remote connection");
/* check password actually used if not superuser */
dblink_security_check(conn, connname, connstr);
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index e41d47c3bbd..c1ce6f33436 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -626,7 +626,7 @@ connect_pg_server(ForeignServer *server, UserMapping *user)
errdetail_internal("%s", pchomp(PQerrorMessage(conn)))));
PQsetNoticeReceiver(conn, libpqsrv_notice_receiver,
- gettext_noop("received message via remote connection"));
+ "received message via remote connection");
/* Perform post-connection security checks. */
pgfdw_security_check(keywords, values, user, conn);
diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index 0d23bc1b122..97f547b3cc4 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -8084,6 +8084,17 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
<row>
<entry role="catalog_table_entry"><para role="column_definition">
+ <structfield>subretaindeadtuples</structfield> <type>bool</type>
+ </para>
+ <para>
+ If true, the information (e.g., dead tuples, commit timestamps, and
+ origins) on the subscriber that is useful for conflict detection is
+ retained.
+ </para></entry>
+ </row>
+
+ <row>
+ <entry role="catalog_table_entry"><para role="column_definition">
<structfield>subconninfo</structfield> <type>text</type>
</para>
<para>
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index c7acc0f182f..20ccb2d6b54 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -4965,6 +4965,8 @@ ANY <replaceable class="parameter">num_sync</replaceable> ( <replaceable class="
new setting.
This setting has no effect if <varname>primary_conninfo</varname> is not
set or the server is not in standby mode.
+ The name cannot be <literal>pg_conflict_detection</literal> as it is
+ reserved for the conflict detection slot.
</para>
</listitem>
</varlistentry>
diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml
index f5a0e0954a1..de5b5929ee0 100644
--- a/doc/src/sgml/func.sgml
+++ b/doc/src/sgml/func.sgml
@@ -29592,7 +29592,9 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
</para>
<para>
Creates a new physical replication slot named
- <parameter>slot_name</parameter>. The optional second parameter,
+ <parameter>slot_name</parameter>. The name cannot be
+ <literal>pg_conflict_detection</literal> as it is reserved for the
+ conflict detection slot. The optional second parameter,
when <literal>true</literal>, specifies that the <acronym>LSN</acronym> for this
replication slot be reserved immediately; otherwise
the <acronym>LSN</acronym> is reserved on first connection from a streaming
@@ -29636,7 +29638,9 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
<para>
Creates a new logical (decoding) replication slot named
<parameter>slot_name</parameter> using the output plugin
- <parameter>plugin</parameter>. The optional third
+ <parameter>plugin</parameter>. The name cannot be
+ <literal>pg_conflict_detection</literal> as it is reserved for
+ the conflict detection slot. The optional third
parameter, <parameter>temporary</parameter>, when set to true, specifies that
the slot should not be permanently stored to disk and is only meant
for use by the current session. Temporary slots are also
@@ -29666,6 +29670,8 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
<para>
Copies an existing physical replication slot named <parameter>src_slot_name</parameter>
to a physical replication slot named <parameter>dst_slot_name</parameter>.
+ The new slot name cannot be <literal>pg_conflict_detection</literal>,
+ as it is reserved for the conflict detection.
The copied physical slot starts to reserve WAL from the same <acronym>LSN</acronym> as the
source slot.
<parameter>temporary</parameter> is optional. If <parameter>temporary</parameter>
@@ -29688,8 +29694,10 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
Copies an existing logical replication slot
named <parameter>src_slot_name</parameter> to a logical replication
slot named <parameter>dst_slot_name</parameter>, optionally changing
- the output plugin and persistence. The copied logical slot starts
- from the same <acronym>LSN</acronym> as the source logical slot. Both
+ the output plugin and persistence. The new slot name cannot be
+ <literal>pg_conflict_detection</literal> as it is reserved for
+ the conflict detection. The copied logical slot starts from the same
+ <acronym>LSN</acronym> as the source logical slot. Both
<parameter>temporary</parameter> and <parameter>plugin</parameter> are
optional; if they are omitted, the values of the source slot are used.
The <literal>failover</literal> option of the source logical slot
diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml
index e26f7f59d4a..fcac55aefe6 100644
--- a/doc/src/sgml/logical-replication.sgml
+++ b/doc/src/sgml/logical-replication.sgml
@@ -1048,28 +1048,28 @@ HINT: To initiate replication, you must manually create the replication slot, e
defined) for each publication.
<programlisting><![CDATA[
/* pub # */ \dRp+
- Publication p1
- Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root
-----------+------------+---------+---------+---------+-----------+----------
- postgres | f | t | t | t | t | f
+ Publication p1
+ Owner | All tables | Inserts | Updates | Deletes | Truncates | Generated columns | Via root
+----------+------------+---------+---------+---------+-----------+-------------------+----------
+ postgres | f | t | t | t | t | none | f
Tables:
- "public.t1" WHERE ((a > 5) AND (c = 'NSW'::text))
+ "public.t1" WHERE ((a > 5) AND (c = 'NSW'::text))
- Publication p2
- Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root
-----------+------------+---------+---------+---------+-----------+----------
- postgres | f | t | t | t | t | f
+ Publication p2
+ Owner | All tables | Inserts | Updates | Deletes | Truncates | Generated columns | Via root
+----------+------------+---------+---------+---------+-----------+-------------------+----------
+ postgres | f | t | t | t | t | none | f
Tables:
- "public.t1"
- "public.t2" WHERE (e = 99)
+ "public.t1"
+ "public.t2" WHERE (e = 99)
- Publication p3
- Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root
-----------+------------+---------+---------+---------+-----------+----------
- postgres | f | t | t | t | t | f
+ Publication p3
+ Owner | All tables | Inserts | Updates | Deletes | Truncates | Generated columns | Via root
+----------+------------+---------+---------+---------+-----------+-------------------+----------
+ postgres | f | t | t | t | t | none | f
Tables:
- "public.t2" WHERE (d = 10)
- "public.t3" WHERE (g = 10)
+ "public.t2" WHERE (d = 10)
+ "public.t3" WHERE (g = 10)
]]></programlisting></para>
<para>
@@ -1491,10 +1491,10 @@ Publications:
for each publication.
<programlisting>
/* pub # */ \dRp+
- Publication p1
- Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root
-----------+------------+---------+---------+---------+-----------+----------
- postgres | f | t | t | t | t | f
+ Publication p1
+ Owner | All tables | Inserts | Updates | Deletes | Truncates | Generated columns | Via root
+----------+------------+---------+---------+---------+-----------+-------------------+----------
+ postgres | f | t | t | t | t | none | f
Tables:
"public.t1" (id, a, b, d)
</programlisting></para>
@@ -2397,6 +2397,12 @@ CONTEXT: processing remote data for replication origin "pg_16395" during "INSER
</para>
<para>
+ <link linkend="guc-max-replication-slots"><varname>max_replication_slots</varname></link>
+ must be set to at least 1 when <link linkend="sql-createsubscription-params-with-retain-dead-tuples"><literal>retain_dead_tuples</literal></link>
+ is enabled for any subscription.
+ </para>
+
+ <para>
<link linkend="guc-max-logical-replication-workers"><varname>max_logical_replication_workers</varname></link>
must be set to at least the number of subscriptions (for leader apply
workers), plus some reserve for the table synchronization workers and
@@ -2532,6 +2538,22 @@ CONTEXT: processing remote data for replication origin "pg_16395" during "INSER
dependencies on clusters before version 17.0 will silently be ignored.
</para>
+ <note>
+ <para>
+ Commit timestamps and origin data are not preserved during the upgrade.
+ As a result, even if
+ <link linkend="sql-createsubscription-params-with-retain-dead-tuples"><literal>retain_dead_tuples</literal></link>
+ is enabled, the upgraded subscriber may be unable to detect conflicts or
+ log relevant commit timestamps and origins when applying changes from the
+ publisher occurred before the upgrade. Additionally, immediately after the
+ upgrade, the vacuum may remove the deleted rows that are required for
+ conflict detection. This can affect the changes that were not replicated
+ before the upgrade. To ensure consistent conflict tracking, users should
+ ensure that all potentially conflicting changes are replicated to the
+ subscriber before initiating the upgrade.
+ </para>
+ </note>
+
<para>
There are some prerequisites for <application>pg_upgrade</application> to
be able to upgrade the subscriptions. If these are not met an error
@@ -2563,6 +2585,16 @@ CONTEXT: processing remote data for replication origin "pg_16395" during "INSER
subscriptions present in the old cluster.
</para>
</listitem>
+ <listitem>
+ <para>
+ If there are subscriptions with retain_dead_tuples enabled, the reserved
+ replication slot <quote><literal>pg_conflict_detection</literal></quote>
+ must not exist on the new cluster. Additionally, the
+ <link linkend="guc-wal-level"><varname>wal_level</varname></link> on the
+ new cluster must be set to <literal>replica</literal> or
+ <literal>logical</literal>.
+ </para>
+ </listitem>
</itemizedlist>
</sect2>
diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index e74b5be1eff..b115884acb3 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -2235,6 +2235,8 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;"
<para>
The name of the slot to create. Must be a valid replication slot
name (see <xref linkend="streaming-replication-slots-manipulation"/>).
+ The name cannot be <literal>pg_conflict_detection</literal> as it
+ is reserved for the conflict detection.
</para>
</listitem>
</varlistentry>
@@ -2653,6 +2655,65 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;"
</variablelist>
</listitem>
</varlistentry>
+
+ <varlistentry id="protocol-replication-primary-status-update">
+ <term>Primary status update (B)</term>
+ <listitem>
+ <variablelist>
+ <varlistentry>
+ <term>Byte1('s')</term>
+ <listitem>
+ <para>
+ Identifies the message as a primary status update.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term>Int64</term>
+ <listitem>
+ <para>
+ The latest WAL write position on the server.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term>Int64</term>
+ <listitem>
+ <para>
+ The oldest transaction ID that is currently in the commit phase on
+ the server, along with its epoch. The most significant 32 bits are
+ the epoch. The least significant 32 bits are the transaction ID.
+ If no transactions are active on the server, this number will be
+ the next transaction ID to be assigned.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term>Int64</term>
+ <listitem>
+ <para>
+ The next transaction ID to be assigned on the server, along with
+ its epoch. The most significant 32 bits are the epoch. The least
+ significant 32 bits are the transaction ID.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term>Int64</term>
+ <listitem>
+ <para>
+ The server's system clock at the time of transmission, as
+ microseconds since midnight on 2000-01-01.
+ </para>
+ </listitem>
+ </varlistentry>
+ </variablelist>
+ </listitem>
+ </varlistentry>
</variablelist>
<para>
@@ -2797,6 +2858,33 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;"
</variablelist>
</listitem>
</varlistentry>
+
+ <varlistentry id="protocol-replication-standby-wal-status-request">
+ <term>Request primary status update (F)</term>
+ <listitem>
+ <variablelist>
+ <varlistentry>
+ <term>Byte1('p')</term>
+ <listitem>
+ <para>
+ Identifies the message as a request for a primary status update.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term>Int64</term>
+ <listitem>
+ <para>
+ The client's system clock at the time of transmission, as
+ microseconds since midnight on 2000-01-01.
+ </para>
+ </listitem>
+ </varlistentry>
+ </variablelist>
+ </listitem>
+ </varlistentry>
+
</variablelist>
</listitem>
</varlistentry>
diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml
index fdc648d007f..d48cdc76bd3 100644
--- a/doc/src/sgml/ref/alter_subscription.sgml
+++ b/doc/src/sgml/ref/alter_subscription.sgml
@@ -235,8 +235,9 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
<link linkend="sql-createsubscription-params-with-password-required"><literal>password_required</literal></link>,
<link linkend="sql-createsubscription-params-with-run-as-owner"><literal>run_as_owner</literal></link>,
<link linkend="sql-createsubscription-params-with-origin"><literal>origin</literal></link>,
- <link linkend="sql-createsubscription-params-with-failover"><literal>failover</literal></link>, and
- <link linkend="sql-createsubscription-params-with-two-phase"><literal>two_phase</literal></link>.
+ <link linkend="sql-createsubscription-params-with-failover"><literal>failover</literal></link>,
+ <link linkend="sql-createsubscription-params-with-two-phase"><literal>two_phase</literal></link>, and
+ <link linkend="sql-createsubscription-params-with-retain-dead-tuples"><literal>retain_dead_tuples</literal></link>.
Only a superuser can set <literal>password_required = false</literal>.
</para>
@@ -261,8 +262,9 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
</para>
<para>
- The <link linkend="sql-createsubscription-params-with-failover"><literal>failover</literal></link>
- and <link linkend="sql-createsubscription-params-with-two-phase"><literal>two_phase</literal></link>
+ The <link linkend="sql-createsubscription-params-with-failover"><literal>failover</literal></link>,
+ <link linkend="sql-createsubscription-params-with-two-phase"><literal>two_phase</literal></link>, and
+ <link linkend="sql-createsubscription-params-with-retain-dead-tuples"><literal>retain_dead_tuples</literal></link>
parameters can only be altered when the subscription is disabled.
</para>
@@ -285,6 +287,14 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
option is changed from <literal>true</literal> to <literal>false</literal>,
the publisher will replicate the transactions again when they are committed.
</para>
+
+ <para>
+ If the <link linkend="sql-createsubscription-params-with-retain-dead-tuples"><literal>retain_dead_tuples</literal></link>
+ option is altered to <literal>false</literal> and no other subscription
+ has this option enabled, the replication slot named
+ <quote><literal>pg_conflict_detection</literal></quote>, created to retain
+ dead tuples for conflict detection, will be dropped.
+ </para>
</listitem>
</varlistentry>
diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml
index 57dec28a5df..b8cd15f3280 100644
--- a/doc/src/sgml/ref/create_subscription.sgml
+++ b/doc/src/sgml/ref/create_subscription.sgml
@@ -169,7 +169,9 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
<listitem>
<para>
Name of the publisher's replication slot to use. The default is
- to use the name of the subscription for the slot name.
+ to use the name of the subscription for the slot name. The name cannot
+ be <literal>pg_conflict_detection</literal> as it is reserved for the
+ conflict detection.
</para>
<para>
@@ -435,6 +437,89 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
</para>
</listitem>
</varlistentry>
+
+ <varlistentry id="sql-createsubscription-params-with-retain-dead-tuples">
+ <term><literal>retain_dead_tuples</literal> (<type>boolean</type>)</term>
+ <listitem>
+ <para>
+ Specifies whether the information (e.g., dead tuples, commit
+ timestamps, and origins) required for conflict detection on the
+ subscriber is retained. The default is <literal>false</literal>.
+ If set to <literal>true</literal>, a physical replication slot named
+ <quote><literal>pg_conflict_detection</literal></quote> will be
+ created on the subscriber to prevent the conflict information from
+ being removed.
+ </para>
+
+ <para>
+ Note that the information useful for conflict detection is retained
+ only after the creation of the slot. You can verify the existence of
+ this slot by querying <link linkend="view-pg-replication-slots">pg_replication_slots</link>.
+ And even if multiple subscriptions on one node enable this option,
+ only one replication slot will be created. Also,
+ <varname>wal_level</varname> must be set to <literal>replica</literal>
+ or higher to allow the replication slot to be used.
+ </para>
+
+ <caution>
+ <para>
+ Note that the information for conflict detection cannot be purged if
+ the subscription is disabled; thus, the information will accumulate
+ until the subscription is enabled. To prevent excessive accumulation,
+ it is recommended to disable <literal>retain_dead_tuples</literal>
+ if the subscription will be inactive for an extended period.
+ </para>
+
+ <para>
+ Additionally when enabling <literal>retain_dead_tuples</literal> for
+ conflict detection in logical replication, it is important to design the
+ replication topology to balance data retention requirements with
+ overall system performance. This option provides minimal performance
+ overhead when applied appropriately. The following scenarios illustrate
+ effective usage patterns when enabling this option.
+ </para>
+
+ <para>
+ a. Large Tables with Bidirectional Writes:
+ For large tables subject to concurrent writes on both publisher and
+ subscriber nodes, publishers can define row filters when creating
+ publications to segment data. This allows multiple subscriptions
+ to replicate exclusive subsets of the table in parallel, optimizing
+ the throughput.
+ </para>
+
+ <para>
+ b. Write-Enabled Subscribers:
+ If a subscriber node is expected to perform write operations, replication
+ can be structured using multiple publications and subscriptions. By
+ distributing tables across these publications, the workload is spread among
+ several apply workers, improving concurrency and reducing contention.
+ </para>
+
+ <para>
+ c. Read-Only Subscribers:
+ In configurations involving single or multiple publisher nodes
+ performing concurrent write operations, read-only subscriber nodes may
+ replicate changes without seeing a performance impact if it does index
+ scan. However, if the subscriber is impacted due to replication lag or
+ scan performance (say due to sequential scans), it needs to follow one
+ of the two previous strategies to distribute the workload on the
+ subscriber.
+ </para>
+ </caution>
+
+ <para>
+ This option cannot be enabled if the publisher is a physical standby.
+ </para>
+
+ <para>
+ Enabling this option ensures retention of information useful for
+ conflict detection solely for changes occurring locally on the
+ publisher. For the changes originating from different origins,
+ reliable conflict detection cannot be guaranteed.
+ </para>
+ </listitem>
+ </varlistentry>
</variablelist></para>
</listitem>
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index 85cbe397cb2..7918176fc58 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -1183,7 +1183,11 @@ EndPrepare(GlobalTransaction gxact)
* starting immediately after the WAL record is inserted could complete
* without fsync'ing our state file. (This is essentially the same kind
* of race condition as the COMMIT-to-clog-write case that
- * RecordTransactionCommit uses DELAY_CHKPT_START for; see notes there.)
+ * RecordTransactionCommit uses DELAY_CHKPT_IN_COMMIT for; see notes
+ * there.) Note that DELAY_CHKPT_IN_COMMIT is used to find transactions in
+ * the critical commit section. We need to know about such transactions
+ * for conflict detection in logical replication. See
+ * GetOldestActiveTransactionId(true, false) and its use.
*
* We save the PREPARE record's location in the gxact for later use by
* CheckPointTwoPhase.
@@ -2298,7 +2302,7 @@ ProcessTwoPhaseBuffer(FullTransactionId fxid,
* RecordTransactionCommitPrepared
*
* This is basically the same as RecordTransactionCommit (q.v. if you change
- * this function): in particular, we must set DELAY_CHKPT_START to avoid a
+ * this function): in particular, we must set DELAY_CHKPT_IN_COMMIT to avoid a
* race condition.
*
* We know the transaction made at least one XLOG entry (its PREPARE),
@@ -2318,7 +2322,7 @@ RecordTransactionCommitPrepared(TransactionId xid,
const char *gid)
{
XLogRecPtr recptr;
- TimestampTz committs = GetCurrentTimestamp();
+ TimestampTz committs;
bool replorigin;
/*
@@ -2331,8 +2335,24 @@ RecordTransactionCommitPrepared(TransactionId xid,
START_CRIT_SECTION();
/* See notes in RecordTransactionCommit */
- Assert((MyProc->delayChkptFlags & DELAY_CHKPT_START) == 0);
- MyProc->delayChkptFlags |= DELAY_CHKPT_START;
+ Assert((MyProc->delayChkptFlags & DELAY_CHKPT_IN_COMMIT) == 0);
+ MyProc->delayChkptFlags |= DELAY_CHKPT_IN_COMMIT;
+
+ /*
+ * Ensures the DELAY_CHKPT_IN_COMMIT flag write is globally visible before
+ * commit time is written.
+ */
+ pg_write_barrier();
+
+ /*
+ * Note it is important to set committs value after marking ourselves as
+ * in the commit critical section (DELAY_CHKPT_IN_COMMIT). This is because
+ * we want to ensure all transactions that have acquired commit timestamp
+ * are finished before we allow the logical replication client to advance
+ * its xid which is used to hold back dead rows for conflict detection.
+ * See comments atop worker.c.
+ */
+ committs = GetCurrentTimestamp();
/*
* Emit the XLOG commit record. Note that we mark 2PC commits as
@@ -2381,7 +2401,7 @@ RecordTransactionCommitPrepared(TransactionId xid,
TransactionIdCommitTree(xid, nchildren, children);
/* Checkpoint can proceed now */
- MyProc->delayChkptFlags &= ~DELAY_CHKPT_START;
+ MyProc->delayChkptFlags &= ~DELAY_CHKPT_IN_COMMIT;
END_CRIT_SECTION();
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 41601fcb280..b46e7e9c2a6 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -1431,10 +1431,22 @@ RecordTransactionCommit(void)
* without holding the ProcArrayLock, since we're the only one
* modifying it. This makes checkpoint's determination of which xacts
* are delaying the checkpoint a bit fuzzy, but it doesn't matter.
+ *
+ * Note, it is important to get the commit timestamp after marking the
+ * transaction in the commit critical section. See
+ * RecordTransactionCommitPrepared.
*/
- Assert((MyProc->delayChkptFlags & DELAY_CHKPT_START) == 0);
+ Assert((MyProc->delayChkptFlags & DELAY_CHKPT_IN_COMMIT) == 0);
START_CRIT_SECTION();
- MyProc->delayChkptFlags |= DELAY_CHKPT_START;
+ MyProc->delayChkptFlags |= DELAY_CHKPT_IN_COMMIT;
+
+ Assert(xactStopTimestamp == 0);
+
+ /*
+ * Ensures the DELAY_CHKPT_IN_COMMIT flag write is globally visible
+ * before commit time is written.
+ */
+ pg_write_barrier();
/*
* Insert the commit XLOG record.
@@ -1537,7 +1549,7 @@ RecordTransactionCommit(void)
*/
if (markXidCommitted)
{
- MyProc->delayChkptFlags &= ~DELAY_CHKPT_START;
+ MyProc->delayChkptFlags &= ~DELAY_CHKPT_IN_COMMIT;
END_CRIT_SECTION();
}
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 8e7827c6ed9..eefffc4277a 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -7121,7 +7121,7 @@ CreateCheckPoint(int flags)
* starting snapshot of locks and transactions.
*/
if (!shutdown && XLogStandbyInfoActive())
- checkPoint.oldestActiveXid = GetOldestActiveTransactionId();
+ checkPoint.oldestActiveXid = GetOldestActiveTransactionId(false, true);
else
checkPoint.oldestActiveXid = InvalidTransactionId;
diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index 23878b2dd91..e8f3ba00caa 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -4760,7 +4760,7 @@ bool
check_primary_slot_name(char **newval, void **extra, GucSource source)
{
if (*newval && strcmp(*newval, "") != 0 &&
- !ReplicationSlotValidateName(*newval, WARNING))
+ !ReplicationSlotValidateName(*newval, false, WARNING))
return false;
return true;
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index 1395032413e..63c2992d19f 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -103,6 +103,7 @@ GetSubscription(Oid subid, bool missing_ok)
sub->passwordrequired = subform->subpasswordrequired;
sub->runasowner = subform->subrunasowner;
sub->failover = subform->subfailover;
+ sub->retaindeadtuples = subform->subretaindeadtuples;
/* Get conninfo */
datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index b2d5332effc..f6eca09ee15 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1386,7 +1386,8 @@ REVOKE ALL ON pg_subscription FROM public;
GRANT SELECT (oid, subdbid, subskiplsn, subname, subowner, subenabled,
subbinary, substream, subtwophasestate, subdisableonerr,
subpasswordrequired, subrunasowner, subfailover,
- subslotname, subsynccommit, subpublications, suborigin)
+ subretaindeadtuples, subslotname, subsynccommit,
+ subpublications, suborigin)
ON pg_subscription TO public;
CREATE VIEW pg_stat_subscription_stats AS
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index e23b0de7242..cd6c3684482 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -14,6 +14,7 @@
#include "postgres.h"
+#include "access/commit_ts.h"
#include "access/htup_details.h"
#include "access/table.h"
#include "access/twophase.h"
@@ -71,8 +72,9 @@
#define SUBOPT_PASSWORD_REQUIRED 0x00000800
#define SUBOPT_RUN_AS_OWNER 0x00001000
#define SUBOPT_FAILOVER 0x00002000
-#define SUBOPT_LSN 0x00004000
-#define SUBOPT_ORIGIN 0x00008000
+#define SUBOPT_RETAIN_DEAD_TUPLES 0x00004000
+#define SUBOPT_LSN 0x00008000
+#define SUBOPT_ORIGIN 0x00010000
/* check if the 'val' has 'bits' set */
#define IsSet(val, bits) (((val) & (bits)) == (bits))
@@ -98,6 +100,7 @@ typedef struct SubOpts
bool passwordrequired;
bool runasowner;
bool failover;
+ bool retaindeadtuples;
char *origin;
XLogRecPtr lsn;
} SubOpts;
@@ -105,8 +108,10 @@ typedef struct SubOpts
static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
static void check_publications_origin(WalReceiverConn *wrconn,
List *publications, bool copydata,
- char *origin, Oid *subrel_local_oids,
- int subrel_count, char *subname);
+ bool retain_dead_tuples, char *origin,
+ Oid *subrel_local_oids, int subrel_count,
+ char *subname);
+static void check_pub_dead_tuple_retention(WalReceiverConn *wrconn);
static void check_duplicates_in_publist(List *publist, Datum *datums);
static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname);
static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err);
@@ -162,6 +167,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
opts->runasowner = false;
if (IsSet(supported_opts, SUBOPT_FAILOVER))
opts->failover = false;
+ if (IsSet(supported_opts, SUBOPT_RETAIN_DEAD_TUPLES))
+ opts->retaindeadtuples = false;
if (IsSet(supported_opts, SUBOPT_ORIGIN))
opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY);
@@ -210,7 +217,7 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
if (strcmp(opts->slot_name, "none") == 0)
opts->slot_name = NULL;
else
- ReplicationSlotValidateName(opts->slot_name, ERROR);
+ ReplicationSlotValidateName(opts->slot_name, false, ERROR);
}
else if (IsSet(supported_opts, SUBOPT_COPY_DATA) &&
strcmp(defel->defname, "copy_data") == 0)
@@ -307,6 +314,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
opts->specified_opts |= SUBOPT_FAILOVER;
opts->failover = defGetBoolean(defel);
}
+ else if (IsSet(supported_opts, SUBOPT_RETAIN_DEAD_TUPLES) &&
+ strcmp(defel->defname, "retain_dead_tuples") == 0)
+ {
+ if (IsSet(opts->specified_opts, SUBOPT_RETAIN_DEAD_TUPLES))
+ errorConflictingDefElem(defel, pstate);
+
+ opts->specified_opts |= SUBOPT_RETAIN_DEAD_TUPLES;
+ opts->retaindeadtuples = defGetBoolean(defel);
+ }
else if (IsSet(supported_opts, SUBOPT_ORIGIN) &&
strcmp(defel->defname, "origin") == 0)
{
@@ -563,7 +579,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED |
- SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER | SUBOPT_ORIGIN);
+ SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER |
+ SUBOPT_RETAIN_DEAD_TUPLES | SUBOPT_ORIGIN);
parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
/*
@@ -630,6 +647,10 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
stmt->subname)));
}
+ /* Ensure that we can enable retain_dead_tuples */
+ if (opts.retaindeadtuples)
+ CheckSubDeadTupleRetention(true, !opts.enabled, WARNING);
+
if (!IsSet(opts.specified_opts, SUBOPT_SLOT_NAME) &&
opts.slot_name == NULL)
opts.slot_name = stmt->subname;
@@ -670,6 +691,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
values[Anum_pg_subscription_subpasswordrequired - 1] = BoolGetDatum(opts.passwordrequired);
values[Anum_pg_subscription_subrunasowner - 1] = BoolGetDatum(opts.runasowner);
values[Anum_pg_subscription_subfailover - 1] = BoolGetDatum(opts.failover);
+ values[Anum_pg_subscription_subretaindeadtuples - 1] =
+ BoolGetDatum(opts.retaindeadtuples);
values[Anum_pg_subscription_subconninfo - 1] =
CStringGetTextDatum(conninfo);
if (opts.slot_name)
@@ -722,7 +745,11 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
{
check_publications(wrconn, publications);
check_publications_origin(wrconn, publications, opts.copy_data,
- opts.origin, NULL, 0, stmt->subname);
+ opts.retaindeadtuples, opts.origin,
+ NULL, 0, stmt->subname);
+
+ if (opts.retaindeadtuples)
+ check_pub_dead_tuple_retention(wrconn);
/*
* Set sync state based on if we were asked to do data copy or
@@ -881,8 +908,8 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
sizeof(Oid), oid_cmp);
check_publications_origin(wrconn, sub->publications, copy_data,
- sub->origin, subrel_local_oids,
- subrel_count, sub->name);
+ sub->retaindeadtuples, sub->origin,
+ subrel_local_oids, subrel_count, sub->name);
/*
* Rels that we want to remove from subscription and drop any slots
@@ -1040,18 +1067,22 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
}
/*
- * Common checks for altering failover and two_phase options.
+ * Common checks for altering failover, two_phase, and retain_dead_tuples
+ * options.
*/
static void
CheckAlterSubOption(Subscription *sub, const char *option,
bool slot_needs_update, bool isTopLevel)
{
+ Assert(strcmp(option, "failover") == 0 ||
+ strcmp(option, "two_phase") == 0 ||
+ strcmp(option, "retain_dead_tuples") == 0);
+
/*
- * The checks in this function are required only for failover and
- * two_phase options.
+ * Altering the retain_dead_tuples option does not update the slot on the
+ * publisher.
*/
- Assert(strcmp(option, "failover") == 0 ||
- strcmp(option, "two_phase") == 0);
+ Assert(!slot_needs_update || strcmp(option, "retain_dead_tuples") != 0);
/*
* Do not allow changing the option if the subscription is enabled. This
@@ -1063,6 +1094,39 @@ CheckAlterSubOption(Subscription *sub, const char *option,
* the publisher by the existing walsender, so we could have allowed that
* even when the subscription is enabled. But we kept this restriction for
* the sake of consistency and simplicity.
+ *
+ * Additionally, do not allow changing the retain_dead_tuples option when
+ * the subscription is enabled to prevent race conditions arising from the
+ * new option value being acknowledged asynchronously by the launcher and
+ * apply workers.
+ *
+ * Without the restriction, a race condition may arise when a user
+ * disables and immediately re-enables the retain_dead_tuples option. In
+ * this case, the launcher might drop the slot upon noticing the disabled
+ * action, while the apply worker may keep maintaining
+ * oldest_nonremovable_xid without noticing the option change. During this
+ * period, a transaction ID wraparound could falsely make this ID appear
+ * as if it originates from the future w.r.t the transaction ID stored in
+ * the slot maintained by launcher.
+ *
+ * Similarly, if the user enables retain_dead_tuples concurrently with the
+ * launcher starting the worker, the apply worker may start calculating
+ * oldest_nonremovable_xid before the launcher notices the enable action.
+ * Consequently, the launcher may update slot.xmin to a newer value than
+ * that maintained by the worker. In subsequent cycles, upon integrating
+ * the worker's oldest_nonremovable_xid, the launcher might detect a
+ * retreat in the calculated xmin, necessitating additional handling.
+ *
+ * XXX To address the above race conditions, we can define
+ * oldest_nonremovable_xid as FullTransactionID and adds the check to
+ * disallow retreating the conflict slot's xmin. For now, we kept the
+ * implementation simple by disallowing change to the retain_dead_tuples,
+ * but in the future we can change this after some more analysis.
+ *
+ * Note that we could restrict only the enabling of retain_dead_tuples to
+ * avoid the race conditions described above, but we maintain the
+ * restriction for both enable and disable operations for the sake of
+ * consistency.
*/
if (sub->enabled)
ereport(ERROR,
@@ -1110,6 +1174,9 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
bool update_tuple = false;
bool update_failover = false;
bool update_two_phase = false;
+ bool check_pub_rdt = false;
+ bool retain_dead_tuples;
+ char *origin;
Subscription *sub;
Form_pg_subscription form;
bits32 supported_opts;
@@ -1137,6 +1204,9 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
sub = GetSubscription(subid, false);
+ retain_dead_tuples = sub->retaindeadtuples;
+ origin = sub->origin;
+
/*
* Don't allow non-superuser modification of a subscription with
* password_required=false.
@@ -1165,7 +1235,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
SUBOPT_DISABLE_ON_ERR |
SUBOPT_PASSWORD_REQUIRED |
SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER |
- SUBOPT_ORIGIN);
+ SUBOPT_RETAIN_DEAD_TUPLES | SUBOPT_ORIGIN);
parse_subscription_options(pstate, stmt->options,
supported_opts, &opts);
@@ -1325,11 +1395,62 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
replaces[Anum_pg_subscription_subfailover - 1] = true;
}
+ if (IsSet(opts.specified_opts, SUBOPT_RETAIN_DEAD_TUPLES))
+ {
+ values[Anum_pg_subscription_subretaindeadtuples - 1] =
+ BoolGetDatum(opts.retaindeadtuples);
+ replaces[Anum_pg_subscription_subretaindeadtuples - 1] = true;
+
+ CheckAlterSubOption(sub, "retain_dead_tuples", false, isTopLevel);
+
+ /*
+ * Workers may continue running even after the
+ * subscription has been disabled.
+ *
+ * To prevent race conditions (as described in
+ * CheckAlterSubOption()), ensure that all worker
+ * processes have already exited before proceeding.
+ */
+ if (logicalrep_workers_find(subid, true, true))
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("cannot alter retain_dead_tuples when logical replication worker is still running"),
+ errhint("Try again after some time.")));
+
+ /*
+ * Remind the user that enabling subscription will prevent
+ * the accumulation of dead tuples.
+ */
+ if (opts.retaindeadtuples)
+ CheckSubDeadTupleRetention(true, !sub->enabled, NOTICE);
+
+ /*
+ * Notify the launcher to manage the replication slot for
+ * conflict detection. This ensures that replication slot
+ * is efficiently handled (created, updated, or dropped)
+ * in response to any configuration changes.
+ */
+ ApplyLauncherWakeupAtCommit();
+
+ check_pub_rdt = opts.retaindeadtuples;
+ retain_dead_tuples = opts.retaindeadtuples;
+ }
+
if (IsSet(opts.specified_opts, SUBOPT_ORIGIN))
{
values[Anum_pg_subscription_suborigin - 1] =
CStringGetTextDatum(opts.origin);
replaces[Anum_pg_subscription_suborigin - 1] = true;
+
+ /*
+ * Check if changes from different origins may be received
+ * from the publisher when the origin is changed to ANY
+ * and retain_dead_tuples is enabled.
+ */
+ check_pub_rdt = retain_dead_tuples &&
+ pg_strcasecmp(opts.origin, LOGICALREP_ORIGIN_ANY) == 0;
+
+ origin = opts.origin;
}
update_tuple = true;
@@ -1347,6 +1468,15 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("cannot enable subscription that does not have a slot name")));
+ /*
+ * Check track_commit_timestamp only when enabling the
+ * subscription in case it was disabled after creation. See
+ * comments atop CheckSubDeadTupleRetention() for details.
+ */
+ if (sub->retaindeadtuples)
+ CheckSubDeadTupleRetention(opts.enabled, !opts.enabled,
+ WARNING);
+
values[Anum_pg_subscription_subenabled - 1] =
BoolGetDatum(opts.enabled);
replaces[Anum_pg_subscription_subenabled - 1] = true;
@@ -1355,6 +1485,14 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
ApplyLauncherWakeupAtCommit();
update_tuple = true;
+
+ /*
+ * The subscription might be initially created with
+ * connect=false and retain_dead_tuples=true, meaning the
+ * remote server's status may not be checked. Ensure this
+ * check is conducted now.
+ */
+ check_pub_rdt = sub->retaindeadtuples && opts.enabled;
break;
}
@@ -1369,6 +1507,13 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
CStringGetTextDatum(stmt->conninfo);
replaces[Anum_pg_subscription_subconninfo - 1] = true;
update_tuple = true;
+
+ /*
+ * Since the remote server configuration might have changed,
+ * perform a check to ensure it permits enabling
+ * retain_dead_tuples.
+ */
+ check_pub_rdt = sub->retaindeadtuples;
break;
case ALTER_SUBSCRIPTION_SET_PUBLICATION:
@@ -1568,14 +1713,15 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
}
/*
- * Try to acquire the connection necessary for altering the slot, if
- * needed.
+ * Try to acquire the connection necessary either for modifying the slot
+ * or for checking if the remote server permits enabling
+ * retain_dead_tuples.
*
* This has to be at the end because otherwise if there is an error while
* doing the database operations we won't be able to rollback altered
* slot.
*/
- if (update_failover || update_two_phase)
+ if (update_failover || update_two_phase || check_pub_rdt)
{
bool must_use_password;
char *err;
@@ -1584,10 +1730,14 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
/* Load the library providing us libpq calls. */
load_file("libpqwalreceiver", false);
- /* Try to connect to the publisher. */
+ /*
+ * Try to connect to the publisher, using the new connection string if
+ * available.
+ */
must_use_password = sub->passwordrequired && !sub->ownersuperuser;
- wrconn = walrcv_connect(sub->conninfo, true, true, must_use_password,
- sub->name, &err);
+ wrconn = walrcv_connect(stmt->conninfo ? stmt->conninfo : sub->conninfo,
+ true, true, must_use_password, sub->name,
+ &err);
if (!wrconn)
ereport(ERROR,
(errcode(ERRCODE_CONNECTION_FAILURE),
@@ -1596,9 +1746,17 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
PG_TRY();
{
- walrcv_alter_slot(wrconn, sub->slotname,
- update_failover ? &opts.failover : NULL,
- update_two_phase ? &opts.twophase : NULL);
+ if (retain_dead_tuples)
+ check_pub_dead_tuple_retention(wrconn);
+
+ check_publications_origin(wrconn, sub->publications, false,
+ retain_dead_tuples, origin, NULL, 0,
+ sub->name);
+
+ if (update_failover || update_two_phase)
+ walrcv_alter_slot(wrconn, sub->slotname,
+ update_failover ? &opts.failover : NULL,
+ update_two_phase ? &opts.twophase : NULL);
}
PG_FINALLY();
{
@@ -2086,20 +2244,29 @@ AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId)
* Check and log a warning if the publisher has subscribed to the same table,
* its partition ancestors (if it's a partition), or its partition children (if
* it's a partitioned table), from some other publishers. This check is
- * required only if "copy_data = true" and "origin = none" for CREATE
- * SUBSCRIPTION and ALTER SUBSCRIPTION ... REFRESH statements to notify the
- * user that data having origin might have been copied.
+ * required in the following scenarios:
*
- * This check need not be performed on the tables that are already added
- * because incremental sync for those tables will happen through WAL and the
- * origin of the data can be identified from the WAL records.
+ * 1) For CREATE SUBSCRIPTION and ALTER SUBSCRIPTION ... REFRESH statements
+ * with "copy_data = true" and "origin = none":
+ * - Warn the user that data with an origin might have been copied.
+ * - This check is skipped for tables already added, as incremental sync via
+ * WAL allows origin tracking. The list of such tables is in
+ * subrel_local_oids.
*
- * subrel_local_oids contains the list of relation oids that are already
- * present on the subscriber.
+ * 2) For CREATE SUBSCRIPTION and ALTER SUBSCRIPTION ... REFRESH statements
+ * with "retain_dead_tuples = true" and "origin = any", and for ALTER
+ * SUBSCRIPTION statements that modify retain_dead_tuples or origin, or
+ * when the publisher's status changes (e.g., due to a connection string
+ * update):
+ * - Warn the user that only conflict detection info for local changes on
+ * the publisher is retained. Data from other origins may lack sufficient
+ * details for reliable conflict detection.
+ * - See comments atop worker.c for more details.
*/
static void
check_publications_origin(WalReceiverConn *wrconn, List *publications,
- bool copydata, char *origin, Oid *subrel_local_oids,
+ bool copydata, bool retain_dead_tuples,
+ char *origin, Oid *subrel_local_oids,
int subrel_count, char *subname)
{
WalRcvExecResult *res;
@@ -2108,9 +2275,29 @@ check_publications_origin(WalReceiverConn *wrconn, List *publications,
Oid tableRow[1] = {TEXTOID};
List *publist = NIL;
int i;
+ bool check_rdt;
+ bool check_table_sync;
+ bool origin_none = origin &&
+ pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) == 0;
+
+ /*
+ * Enable retain_dead_tuples checks only when origin is set to 'any',
+ * since with origin='none' only local changes are replicated to the
+ * subscriber.
+ */
+ check_rdt = retain_dead_tuples && !origin_none;
+
+ /*
+ * Enable table synchronization checks only when origin is 'none', to
+ * ensure that data from other origins is not inadvertently copied.
+ */
+ check_table_sync = copydata && origin_none;
- if (!copydata || !origin ||
- (pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) != 0))
+ /* retain_dead_tuples and table sync checks occur separately */
+ Assert(!(check_rdt && check_table_sync));
+
+ /* Return if no checks are required */
+ if (!check_rdt && !check_table_sync)
return;
initStringInfo(&cmd);
@@ -2129,16 +2316,23 @@ check_publications_origin(WalReceiverConn *wrconn, List *publications,
/*
* In case of ALTER SUBSCRIPTION ... REFRESH, subrel_local_oids contains
* the list of relation oids that are already present on the subscriber.
- * This check should be skipped for these tables.
+ * This check should be skipped for these tables if checking for table
+ * sync scenario. However, when handling the retain_dead_tuples scenario,
+ * ensure all tables are checked, as some existing tables may now include
+ * changes from other origins due to newly created subscriptions on the
+ * publisher.
*/
- for (i = 0; i < subrel_count; i++)
+ if (check_table_sync)
{
- Oid relid = subrel_local_oids[i];
- char *schemaname = get_namespace_name(get_rel_namespace(relid));
- char *tablename = get_rel_name(relid);
+ for (i = 0; i < subrel_count; i++)
+ {
+ Oid relid = subrel_local_oids[i];
+ char *schemaname = get_namespace_name(get_rel_namespace(relid));
+ char *tablename = get_rel_name(relid);
- appendStringInfo(&cmd, "AND NOT (N.nspname = '%s' AND C.relname = '%s')\n",
- schemaname, tablename);
+ appendStringInfo(&cmd, "AND NOT (N.nspname = '%s' AND C.relname = '%s')\n",
+ schemaname, tablename);
+ }
}
res = walrcv_exec(wrconn, cmd.data, 1, tableRow);
@@ -2173,22 +2367,37 @@ check_publications_origin(WalReceiverConn *wrconn, List *publications,
* XXX: For simplicity, we don't check whether the table has any data or
* not. If the table doesn't have any data then we don't need to
* distinguish between data having origin and data not having origin so we
- * can avoid logging a warning in that case.
+ * can avoid logging a warning for table sync scenario.
*/
if (publist)
{
StringInfo pubnames = makeStringInfo();
+ StringInfo err_msg = makeStringInfo();
+ StringInfo err_hint = makeStringInfo();
/* Prepare the list of publication(s) for warning message. */
GetPublicationsStr(publist, pubnames, false);
+
+ if (check_table_sync)
+ {
+ appendStringInfo(err_msg, _("subscription \"%s\" requested copy_data with origin = NONE but might copy data that had a different origin"),
+ subname);
+ appendStringInfoString(err_hint, _("Verify that initial data copied from the publisher tables did not come from other origins."));
+ }
+ else
+ {
+ appendStringInfo(err_msg, _("subscription \"%s\" enabled retain_dead_tuples but might not reliably detect conflicts for changes from different origins"),
+ subname);
+ appendStringInfoString(err_hint, _("Consider using origin = NONE or disabling retain_dead_tuples."));
+ }
+
ereport(WARNING,
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("subscription \"%s\" requested copy_data with origin = NONE but might copy data that had a different origin",
- subname),
- errdetail_plural("The subscription being created subscribes to a publication (%s) that contains tables that are written to by other subscriptions.",
- "The subscription being created subscribes to publications (%s) that contain tables that are written to by other subscriptions.",
+ errmsg_internal("%s", err_msg->data),
+ errdetail_plural("The subscription subscribes to a publication (%s) that contains tables that are written to by other subscriptions.",
+ "The subscription subscribes to publications (%s) that contain tables that are written to by other subscriptions.",
list_length(publist), pubnames->data),
- errhint("Verify that initial data copied from the publisher tables did not come from other origins."));
+ errhint_internal("%s", err_hint->data));
}
ExecDropSingleTupleTableSlot(slot);
@@ -2197,6 +2406,101 @@ check_publications_origin(WalReceiverConn *wrconn, List *publications,
}
/*
+ * Determine whether the retain_dead_tuples can be enabled based on the
+ * publisher's status.
+ *
+ * This option is disallowed if the publisher is running a version earlier
+ * than the PG19, or if the publisher is in recovery (i.e., it is a standby
+ * server).
+ *
+ * See comments atop worker.c for a detailed explanation.
+ */
+static void
+check_pub_dead_tuple_retention(WalReceiverConn *wrconn)
+{
+ WalRcvExecResult *res;
+ Oid RecoveryRow[1] = {BOOLOID};
+ TupleTableSlot *slot;
+ bool isnull;
+ bool remote_in_recovery;
+
+ if (walrcv_server_version(wrconn) < 19000)
+ ereport(ERROR,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("cannot enable retain_dead_tuples if the publisher is running a version earlier than PostgreSQL 19"));
+
+ res = walrcv_exec(wrconn, "SELECT pg_is_in_recovery()", 1, RecoveryRow);
+
+ if (res->status != WALRCV_OK_TUPLES)
+ ereport(ERROR,
+ (errcode(ERRCODE_CONNECTION_FAILURE),
+ errmsg("could not obtain recovery progress from the publisher: %s",
+ res->err)));
+
+ slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
+ if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot))
+ elog(ERROR, "failed to fetch tuple for the recovery progress");
+
+ remote_in_recovery = DatumGetBool(slot_getattr(slot, 1, &isnull));
+
+ if (remote_in_recovery)
+ ereport(ERROR,
+ errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("cannot enable retain_dead_tuples if the publisher is in recovery."));
+
+ ExecDropSingleTupleTableSlot(slot);
+
+ walrcv_clear_result(res);
+}
+
+/*
+ * Check if the subscriber's configuration is adequate to enable the
+ * retain_dead_tuples option.
+ *
+ * Issue an ERROR if the wal_level does not support the use of replication
+ * slots when check_guc is set to true.
+ *
+ * Issue a WARNING if track_commit_timestamp is not enabled when check_guc is
+ * set to true. This is only to highlight the importance of enabling
+ * track_commit_timestamp instead of catching all the misconfigurations, as
+ * this setting can be adjusted after subscription creation. Without it, the
+ * apply worker will simply skip conflict detection.
+ *
+ * Issue a WARNING or NOTICE if the subscription is disabled. Do not raise an
+ * ERROR since users can only modify retain_dead_tuples for disabled
+ * subscriptions. And as long as the subscription is enabled promptly, it will
+ * not pose issues.
+ */
+void
+CheckSubDeadTupleRetention(bool check_guc, bool sub_disabled,
+ int elevel_for_sub_disabled)
+{
+ Assert(elevel_for_sub_disabled == NOTICE ||
+ elevel_for_sub_disabled == WARNING);
+
+ if (check_guc && wal_level < WAL_LEVEL_REPLICA)
+ ereport(ERROR,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("\"wal_level\" is insufficient to create the replication slot required by retain_dead_tuples"),
+ errhint("\"wal_level\" must be set to \"replica\" or \"logical\" at server start."));
+
+ if (check_guc && !track_commit_timestamp)
+ ereport(WARNING,
+ errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("commit timestamp and origin data required for detecting conflicts won't be retained"),
+ errhint("Consider setting \"%s\" to true.",
+ "track_commit_timestamp"));
+
+ if (sub_disabled)
+ ereport(elevel_for_sub_disabled,
+ errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("deleted rows to detect conflicts would not be removed until the subscription is enabled"),
+ (elevel_for_sub_disabled > NOTICE)
+ ? errhint("Consider setting %s to false.",
+ "retain_dead_tuples") : 0);
+}
+
+/*
* Get the list of tables which belong to specified publications on the
* publisher connection.
*
diff --git a/src/backend/libpq/auth.c b/src/backend/libpq/auth.c
index 9f4d05ffbd4..4da46666439 100644
--- a/src/backend/libpq/auth.c
+++ b/src/backend/libpq/auth.c
@@ -94,8 +94,16 @@ static int auth_peer(hbaPort *port);
#define PGSQL_PAM_SERVICE "postgresql" /* Service name passed to PAM */
+/* Work around original Solaris' lack of "const" in the conv_proc signature */
+#ifdef _PAM_LEGACY_NONCONST
+#define PG_PAM_CONST
+#else
+#define PG_PAM_CONST const
+#endif
+
static int CheckPAMAuth(Port *port, const char *user, const char *password);
-static int pam_passwd_conv_proc(int num_msg, const struct pam_message **msg,
+static int pam_passwd_conv_proc(int num_msg,
+ PG_PAM_CONST struct pam_message **msg,
struct pam_response **resp, void *appdata_ptr);
static struct pam_conv pam_passw_conv = {
@@ -1917,7 +1925,7 @@ auth_peer(hbaPort *port)
*/
static int
-pam_passwd_conv_proc(int num_msg, const struct pam_message **msg,
+pam_passwd_conv_proc(int num_msg, PG_PAM_CONST struct pam_message **msg,
struct pam_response **resp, void *appdata_ptr)
{
const char *passwd;
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 0c75fe064d5..886d99951dd 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -233,7 +233,7 @@ libpqrcv_connect(const char *conninfo, bool replication, bool logical,
}
PQsetNoticeReceiver(conn->streamConn, libpqsrv_notice_receiver,
- gettext_noop("received message via replication"));
+ "received message via replication");
/*
* Set always-secure search path for the cases where the connection is
diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c
index d25085d3515..1fa931a7422 100644
--- a/src/backend/replication/logical/applyparallelworker.c
+++ b/src/backend/replication/logical/applyparallelworker.c
@@ -441,7 +441,8 @@ pa_launch_parallel_worker(void)
MySubscription->name,
MyLogicalRepWorker->userid,
InvalidOid,
- dsm_segment_handle(winfo->dsm_seg));
+ dsm_segment_handle(winfo->dsm_seg),
+ false);
if (launched)
{
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 4aed0dfcebb..742d9ba68e9 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -32,6 +32,7 @@
#include "postmaster/interrupt.h"
#include "replication/logicallauncher.h"
#include "replication/origin.h"
+#include "replication/slot.h"
#include "replication/walreceiver.h"
#include "replication/worker_internal.h"
#include "storage/ipc.h"
@@ -91,7 +92,6 @@ static dshash_table *last_start_times = NULL;
static bool on_commit_launcher_wakeup = false;
-static void ApplyLauncherWakeup(void);
static void logicalrep_launcher_onexit(int code, Datum arg);
static void logicalrep_worker_onexit(int code, Datum arg);
static void logicalrep_worker_detach(void);
@@ -100,6 +100,9 @@ static int logicalrep_pa_worker_count(Oid subid);
static void logicalrep_launcher_attach_dshmem(void);
static void ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time);
static TimestampTz ApplyLauncherGetWorkerStartTime(Oid subid);
+static void compute_min_nonremovable_xid(LogicalRepWorker *worker, TransactionId *xmin);
+static bool acquire_conflict_slot_if_exists(void);
+static void advance_conflict_slot_xmin(TransactionId new_xmin);
/*
@@ -148,6 +151,7 @@ get_subscription_list(void)
sub->owner = subform->subowner;
sub->enabled = subform->subenabled;
sub->name = pstrdup(NameStr(subform->subname));
+ sub->retaindeadtuples = subform->subretaindeadtuples;
/* We don't fill fields we are not interested in. */
res = lappend(res, sub);
@@ -309,7 +313,8 @@ logicalrep_workers_find(Oid subid, bool only_running, bool acquire_lock)
bool
logicalrep_worker_launch(LogicalRepWorkerType wtype,
Oid dbid, Oid subid, const char *subname, Oid userid,
- Oid relid, dsm_handle subworker_dsm)
+ Oid relid, dsm_handle subworker_dsm,
+ bool retain_dead_tuples)
{
BackgroundWorker bgw;
BackgroundWorkerHandle *bgw_handle;
@@ -328,10 +333,13 @@ logicalrep_worker_launch(LogicalRepWorkerType wtype,
* - must be valid worker type
* - tablesync workers are only ones to have relid
* - parallel apply worker is the only kind of subworker
+ * - The replication slot used in conflict detection is created when
+ * retain_dead_tuples is enabled
*/
Assert(wtype != WORKERTYPE_UNKNOWN);
Assert(is_tablesync_worker == OidIsValid(relid));
Assert(is_parallel_apply_worker == (subworker_dsm != DSM_HANDLE_INVALID));
+ Assert(!retain_dead_tuples || MyReplicationSlot);
ereport(DEBUG1,
(errmsg_internal("starting logical replication worker for subscription \"%s\"",
@@ -454,6 +462,9 @@ retry:
worker->stream_fileset = NULL;
worker->leader_pid = is_parallel_apply_worker ? MyProcPid : InvalidPid;
worker->parallel_apply = is_parallel_apply_worker;
+ worker->oldest_nonremovable_xid = retain_dead_tuples
+ ? MyReplicationSlot->data.xmin
+ : InvalidTransactionId;
worker->last_lsn = InvalidXLogRecPtr;
TIMESTAMP_NOBEGIN(worker->last_send_time);
TIMESTAMP_NOBEGIN(worker->last_recv_time);
@@ -1118,7 +1129,10 @@ ApplyLauncherWakeupAtCommit(void)
on_commit_launcher_wakeup = true;
}
-static void
+/*
+ * Wakeup the launcher immediately.
+ */
+void
ApplyLauncherWakeup(void)
{
if (LogicalRepCtx->launcher_pid != 0)
@@ -1150,6 +1164,12 @@ ApplyLauncherMain(Datum main_arg)
*/
BackgroundWorkerInitializeConnection(NULL, NULL, 0);
+ /*
+ * Acquire the conflict detection slot at startup to ensure it can be
+ * dropped if no longer needed after a restart.
+ */
+ acquire_conflict_slot_if_exists();
+
/* Enter main loop */
for (;;)
{
@@ -1159,6 +1179,9 @@ ApplyLauncherMain(Datum main_arg)
MemoryContext subctx;
MemoryContext oldctx;
long wait_time = DEFAULT_NAPTIME_PER_CYCLE;
+ bool can_advance_xmin = true;
+ bool retain_dead_tuples = false;
+ TransactionId xmin = InvalidTransactionId;
CHECK_FOR_INTERRUPTS();
@@ -1168,7 +1191,14 @@ ApplyLauncherMain(Datum main_arg)
ALLOCSET_DEFAULT_SIZES);
oldctx = MemoryContextSwitchTo(subctx);
- /* Start any missing workers for enabled subscriptions. */
+ /*
+ * Start any missing workers for enabled subscriptions.
+ *
+ * Also, during the iteration through all subscriptions, we compute
+ * the minimum XID required to protect deleted tuples for conflict
+ * detection if one of the subscription enables retain_dead_tuples
+ * option.
+ */
sublist = get_subscription_list();
foreach(lc, sublist)
{
@@ -1178,6 +1208,38 @@ ApplyLauncherMain(Datum main_arg)
TimestampTz now;
long elapsed;
+ if (sub->retaindeadtuples)
+ {
+ retain_dead_tuples = true;
+
+ /*
+ * Can't advance xmin of the slot unless all the subscriptions
+ * with retain_dead_tuples are enabled. This is required to
+ * ensure that we don't advance the xmin of
+ * CONFLICT_DETECTION_SLOT if one of the subscriptions is not
+ * enabled. Otherwise, we won't be able to detect conflicts
+ * reliably for such a subscription even though it has set the
+ * retain_dead_tuples option.
+ */
+ can_advance_xmin &= sub->enabled;
+
+ /*
+ * Create a replication slot to retain information necessary
+ * for conflict detection such as dead tuples, commit
+ * timestamps, and origins.
+ *
+ * The slot is created before starting the apply worker to
+ * prevent it from unnecessarily maintaining its
+ * oldest_nonremovable_xid.
+ *
+ * The slot is created even for a disabled subscription to
+ * ensure that conflict-related information is available when
+ * applying remote changes that occurred before the
+ * subscription was enabled.
+ */
+ CreateConflictDetectionSlot();
+ }
+
if (!sub->enabled)
continue;
@@ -1186,7 +1248,27 @@ ApplyLauncherMain(Datum main_arg)
LWLockRelease(LogicalRepWorkerLock);
if (w != NULL)
- continue; /* worker is running already */
+ {
+ /*
+ * Compute the minimum xmin required to protect dead tuples
+ * required for conflict detection among all running apply
+ * workers that enables retain_dead_tuples.
+ */
+ if (sub->retaindeadtuples && can_advance_xmin)
+ compute_min_nonremovable_xid(w, &xmin);
+
+ /* worker is running already */
+ continue;
+ }
+
+ /*
+ * Can't advance xmin of the slot unless all the workers
+ * corresponding to subscriptions with retain_dead_tuples are
+ * running, disabling the further computation of the minimum
+ * nonremovable xid.
+ */
+ if (sub->retaindeadtuples)
+ can_advance_xmin = false;
/*
* If the worker is eligible to start now, launch it. Otherwise,
@@ -1210,7 +1292,8 @@ ApplyLauncherMain(Datum main_arg)
if (!logicalrep_worker_launch(WORKERTYPE_APPLY,
sub->dbid, sub->oid, sub->name,
sub->owner, InvalidOid,
- DSM_HANDLE_INVALID))
+ DSM_HANDLE_INVALID,
+ sub->retaindeadtuples))
{
/*
* We get here either if we failed to launch a worker
@@ -1230,6 +1313,20 @@ ApplyLauncherMain(Datum main_arg)
}
}
+ /*
+ * Drop the CONFLICT_DETECTION_SLOT slot if there is no subscription
+ * that requires us to retain dead tuples. Otherwise, if required,
+ * advance the slot's xmin to protect dead tuples required for the
+ * conflict detection.
+ */
+ if (MyReplicationSlot)
+ {
+ if (!retain_dead_tuples)
+ ReplicationSlotDropAcquired();
+ else if (can_advance_xmin)
+ advance_conflict_slot_xmin(xmin);
+ }
+
/* Switch back to original memory context. */
MemoryContextSwitchTo(oldctx);
/* Clean the temporary memory. */
@@ -1258,6 +1355,125 @@ ApplyLauncherMain(Datum main_arg)
}
/*
+ * Determine the minimum non-removable transaction ID across all apply workers
+ * for subscriptions that have retain_dead_tuples enabled. Store the result
+ * in *xmin.
+ */
+static void
+compute_min_nonremovable_xid(LogicalRepWorker *worker, TransactionId *xmin)
+{
+ TransactionId nonremovable_xid;
+
+ Assert(worker != NULL);
+
+ /*
+ * The replication slot for conflict detection must be created before the
+ * worker starts.
+ */
+ Assert(MyReplicationSlot);
+
+ SpinLockAcquire(&worker->relmutex);
+ nonremovable_xid = worker->oldest_nonremovable_xid;
+ SpinLockRelease(&worker->relmutex);
+
+ Assert(TransactionIdIsValid(nonremovable_xid));
+
+ if (!TransactionIdIsValid(*xmin) ||
+ TransactionIdPrecedes(nonremovable_xid, *xmin))
+ *xmin = nonremovable_xid;
+}
+
+/*
+ * Acquire the replication slot used to retain information for conflict
+ * detection, if it exists.
+ *
+ * Return true if successfully acquired, otherwise return false.
+ */
+static bool
+acquire_conflict_slot_if_exists(void)
+{
+ if (!SearchNamedReplicationSlot(CONFLICT_DETECTION_SLOT, true))
+ return false;
+
+ ReplicationSlotAcquire(CONFLICT_DETECTION_SLOT, true, false);
+ return true;
+}
+
+/*
+ * Advance the xmin the replication slot used to retain information required
+ * for conflict detection.
+ */
+static void
+advance_conflict_slot_xmin(TransactionId new_xmin)
+{
+ Assert(MyReplicationSlot);
+ Assert(TransactionIdIsValid(new_xmin));
+ Assert(TransactionIdPrecedesOrEquals(MyReplicationSlot->data.xmin, new_xmin));
+
+ /* Return if the xmin value of the slot cannot be advanced */
+ if (TransactionIdEquals(MyReplicationSlot->data.xmin, new_xmin))
+ return;
+
+ SpinLockAcquire(&MyReplicationSlot->mutex);
+ MyReplicationSlot->effective_xmin = new_xmin;
+ MyReplicationSlot->data.xmin = new_xmin;
+ SpinLockRelease(&MyReplicationSlot->mutex);
+
+ elog(DEBUG1, "updated xmin: %u", MyReplicationSlot->data.xmin);
+
+ ReplicationSlotMarkDirty();
+ ReplicationSlotsComputeRequiredXmin(false);
+
+ /*
+ * Like PhysicalConfirmReceivedLocation(), do not save slot information
+ * each time. This is acceptable because all concurrent transactions on
+ * the publisher that require the data preceding the slot's xmin should
+ * have already been applied and flushed on the subscriber before the xmin
+ * is advanced. So, even if the slot's xmin regresses after a restart, it
+ * will be advanced again in the next cycle. Therefore, no data required
+ * for conflict detection will be prematurely removed.
+ */
+ return;
+}
+
+/*
+ * Create and acquire the replication slot used to retain information for
+ * conflict detection, if not yet.
+ */
+void
+CreateConflictDetectionSlot(void)
+{
+ TransactionId xmin_horizon;
+
+ /* Exit early, if the replication slot is already created and acquired */
+ if (MyReplicationSlot)
+ return;
+
+ ereport(LOG,
+ errmsg("creating replication conflict detection slot"));
+
+ ReplicationSlotCreate(CONFLICT_DETECTION_SLOT, false, RS_PERSISTENT, false,
+ false, false);
+
+ LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+
+ xmin_horizon = GetOldestSafeDecodingTransactionId(false);
+
+ SpinLockAcquire(&MyReplicationSlot->mutex);
+ MyReplicationSlot->effective_xmin = xmin_horizon;
+ MyReplicationSlot->data.xmin = xmin_horizon;
+ SpinLockRelease(&MyReplicationSlot->mutex);
+
+ ReplicationSlotsComputeRequiredXmin(true);
+
+ LWLockRelease(ProcArrayLock);
+
+ /* Write this slot to disk */
+ ReplicationSlotMarkDirty();
+ ReplicationSlotSave();
+}
+
+/*
* Is current process the logical replication launcher?
*/
bool
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 7b4e8629553..5febd154b6b 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -4917,7 +4917,7 @@ StartupReorderBuffer(void)
continue;
/* if it cannot be a slot, skip the directory */
- if (!ReplicationSlotValidateName(logical_de->d_name, DEBUG2))
+ if (!ReplicationSlotValidateName(logical_de->d_name, true, DEBUG2))
continue;
/*
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index e4fd6347fd1..3fea0a0206e 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -615,7 +615,8 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
MySubscription->name,
MyLogicalRepWorker->userid,
rstate->relid,
- DSM_HANDLE_INVALID);
+ DSM_HANDLE_INVALID,
+ false);
}
}
}
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index c5fb627aa56..b59221c4d06 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -132,6 +132,96 @@
* failover = true when creating the subscription. Enabling failover allows us
* to smoothly transition to the promoted standby, ensuring that we can
* subscribe to the new primary without losing any data.
+ *
+ * RETAIN DEAD TUPLES
+ * ----------------------
+ * Each apply worker that enabled retain_dead_tuples option maintains a
+ * non-removable transaction ID (oldest_nonremovable_xid) in shared memory to
+ * prevent dead rows from being removed prematurely when the apply worker still
+ * needs them to detect conflicts reliably. This helps to retain the required
+ * commit_ts module information, which further helps to detect
+ * update_origin_differs and delete_origin_differs conflicts reliably, as
+ * otherwise, vacuum freeze could remove the required information.
+ *
+ * The logical replication launcher manages an internal replication slot named
+ * "pg_conflict_detection". It asynchronously aggregates the non-removable
+ * transaction ID from all apply workers to determine the appropriate xmin for
+ * the slot, thereby retaining necessary tuples.
+ *
+ * The non-removable transaction ID in the apply worker is advanced to the
+ * oldest running transaction ID once all concurrent transactions on the
+ * publisher have been applied and flushed locally. The process involves:
+ *
+ * - RDT_GET_CANDIDATE_XID:
+ * Call GetOldestActiveTransactionId() to take oldestRunningXid as the
+ * candidate xid.
+ *
+ * - RDT_REQUEST_PUBLISHER_STATUS:
+ * Send a message to the walsender requesting the publisher status, which
+ * includes the latest WAL write position and information about transactions
+ * that are in the commit phase.
+ *
+ * - RDT_WAIT_FOR_PUBLISHER_STATUS:
+ * Wait for the status from the walsender. After receiving the first status,
+ * do not proceed if there are concurrent remote transactions that are still
+ * in the commit phase. These transactions might have been assigned an
+ * earlier commit timestamp but have not yet written the commit WAL record.
+ * Continue to request the publisher status (RDT_REQUEST_PUBLISHER_STATUS)
+ * until all these transactions have completed.
+ *
+ * - RDT_WAIT_FOR_LOCAL_FLUSH:
+ * Advance the non-removable transaction ID if the current flush location has
+ * reached or surpassed the last received WAL position.
+ *
+ * The overall state progression is: GET_CANDIDATE_XID ->
+ * REQUEST_PUBLISHER_STATUS -> WAIT_FOR_PUBLISHER_STATUS -> (loop to
+ * REQUEST_PUBLISHER_STATUS till concurrent remote transactions end) ->
+ * WAIT_FOR_LOCAL_FLUSH -> loop back to GET_CANDIDATE_XID.
+ *
+ * Retaining the dead tuples for this period is sufficient for ensuring
+ * eventual consistency using last-update-wins strategy, as dead tuples are
+ * useful for detecting conflicts only during the application of concurrent
+ * transactions from remote nodes. After applying and flushing all remote
+ * transactions that occurred concurrently with the tuple DELETE, any
+ * subsequent UPDATE from a remote node should have a later timestamp. In such
+ * cases, it is acceptable to detect an update_missing scenario and convert the
+ * UPDATE to an INSERT when applying it. But, detecting concurrent remote
+ * transactions with earlier timestamps than the DELETE is necessary, as the
+ * UPDATEs in remote transactions should be ignored if their timestamp is
+ * earlier than that of the dead tuples.
+ *
+ * Note that advancing the non-removable transaction ID is not supported if the
+ * publisher is also a physical standby. This is because the logical walsender
+ * on the standby can only get the WAL replay position but there may be more
+ * WALs that are being replicated from the primary and those WALs could have
+ * earlier commit timestamp.
+ *
+ * Similarly, when the publisher has subscribed to another publisher,
+ * information necessary for conflict detection cannot be retained for
+ * changes from origins other than the publisher. This is because publisher
+ * lacks the information on concurrent transactions of other publishers to
+ * which it subscribes. As the information on concurrent transactions is
+ * unavailable beyond subscriber's immediate publishers, the non-removable
+ * transaction ID might be advanced prematurely before changes from other
+ * origins have been fully applied.
+ *
+ * XXX Retaining information for changes from other origins might be possible
+ * by requesting the subscription on that origin to enable retain_dead_tuples
+ * and fetching the conflict detection slot.xmin along with the publisher's
+ * status. In the RDT_WAIT_FOR_PUBLISHER_STATUS phase, the apply worker could
+ * wait for the remote slot's xmin to reach the oldest active transaction ID,
+ * ensuring that all transactions from other origins have been applied on the
+ * publisher, thereby getting the latest WAL position that includes all
+ * concurrent changes. However, this approach may impact performance, so it
+ * might not worth the effort.
+ *
+ * XXX It seems feasible to get the latest commit's WAL location from the
+ * publisher and wait till that is applied. However, we can't do that
+ * because commit timestamps can regress as a commit with a later LSN is not
+ * guaranteed to have a later timestamp than those with earlier LSNs. Having
+ * said that, even if that is possible, it won't improve performance much as
+ * the apply always lag and moves slowly as compared with the transactions
+ * on the publisher.
*-------------------------------------------------------------------------
*/
@@ -140,6 +230,7 @@
#include <sys/stat.h>
#include <unistd.h>
+#include "access/commit_ts.h"
#include "access/table.h"
#include "access/tableam.h"
#include "access/twophase.h"
@@ -148,6 +239,7 @@
#include "catalog/pg_inherits.h"
#include "catalog/pg_subscription.h"
#include "catalog/pg_subscription_rel.h"
+#include "commands/subscriptioncmds.h"
#include "commands/tablecmds.h"
#include "commands/trigger.h"
#include "executor/executor.h"
@@ -166,12 +258,14 @@
#include "replication/logicalrelation.h"
#include "replication/logicalworker.h"
#include "replication/origin.h"
+#include "replication/slot.h"
#include "replication/walreceiver.h"
#include "replication/worker_internal.h"
#include "rewrite/rewriteHandler.h"
#include "storage/buffile.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
+#include "storage/procarray.h"
#include "tcop/tcopprot.h"
#include "utils/acl.h"
#include "utils/dynahash.h"
@@ -268,6 +362,78 @@ typedef enum
TRANS_PARALLEL_APPLY,
} TransApplyAction;
+/*
+ * The phases involved in advancing the non-removable transaction ID.
+ *
+ * See comments atop worker.c for details of the transition between these
+ * phases.
+ */
+typedef enum
+{
+ RDT_GET_CANDIDATE_XID,
+ RDT_REQUEST_PUBLISHER_STATUS,
+ RDT_WAIT_FOR_PUBLISHER_STATUS,
+ RDT_WAIT_FOR_LOCAL_FLUSH
+} RetainDeadTuplesPhase;
+
+/*
+ * Critical information for managing phase transitions within the
+ * RetainDeadTuplesPhase.
+ */
+typedef struct RetainDeadTuplesData
+{
+ RetainDeadTuplesPhase phase; /* current phase */
+ XLogRecPtr remote_lsn; /* WAL write position on the publisher */
+
+ /*
+ * Oldest transaction ID that was in the commit phase on the publisher.
+ * Use FullTransactionId to prevent issues with transaction ID wraparound,
+ * where a new remote_oldestxid could falsely appear to originate from the
+ * past and block advancement.
+ */
+ FullTransactionId remote_oldestxid;
+
+ /*
+ * Next transaction ID to be assigned on the publisher. Use
+ * FullTransactionId for consistency and to allow straightforward
+ * comparisons with remote_oldestxid.
+ */
+ FullTransactionId remote_nextxid;
+
+ TimestampTz reply_time; /* when the publisher responds with status */
+
+ /*
+ * Publisher transaction ID that must be awaited to complete before
+ * entering the final phase (RDT_WAIT_FOR_LOCAL_FLUSH). Use
+ * FullTransactionId for the same reason as remote_nextxid.
+ */
+ FullTransactionId remote_wait_for;
+
+ TransactionId candidate_xid; /* candidate for the non-removable
+ * transaction ID */
+ TimestampTz flushpos_update_time; /* when the remote flush position was
+ * updated in final phase
+ * (RDT_WAIT_FOR_LOCAL_FLUSH) */
+
+ /*
+ * The following fields are used to determine the timing for the next
+ * round of transaction ID advancement.
+ */
+ TimestampTz last_recv_time; /* when the last message was received */
+ TimestampTz candidate_xid_time; /* when the candidate_xid is decided */
+ int xid_advance_interval; /* how much time (ms) to wait before
+ * attempting to advance the
+ * non-removable transaction ID */
+} RetainDeadTuplesData;
+
+/*
+ * The minimum (100ms) and maximum (3 minutes) intervals for advancing
+ * non-removable transaction IDs. The maximum interval is a bit arbitrary but
+ * is sufficient to not cause any undue network traffic.
+ */
+#define MIN_XID_ADVANCE_INTERVAL 100
+#define MAX_XID_ADVANCE_INTERVAL 180000
+
/* errcontext tracker */
static ApplyErrorCallbackArg apply_error_callback_arg =
{
@@ -332,6 +498,13 @@ static XLogRecPtr skip_xact_finish_lsn = InvalidXLogRecPtr;
/* BufFile handle of the current streaming file */
static BufFile *stream_fd = NULL;
+/*
+ * The remote WAL position that has been applied and flushed locally. We record
+ * and use this information both while sending feedback to the server and
+ * advancing oldest_nonremovable_xid.
+ */
+static XLogRecPtr last_flushpos = InvalidXLogRecPtr;
+
typedef struct SubXactInfo
{
TransactionId xid; /* XID of the subxact */
@@ -372,6 +545,19 @@ static void stream_close_file(void);
static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
+static void maybe_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data,
+ bool status_received);
+static bool can_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data);
+static void process_rdt_phase_transition(RetainDeadTuplesData *rdt_data,
+ bool status_received);
+static void get_candidate_xid(RetainDeadTuplesData *rdt_data);
+static void request_publisher_status(RetainDeadTuplesData *rdt_data);
+static void wait_for_publisher_status(RetainDeadTuplesData *rdt_data,
+ bool status_received);
+static void wait_for_local_flush(RetainDeadTuplesData *rdt_data);
+static void adjust_xid_advance_interval(RetainDeadTuplesData *rdt_data,
+ bool new_xid_found);
+
static void apply_handle_commit_internal(LogicalRepCommitData *commit_data);
static void apply_handle_insert_internal(ApplyExecutionData *edata,
ResultRelInfo *relinfo,
@@ -3577,6 +3763,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
bool ping_sent = false;
TimeLineID tli;
ErrorContextCallback errcallback;
+ RetainDeadTuplesData rdt_data = {0};
/*
* Init the ApplyMessageContext which we clean up after each replication
@@ -3655,6 +3842,8 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
last_recv_timestamp = GetCurrentTimestamp();
ping_sent = false;
+ rdt_data.last_recv_time = last_recv_timestamp;
+
/* Ensure we are reading the data into our memory context. */
MemoryContextSwitchTo(ApplyMessageContext);
@@ -3681,6 +3870,8 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
UpdateWorkerStats(last_received, send_time, false);
apply_dispatch(&s);
+
+ maybe_advance_nonremovable_xid(&rdt_data, false);
}
else if (c == 'k')
{
@@ -3696,8 +3887,31 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
last_received = end_lsn;
send_feedback(last_received, reply_requested, false);
+
+ maybe_advance_nonremovable_xid(&rdt_data, false);
+
UpdateWorkerStats(last_received, timestamp, true);
}
+ else if (c == 's') /* Primary status update */
+ {
+ rdt_data.remote_lsn = pq_getmsgint64(&s);
+ rdt_data.remote_oldestxid = FullTransactionIdFromU64((uint64) pq_getmsgint64(&s));
+ rdt_data.remote_nextxid = FullTransactionIdFromU64((uint64) pq_getmsgint64(&s));
+ rdt_data.reply_time = pq_getmsgint64(&s);
+
+ /*
+ * This should never happen, see
+ * ProcessStandbyPSRequestMessage. But if it happens
+ * due to a bug, we don't want to proceed as it can
+ * incorrectly advance oldest_nonremovable_xid.
+ */
+ if (XLogRecPtrIsInvalid(rdt_data.remote_lsn))
+ elog(ERROR, "cannot get the latest WAL position from the publisher");
+
+ maybe_advance_nonremovable_xid(&rdt_data, true);
+
+ UpdateWorkerStats(last_received, rdt_data.reply_time, false);
+ }
/* other message types are purposefully ignored */
MemoryContextReset(ApplyMessageContext);
@@ -3710,6 +3924,11 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
/* confirm all writes so far */
send_feedback(last_received, false, false);
+ /* Reset the timestamp if no message was received */
+ rdt_data.last_recv_time = 0;
+
+ maybe_advance_nonremovable_xid(&rdt_data, false);
+
if (!in_remote_transaction && !in_streamed_transaction)
{
/*
@@ -3744,6 +3963,14 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
else
wait_time = NAPTIME_PER_CYCLE;
+ /*
+ * Ensure to wake up when it's possible to advance the non-removable
+ * transaction ID.
+ */
+ if (rdt_data.phase == RDT_GET_CANDIDATE_XID &&
+ rdt_data.xid_advance_interval)
+ wait_time = Min(wait_time, rdt_data.xid_advance_interval);
+
rc = WaitLatchOrSocket(MyLatch,
WL_SOCKET_READABLE | WL_LATCH_SET |
WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
@@ -3807,6 +4034,8 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
send_feedback(last_received, requestReply, requestReply);
+ maybe_advance_nonremovable_xid(&rdt_data, false);
+
/*
* Force reporting to ensure long idle periods don't lead to
* arbitrarily delayed stats. Stats can only be reported outside
@@ -3842,7 +4071,6 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
static XLogRecPtr last_recvpos = InvalidXLogRecPtr;
static XLogRecPtr last_writepos = InvalidXLogRecPtr;
- static XLogRecPtr last_flushpos = InvalidXLogRecPtr;
XLogRecPtr writepos;
XLogRecPtr flushpos;
@@ -3921,6 +4149,367 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
}
/*
+ * Attempt to advance the non-removable transaction ID.
+ *
+ * See comments atop worker.c for details.
+ */
+static void
+maybe_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data,
+ bool status_received)
+{
+ if (!can_advance_nonremovable_xid(rdt_data))
+ return;
+
+ process_rdt_phase_transition(rdt_data, status_received);
+}
+
+/*
+ * Preliminary check to determine if advancing the non-removable transaction ID
+ * is allowed.
+ */
+static bool
+can_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data)
+{
+ /*
+ * It is sufficient to manage non-removable transaction ID for a
+ * subscription by the main apply worker to detect conflicts reliably even
+ * for table sync or parallel apply workers.
+ */
+ if (!am_leader_apply_worker())
+ return false;
+
+ /* No need to advance if retaining dead tuples is not required */
+ if (!MySubscription->retaindeadtuples)
+ return false;
+
+ return true;
+}
+
+/*
+ * Process phase transitions during the non-removable transaction ID
+ * advancement. See comments atop worker.c for details of the transition.
+ */
+static void
+process_rdt_phase_transition(RetainDeadTuplesData *rdt_data,
+ bool status_received)
+{
+ switch (rdt_data->phase)
+ {
+ case RDT_GET_CANDIDATE_XID:
+ get_candidate_xid(rdt_data);
+ break;
+ case RDT_REQUEST_PUBLISHER_STATUS:
+ request_publisher_status(rdt_data);
+ break;
+ case RDT_WAIT_FOR_PUBLISHER_STATUS:
+ wait_for_publisher_status(rdt_data, status_received);
+ break;
+ case RDT_WAIT_FOR_LOCAL_FLUSH:
+ wait_for_local_flush(rdt_data);
+ break;
+ }
+}
+
+/*
+ * Workhorse for the RDT_GET_CANDIDATE_XID phase.
+ */
+static void
+get_candidate_xid(RetainDeadTuplesData *rdt_data)
+{
+ TransactionId oldest_running_xid;
+ TimestampTz now;
+
+ /*
+ * Use last_recv_time when applying changes in the loop to avoid
+ * unnecessary system time retrieval. If last_recv_time is not available,
+ * obtain the current timestamp.
+ */
+ now = rdt_data->last_recv_time ? rdt_data->last_recv_time : GetCurrentTimestamp();
+
+ /*
+ * Compute the candidate_xid and request the publisher status at most once
+ * per xid_advance_interval. Refer to adjust_xid_advance_interval() for
+ * details on how this value is dynamically adjusted. This is to avoid
+ * using CPU and network resources without making much progress.
+ */
+ if (!TimestampDifferenceExceeds(rdt_data->candidate_xid_time, now,
+ rdt_data->xid_advance_interval))
+ return;
+
+ /*
+ * Immediately update the timer, even if the function returns later
+ * without setting candidate_xid due to inactivity on the subscriber. This
+ * avoids frequent calls to GetOldestActiveTransactionId.
+ */
+ rdt_data->candidate_xid_time = now;
+
+ /*
+ * Consider transactions in the current database, as only dead tuples from
+ * this database are required for conflict detection.
+ */
+ oldest_running_xid = GetOldestActiveTransactionId(false, false);
+
+ /*
+ * Oldest active transaction ID (oldest_running_xid) can't be behind any
+ * of its previously computed value.
+ */
+ Assert(TransactionIdPrecedesOrEquals(MyLogicalRepWorker->oldest_nonremovable_xid,
+ oldest_running_xid));
+
+ /* Return if the oldest_nonremovable_xid cannot be advanced */
+ if (TransactionIdEquals(MyLogicalRepWorker->oldest_nonremovable_xid,
+ oldest_running_xid))
+ {
+ adjust_xid_advance_interval(rdt_data, false);
+ return;
+ }
+
+ adjust_xid_advance_interval(rdt_data, true);
+
+ rdt_data->candidate_xid = oldest_running_xid;
+ rdt_data->phase = RDT_REQUEST_PUBLISHER_STATUS;
+
+ /* process the next phase */
+ process_rdt_phase_transition(rdt_data, false);
+}
+
+/*
+ * Workhorse for the RDT_REQUEST_PUBLISHER_STATUS phase.
+ */
+static void
+request_publisher_status(RetainDeadTuplesData *rdt_data)
+{
+ static StringInfo request_message = NULL;
+
+ if (!request_message)
+ {
+ MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext);
+
+ request_message = makeStringInfo();
+ MemoryContextSwitchTo(oldctx);
+ }
+ else
+ resetStringInfo(request_message);
+
+ /*
+ * Send the current time to update the remote walsender's latest reply
+ * message received time.
+ */
+ pq_sendbyte(request_message, 'p');
+ pq_sendint64(request_message, GetCurrentTimestamp());
+
+ elog(DEBUG2, "sending publisher status request message");
+
+ /* Send a request for the publisher status */
+ walrcv_send(LogRepWorkerWalRcvConn,
+ request_message->data, request_message->len);
+
+ rdt_data->phase = RDT_WAIT_FOR_PUBLISHER_STATUS;
+
+ /*
+ * Skip calling maybe_advance_nonremovable_xid() since further transition
+ * is possible only once we receive the publisher status message.
+ */
+}
+
+/*
+ * Workhorse for the RDT_WAIT_FOR_PUBLISHER_STATUS phase.
+ */
+static void
+wait_for_publisher_status(RetainDeadTuplesData *rdt_data,
+ bool status_received)
+{
+ /*
+ * Return if we have requested but not yet received the publisher status.
+ */
+ if (!status_received)
+ return;
+
+ if (!FullTransactionIdIsValid(rdt_data->remote_wait_for))
+ rdt_data->remote_wait_for = rdt_data->remote_nextxid;
+
+ /*
+ * Check if all remote concurrent transactions that were active at the
+ * first status request have now completed. If completed, proceed to the
+ * next phase; otherwise, continue checking the publisher status until
+ * these transactions finish.
+ *
+ * It's possible that transactions in the commit phase during the last
+ * cycle have now finished committing, but remote_oldestxid remains older
+ * than remote_wait_for. This can happen if some old transaction came in
+ * the commit phase when we requested status in this cycle. We do not
+ * handle this case explicitly as it's rare and the benefit doesn't
+ * justify the required complexity. Tracking would require either caching
+ * all xids at the publisher or sending them to subscribers. The condition
+ * will resolve naturally once the remaining transactions are finished.
+ *
+ * Directly advancing the non-removable transaction ID is possible if
+ * there are no activities on the publisher since the last advancement
+ * cycle. However, it requires maintaining two fields, last_remote_nextxid
+ * and last_remote_lsn, within the structure for comparison with the
+ * current cycle's values. Considering the minimal cost of continuing in
+ * RDT_WAIT_FOR_LOCAL_FLUSH without awaiting changes, we opted not to
+ * advance the transaction ID here.
+ */
+ if (FullTransactionIdPrecedesOrEquals(rdt_data->remote_wait_for,
+ rdt_data->remote_oldestxid))
+ rdt_data->phase = RDT_WAIT_FOR_LOCAL_FLUSH;
+ else
+ rdt_data->phase = RDT_REQUEST_PUBLISHER_STATUS;
+
+ /* process the next phase */
+ process_rdt_phase_transition(rdt_data, false);
+}
+
+/*
+ * Workhorse for the RDT_WAIT_FOR_LOCAL_FLUSH phase.
+ */
+static void
+wait_for_local_flush(RetainDeadTuplesData *rdt_data)
+{
+ Assert(!XLogRecPtrIsInvalid(rdt_data->remote_lsn) &&
+ TransactionIdIsValid(rdt_data->candidate_xid));
+
+ /*
+ * We expect the publisher and subscriber clocks to be in sync using time
+ * sync service like NTP. Otherwise, we will advance this worker's
+ * oldest_nonremovable_xid prematurely, leading to the removal of rows
+ * required to detect conflicts reliably. This check primarily addresses
+ * scenarios where the publisher's clock falls behind; if the publisher's
+ * clock is ahead, subsequent transactions will naturally bear later
+ * commit timestamps, conforming to the design outlined atop worker.c.
+ *
+ * XXX Consider waiting for the publisher's clock to catch up with the
+ * subscriber's before proceeding to the next phase.
+ */
+ if (TimestampDifferenceExceeds(rdt_data->reply_time,
+ rdt_data->candidate_xid_time, 0))
+ ereport(ERROR,
+ errmsg_internal("oldest_nonremovable_xid transaction ID could be advanced prematurely"),
+ errdetail_internal("The clock on the publisher is behind that of the subscriber."));
+
+ /*
+ * Do not attempt to advance the non-removable transaction ID when table
+ * sync is in progress. During this time, changes from a single
+ * transaction may be applied by multiple table sync workers corresponding
+ * to the target tables. So, it's necessary for all table sync workers to
+ * apply and flush the corresponding changes before advancing the
+ * transaction ID, otherwise, dead tuples that are still needed for
+ * conflict detection in table sync workers could be removed prematurely.
+ * However, confirming the apply and flush progress across all table sync
+ * workers is complex and not worth the effort, so we simply return if not
+ * all tables are in the READY state.
+ *
+ * It is safe to add new tables with initial states to the subscription
+ * after this check because any changes applied to these tables should
+ * have a WAL position greater than the rdt_data->remote_lsn.
+ */
+ if (!AllTablesyncsReady())
+ return;
+
+ /*
+ * Update and check the remote flush position if we are applying changes
+ * in a loop. This is done at most once per WalWriterDelay to avoid
+ * performing costly operations in get_flush_position() too frequently
+ * during change application.
+ */
+ if (last_flushpos < rdt_data->remote_lsn && rdt_data->last_recv_time &&
+ TimestampDifferenceExceeds(rdt_data->flushpos_update_time,
+ rdt_data->last_recv_time, WalWriterDelay))
+ {
+ XLogRecPtr writepos;
+ XLogRecPtr flushpos;
+ bool have_pending_txes;
+
+ /* Fetch the latest remote flush position */
+ get_flush_position(&writepos, &flushpos, &have_pending_txes);
+
+ if (flushpos > last_flushpos)
+ last_flushpos = flushpos;
+
+ rdt_data->flushpos_update_time = rdt_data->last_recv_time;
+ }
+
+ /* Return to wait for the changes to be applied */
+ if (last_flushpos < rdt_data->remote_lsn)
+ return;
+
+ /*
+ * Reaching here means the remote WAL position has been received, and all
+ * transactions up to that position on the publisher have been applied and
+ * flushed locally. So, we can advance the non-removable transaction ID.
+ */
+ SpinLockAcquire(&MyLogicalRepWorker->relmutex);
+ MyLogicalRepWorker->oldest_nonremovable_xid = rdt_data->candidate_xid;
+ SpinLockRelease(&MyLogicalRepWorker->relmutex);
+
+ elog(DEBUG2, "confirmed flush up to remote lsn %X/%X: new oldest_nonremovable_xid %u",
+ LSN_FORMAT_ARGS(rdt_data->remote_lsn),
+ rdt_data->candidate_xid);
+
+ /* Notify launcher to update the xmin of the conflict slot */
+ ApplyLauncherWakeup();
+
+ /*
+ * Reset all data fields except those used to determine the timing for the
+ * next round of transaction ID advancement. We can even use
+ * flushpos_update_time in the next round to decide whether to get the
+ * latest flush position.
+ */
+ rdt_data->phase = RDT_GET_CANDIDATE_XID;
+ rdt_data->remote_lsn = InvalidXLogRecPtr;
+ rdt_data->remote_oldestxid = InvalidFullTransactionId;
+ rdt_data->remote_nextxid = InvalidFullTransactionId;
+ rdt_data->reply_time = 0;
+ rdt_data->remote_wait_for = InvalidFullTransactionId;
+ rdt_data->candidate_xid = InvalidTransactionId;
+
+ /* process the next phase */
+ process_rdt_phase_transition(rdt_data, false);
+}
+
+/*
+ * Adjust the interval for advancing non-removable transaction IDs.
+ *
+ * We double the interval to try advancing the non-removable transaction IDs
+ * if there is no activity on the node. The maximum value of the interval is
+ * capped by wal_receiver_status_interval if it is not zero, otherwise to a
+ * 3 minutes which should be sufficient to avoid using CPU or network
+ * resources without much benefit.
+ *
+ * The interval is reset to a minimum value of 100ms once there is some
+ * activity on the node.
+ *
+ * XXX The use of wal_receiver_status_interval is a bit arbitrary so we can
+ * consider the other interval or a separate GUC if the need arises.
+ */
+static void
+adjust_xid_advance_interval(RetainDeadTuplesData *rdt_data, bool new_xid_found)
+{
+ if (!new_xid_found && rdt_data->xid_advance_interval)
+ {
+ int max_interval = wal_receiver_status_interval
+ ? wal_receiver_status_interval * 1000
+ : MAX_XID_ADVANCE_INTERVAL;
+
+ /*
+ * No new transaction ID has been assigned since the last check, so
+ * double the interval, but not beyond the maximum allowable value.
+ */
+ rdt_data->xid_advance_interval = Min(rdt_data->xid_advance_interval * 2,
+ max_interval);
+ }
+ else
+ {
+ /*
+ * A new transaction ID was found or the interval is not yet
+ * initialized, so set the interval to the minimum value.
+ */
+ rdt_data->xid_advance_interval = MIN_XID_ADVANCE_INTERVAL;
+ }
+}
+
+/*
* Exit routine for apply workers due to subscription parameter changes.
*/
static void
@@ -4708,6 +5297,30 @@ InitializeLogRepWorker(void)
apply_worker_exit();
}
+ /*
+ * Restart the worker if retain_dead_tuples was enabled during startup.
+ *
+ * At this point, the replication slot used for conflict detection might
+ * not exist yet, or could be dropped soon if the launcher perceives
+ * retain_dead_tuples as disabled. To avoid unnecessary tracking of
+ * oldest_nonremovable_xid when the slot is absent or at risk of being
+ * dropped, a restart is initiated.
+ *
+ * The oldest_nonremovable_xid should be initialized only when the
+ * retain_dead_tuples is enabled before launching the worker. See
+ * logicalrep_worker_launch.
+ */
+ if (am_leader_apply_worker() &&
+ MySubscription->retaindeadtuples &&
+ !TransactionIdIsValid(MyLogicalRepWorker->oldest_nonremovable_xid))
+ {
+ ereport(LOG,
+ errmsg("logical replication worker for subscription \"%s\" will restart because the option %s was enabled during startup",
+ MySubscription->name, "retain_dead_tuples"));
+
+ apply_worker_exit();
+ }
+
/* Setup synchronous commit according to the user's wishes */
SetConfigOption("synchronous_commit", MySubscription->synccommit,
PGC_BACKEND, PGC_S_OVERRIDE);
@@ -4864,6 +5477,14 @@ DisableSubscriptionAndExit(void)
errmsg("subscription \"%s\" has been disabled because of an error",
MySubscription->name));
+ /*
+ * Skip the track_commit_timestamp check when disabling the worker due to
+ * an error, as verifying commit timestamps is unnecessary in this
+ * context.
+ */
+ if (MySubscription->retaindeadtuples)
+ CheckSubDeadTupleRetention(false, true, WARNING);
+
proc_exit(0);
}
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index e44ad576bc7..8605776ad86 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -47,6 +47,7 @@
#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/interrupt.h"
+#include "replication/logicallauncher.h"
#include "replication/slotsync.h"
#include "replication/slot.h"
#include "replication/walsender_private.h"
@@ -172,6 +173,7 @@ static SyncStandbySlotsConfigData *synchronized_standby_slots_config;
static XLogRecPtr ss_oldest_flush_lsn = InvalidXLogRecPtr;
static void ReplicationSlotShmemExit(int code, Datum arg);
+static bool IsSlotForConflictCheck(const char *name);
static void ReplicationSlotDropPtr(ReplicationSlot *slot);
/* internal persistency functions */
@@ -258,13 +260,17 @@ ReplicationSlotShmemExit(int code, Datum arg)
/*
* Check whether the passed slot name is valid and report errors at elevel.
*
+ * An error will be reported for a reserved replication slot name if
+ * allow_reserved_name is set to false.
+ *
* Slot names may consist out of [a-z0-9_]{1,NAMEDATALEN-1} which should allow
* the name to be used as a directory name on every supported OS.
*
* Returns whether the directory name is valid or not if elevel < ERROR.
*/
bool
-ReplicationSlotValidateName(const char *name, int elevel)
+ReplicationSlotValidateName(const char *name, bool allow_reserved_name,
+ int elevel)
{
const char *cp;
@@ -300,10 +306,32 @@ ReplicationSlotValidateName(const char *name, int elevel)
return false;
}
}
+
+ if (!allow_reserved_name && IsSlotForConflictCheck(name))
+ {
+ ereport(elevel,
+ errcode(ERRCODE_RESERVED_NAME),
+ errmsg("replication slot name \"%s\" is reserved",
+ name),
+ errdetail("The name \"%s\" is reserved for the conflict detection slot.",
+ CONFLICT_DETECTION_SLOT));
+
+ return false;
+ }
+
return true;
}
/*
+ * Return true if the replication slot name is "pg_conflict_detection".
+ */
+static bool
+IsSlotForConflictCheck(const char *name)
+{
+ return (strcmp(name, CONFLICT_DETECTION_SLOT) == 0);
+}
+
+/*
* Create a new replication slot and mark it as used by this backend.
*
* name: Name of the slot
@@ -330,7 +358,12 @@ ReplicationSlotCreate(const char *name, bool db_specific,
Assert(MyReplicationSlot == NULL);
- ReplicationSlotValidateName(name, ERROR);
+ /*
+ * The logical launcher or pg_upgrade may create or migrate an internal
+ * slot, so using a reserved name is allowed in these cases.
+ */
+ ReplicationSlotValidateName(name, IsBinaryUpgrade || IsLogicalLauncher(),
+ ERROR);
if (failover)
{
@@ -582,6 +615,17 @@ retry:
}
/*
+ * Do not allow users to acquire the reserved slot. This scenario may
+ * occur if the launcher that owns the slot has terminated unexpectedly
+ * due to an error, and a backend process attempts to reuse the slot.
+ */
+ if (!IsLogicalLauncher() && IsSlotForConflictCheck(name))
+ ereport(ERROR,
+ errcode(ERRCODE_UNDEFINED_OBJECT),
+ errmsg("cannot acquire replication slot \"%s\"", name),
+ errdetail("The slot is reserved for conflict detection and can only be acquired by logical replication launcher."));
+
+ /*
* This is the slot we want; check if it's active under some other
* process. In single user mode, we don't need this check.
*/
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 28b8591efa5..ee911394a23 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -65,6 +65,7 @@
#include "funcapi.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
+#include "libpq/protocol.h"
#include "miscadmin.h"
#include "nodes/replnodes.h"
#include "pgstat.h"
@@ -84,6 +85,7 @@
#include "storage/ipc.h"
#include "storage/pmsignal.h"
#include "storage/proc.h"
+#include "storage/procarray.h"
#include "tcop/dest.h"
#include "tcop/tcopprot.h"
#include "utils/acl.h"
@@ -258,6 +260,7 @@ static void StartLogicalReplication(StartReplicationCmd *cmd);
static void ProcessStandbyMessage(void);
static void ProcessStandbyReplyMessage(void);
static void ProcessStandbyHSFeedbackMessage(void);
+static void ProcessStandbyPSRequestMessage(void);
static void ProcessRepliesIfAny(void);
static void ProcessPendingWrites(void);
static void WalSndKeepalive(bool requestReply, XLogRecPtr writePtr);
@@ -733,13 +736,13 @@ HandleUploadManifestPacket(StringInfo buf, off_t *offset,
switch (mtype)
{
- case 'd': /* CopyData */
+ case PqMsg_CopyData:
maxmsglen = PQ_LARGE_MESSAGE_LIMIT;
break;
- case 'c': /* CopyDone */
- case 'f': /* CopyFail */
- case 'H': /* Flush */
- case 'S': /* Sync */
+ case PqMsg_CopyDone:
+ case PqMsg_CopyFail:
+ case PqMsg_Flush:
+ case PqMsg_Sync:
maxmsglen = PQ_SMALL_MESSAGE_LIMIT;
break;
default:
@@ -761,19 +764,19 @@ HandleUploadManifestPacket(StringInfo buf, off_t *offset,
/* Process the message */
switch (mtype)
{
- case 'd': /* CopyData */
+ case PqMsg_CopyData:
AppendIncrementalManifestData(ib, buf->data, buf->len);
return true;
- case 'c': /* CopyDone */
+ case PqMsg_CopyDone:
return false;
- case 'H': /* Sync */
- case 'S': /* Flush */
+ case PqMsg_Sync:
+ case PqMsg_Flush:
/* Ignore these while in CopyOut mode as we do elsewhere. */
return true;
- case 'f':
+ case PqMsg_CopyFail:
ereport(ERROR,
(errcode(ERRCODE_QUERY_CANCELED),
errmsg("COPY from stdin failed: %s",
@@ -1567,7 +1570,7 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
tmpbuf.data, sizeof(int64));
/* output previously gathered data in a CopyData packet */
- pq_putmessage_noblock('d', ctx->out->data, ctx->out->len);
+ pq_putmessage_noblock(PqMsg_CopyData, ctx->out->data, ctx->out->len);
CHECK_FOR_INTERRUPTS();
@@ -2303,7 +2306,7 @@ ProcessRepliesIfAny(void)
case PqMsg_CopyDone:
if (!streamingDoneSending)
{
- pq_putmessage_noblock('c', NULL, 0);
+ pq_putmessage_noblock(PqMsg_CopyDone, NULL, 0);
streamingDoneSending = true;
}
@@ -2355,6 +2358,10 @@ ProcessStandbyMessage(void)
ProcessStandbyHSFeedbackMessage();
break;
+ case 'p':
+ ProcessStandbyPSRequestMessage();
+ break;
+
default:
ereport(COMMERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
@@ -2702,6 +2709,60 @@ ProcessStandbyHSFeedbackMessage(void)
}
/*
+ * Process the request for a primary status update message.
+ */
+static void
+ProcessStandbyPSRequestMessage(void)
+{
+ XLogRecPtr lsn = InvalidXLogRecPtr;
+ TransactionId oldestXidInCommit;
+ FullTransactionId nextFullXid;
+ FullTransactionId fullOldestXidInCommit;
+ WalSnd *walsnd = MyWalSnd;
+ TimestampTz replyTime;
+
+ /*
+ * This shouldn't happen because we don't support getting primary status
+ * message from standby.
+ */
+ if (RecoveryInProgress())
+ elog(ERROR, "the primary status is unavailable during recovery");
+
+ replyTime = pq_getmsgint64(&reply_message);
+
+ /*
+ * Update shared state for this WalSender process based on reply data from
+ * standby.
+ */
+ SpinLockAcquire(&walsnd->mutex);
+ walsnd->replyTime = replyTime;
+ SpinLockRelease(&walsnd->mutex);
+
+ /*
+ * Consider transactions in the current database, as only these are the
+ * ones replicated.
+ */
+ oldestXidInCommit = GetOldestActiveTransactionId(true, false);
+ nextFullXid = ReadNextFullTransactionId();
+ fullOldestXidInCommit = FullTransactionIdFromAllowableAt(nextFullXid,
+ oldestXidInCommit);
+ lsn = GetXLogWriteRecPtr();
+
+ elog(DEBUG2, "sending primary status");
+
+ /* construct the message... */
+ resetStringInfo(&output_message);
+ pq_sendbyte(&output_message, 's');
+ pq_sendint64(&output_message, lsn);
+ pq_sendint64(&output_message, (int64) U64FromFullTransactionId(fullOldestXidInCommit));
+ pq_sendint64(&output_message, (int64) U64FromFullTransactionId(nextFullXid));
+ pq_sendint64(&output_message, GetCurrentTimestamp());
+
+ /* ... and send it wrapped in CopyData */
+ pq_putmessage_noblock(PqMsg_CopyData, output_message.data, output_message.len);
+}
+
+/*
* Compute how long send/receive loops should sleep.
*
* If wal_sender_timeout is enabled we want to wake up in time to send
@@ -3246,7 +3307,7 @@ XLogSendPhysical(void)
wal_segment_close(xlogreader);
/* Send CopyDone */
- pq_putmessage_noblock('c', NULL, 0);
+ pq_putmessage_noblock(PqMsg_CopyDone, NULL, 0);
streamingDoneSending = true;
WalSndCaughtUp = true;
@@ -3374,7 +3435,7 @@ retry:
memcpy(&output_message.data[1 + sizeof(int64) + sizeof(int64)],
tmpbuf.data, sizeof(int64));
- pq_putmessage_noblock('d', output_message.data, output_message.len);
+ pq_putmessage_noblock(PqMsg_CopyData, output_message.data, output_message.len);
sentPtr = endptr;
@@ -4080,7 +4141,7 @@ WalSndKeepalive(bool requestReply, XLogRecPtr writePtr)
pq_sendbyte(&output_message, requestReply ? 1 : 0);
/* ... and send it wrapped in CopyData */
- pq_putmessage_noblock('d', output_message.data, output_message.len);
+ pq_putmessage_noblock(PqMsg_CopyData, output_message.data, output_message.len);
/* Set local flag */
if (requestReply)
diff --git a/src/backend/storage/aio/README.md b/src/backend/storage/aio/README.md
index f10b5c7e31e..72ae3b3737d 100644
--- a/src/backend/storage/aio/README.md
+++ b/src/backend/storage/aio/README.md
@@ -94,7 +94,7 @@ pgaio_io_register_callbacks(ioh, PGAIO_HCB_SHARED_BUFFER_READV, 0);
*
* In this example we're reading only a single buffer, hence the 1.
*/
-pgaio_io_set_handle_data_32(ioh, (uint32 *) buffer, 1);
+pgaio_io_set_handle_data_32(ioh, (uint32 *) &buffer, 1);
/*
* Pass the AIO handle to lower-level function. When operating on the level of
@@ -119,8 +119,9 @@ pgaio_io_set_handle_data_32(ioh, (uint32 *) buffer, 1);
* e.g. due to reaching a limit on the number of unsubmitted IOs, and even
* complete before smgrstartreadv() returns.
*/
+void *page = BufferGetBlock(buffer);
smgrstartreadv(ioh, operation->smgr, forknum, blkno,
- BufferGetBlock(buffer), 1);
+ &page, 1);
/*
* To benefit from AIO, it is beneficial to perform other work, including
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index 2418967def6..bf987aed8d3 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -2814,8 +2814,10 @@ GetRunningTransactionData(void)
*
* Similar to GetSnapshotData but returns just oldestActiveXid. We include
* all PGPROCs with an assigned TransactionId, even VACUUM processes.
- * We look at all databases, though there is no need to include WALSender
- * since this has no effect on hot standby conflicts.
+ *
+ * If allDbs is true, we look at all databases, though there is no need to
+ * include WALSender since this has no effect on hot standby conflicts. If
+ * allDbs is false, skip processes attached to other databases.
*
* This is never executed during recovery so there is no need to look at
* KnownAssignedXids.
@@ -2823,9 +2825,12 @@ GetRunningTransactionData(void)
* We don't worry about updating other counters, we want to keep this as
* simple as possible and leave GetSnapshotData() as the primary code for
* that bookkeeping.
+ *
+ * inCommitOnly indicates getting the oldestActiveXid among the transactions
+ * in the commit critical section.
*/
TransactionId
-GetOldestActiveTransactionId(void)
+GetOldestActiveTransactionId(bool inCommitOnly, bool allDbs)
{
ProcArrayStruct *arrayP = procArray;
TransactionId *other_xids = ProcGlobal->xids;
@@ -2852,6 +2857,8 @@ GetOldestActiveTransactionId(void)
for (index = 0; index < arrayP->numProcs; index++)
{
TransactionId xid;
+ int pgprocno = arrayP->pgprocnos[index];
+ PGPROC *proc = &allProcs[pgprocno];
/* Fetch xid just once - see GetNewTransactionId */
xid = UINT32_ACCESS_ONCE(other_xids[index]);
@@ -2859,6 +2866,13 @@ GetOldestActiveTransactionId(void)
if (!TransactionIdIsNormal(xid))
continue;
+ if (inCommitOnly &&
+ (proc->delayChkptFlags & DELAY_CHKPT_IN_COMMIT) == 0)
+ continue;
+
+ if (!allDbs && proc->databaseId != MyDatabaseId)
+ continue;
+
if (TransactionIdPrecedes(xid, oldestRunningXid))
oldestRunningXid = xid;
diff --git a/src/backend/storage/lmgr/generate-lwlocknames.pl b/src/backend/storage/lmgr/generate-lwlocknames.pl
index c7a6720440d..cd3e43c448a 100644
--- a/src/backend/storage/lmgr/generate-lwlocknames.pl
+++ b/src/backend/storage/lmgr/generate-lwlocknames.pl
@@ -27,18 +27,24 @@ print $h "/* there is deliberately not an #ifndef LWLOCKNAMES_H here */\n\n";
#
-# First, record the predefined LWLocks listed in wait_event_names.txt. We'll
-# cross-check those with the ones in lwlocklist.h.
+# First, record the predefined LWLocks and built-in tranches listed in
+# wait_event_names.txt. We'll cross-check those with the ones in lwlocklist.h.
#
+my @wait_event_tranches;
my @wait_event_lwlocks;
my $record_lwlocks = 0;
+my $in_tranches = 0;
while (<$wait_event_names>)
{
chomp;
# Check for end marker.
- last if /^# END OF PREDEFINED LWLOCKS/;
+ if (/^# END OF PREDEFINED LWLOCKS/)
+ {
+ $in_tranches = 1;
+ next;
+ }
# Skip comments and empty lines.
next if /^#/;
@@ -54,13 +60,29 @@ while (<$wait_event_names>)
# Go to the next line if we are not yet recording LWLocks.
next if not $record_lwlocks;
+ # Stop recording if we reach another section.
+ last if /^Section:/;
+
# Record the LWLock.
(my $waiteventname, my $waitevendocsentence) = split(/\t/, $_);
- push(@wait_event_lwlocks, $waiteventname);
+
+ if ($in_tranches)
+ {
+ push(@wait_event_tranches, $waiteventname);
+ }
+ else
+ {
+ push(@wait_event_lwlocks, $waiteventname);
+ }
}
+#
+# While gathering the list of predefined LWLocks, cross-check the lists in
+# lwlocklist.h with the wait events we just recorded.
+#
my $in_comment = 0;
-my $i = 0;
+my $lwlock_count = 0;
+my $tranche_count = 0;
while (<$lwlocklist>)
{
chomp;
@@ -81,38 +103,72 @@ while (<$lwlocklist>)
next;
}
- die "unable to parse lwlocklist.h line \"$_\""
- unless /^PG_LWLOCK\((\d+),\s+(\w+)\)$/;
+ #
+ # Gather list of predefined LWLocks and cross-check with the wait events.
+ #
+ if (/^PG_LWLOCK\((\d+),\s+(\w+)\)$/)
+ {
+ my ($lockidx, $lockname) = ($1, $2);
- (my $lockidx, my $lockname) = ($1, $2);
+ die "lwlocklist.h not in order" if $lockidx < $lastlockidx;
+ die "lwlocklist.h has duplicates" if $lockidx == $lastlockidx;
- die "lwlocklist.h not in order" if $lockidx < $lastlockidx;
- die "lwlocklist.h has duplicates" if $lockidx == $lastlockidx;
+ die "$lockname defined in lwlocklist.h but missing from "
+ . "wait_event_names.txt"
+ if $lwlock_count >= scalar @wait_event_lwlocks;
+ die "lists of predefined LWLocks do not match (first mismatch at "
+ . "$wait_event_lwlocks[$lwlock_count] in wait_event_names.txt and "
+ . "$lockname in lwlocklist.h)"
+ if $wait_event_lwlocks[$lwlock_count] ne $lockname;
- die "$lockname defined in lwlocklist.h but missing from "
- . "wait_event_names.txt"
- if $i >= scalar @wait_event_lwlocks;
- die "lists of predefined LWLocks do not match (first mismatch at "
- . "$wait_event_lwlocks[$i] in wait_event_names.txt and $lockname in "
- . "lwlocklist.h)"
- if $wait_event_lwlocks[$i] ne $lockname;
- $i++;
+ $lwlock_count++;
- while ($lastlockidx < $lockidx - 1)
+ while ($lastlockidx < $lockidx - 1)
+ {
+ ++$lastlockidx;
+ }
+ $lastlockidx = $lockidx;
+
+ # Add a "Lock" suffix to each lock name, as the C code depends on that.
+ printf $h "#define %-32s (&MainLWLockArray[$lockidx].lock)\n",
+ $lockname . "Lock";
+
+ next;
+ }
+
+ #
+ # Cross-check the built-in LWLock tranches with the wait events.
+ #
+ if (/^PG_LWLOCKTRANCHE\((\w+),\s+(\w+)\)$/)
{
- ++$lastlockidx;
+ my ($tranche_id, $tranche_name) = ($1, $2);
+
+ die "$tranche_name defined in lwlocklist.h but missing from "
+ . "wait_event_names.txt"
+ if $tranche_count >= scalar @wait_event_tranches;
+ die
+ "lists of built-in LWLock tranches do not match (first mismatch at "
+ . "$wait_event_tranches[$tranche_count] in wait_event_names.txt and "
+ . "$tranche_name in lwlocklist.h)"
+ if $wait_event_tranches[$tranche_count] ne $tranche_name;
+
+ $tranche_count++;
+
+ next;
}
- $lastlockidx = $lockidx;
- # Add a "Lock" suffix to each lock name, as the C code depends on that
- printf $h "#define %-32s (&MainLWLockArray[$lockidx].lock)\n",
- $lockname . "Lock";
+ die "unable to parse lwlocklist.h line \"$_\"";
}
die
- "$wait_event_lwlocks[$i] defined in wait_event_names.txt but missing from "
- . "lwlocklist.h"
- if $i < scalar @wait_event_lwlocks;
+ "$wait_event_lwlocks[$lwlock_count] defined in wait_event_names.txt but "
+ . " missing from lwlocklist.h"
+ if $lwlock_count < scalar @wait_event_lwlocks;
+
+die
+ "$wait_event_tranches[$tranche_count] defined in wait_event_names.txt but "
+ . "missing from lwlocklist.h"
+ if $tranche_count < scalar @wait_event_tranches;
print $h "\n";
printf $h "#define NUM_INDIVIDUAL_LWLOCKS %s\n", $lastlockidx + 1;
diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c
index 2d43bf2cc13..ec9c345ffdf 100644
--- a/src/backend/storage/lmgr/lwlock.c
+++ b/src/backend/storage/lmgr/lwlock.c
@@ -122,9 +122,8 @@ StaticAssertDecl((LW_VAL_EXCLUSIVE & LW_FLAG_MASK) == 0,
* own tranche. We absorb the names of these tranches from there into
* BuiltinTrancheNames here.
*
- * 2. There are some predefined tranches for built-in groups of locks.
- * These are listed in enum BuiltinTrancheIds in lwlock.h, and their names
- * appear in BuiltinTrancheNames[] below.
+ * 2. There are some predefined tranches for built-in groups of locks defined
+ * in lwlocklist.h. We absorb the names of these tranches, too.
*
* 3. Extensions can create new tranches, via either RequestNamedLWLockTranche
* or LWLockRegisterTranche. The names of these that are known in the current
@@ -135,49 +134,10 @@ StaticAssertDecl((LW_VAL_EXCLUSIVE & LW_FLAG_MASK) == 0,
*/
static const char *const BuiltinTrancheNames[] = {
#define PG_LWLOCK(id, lockname) [id] = CppAsString(lockname),
+#define PG_LWLOCKTRANCHE(id, lockname) [LWTRANCHE_##id] = CppAsString(lockname),
#include "storage/lwlocklist.h"
#undef PG_LWLOCK
- [LWTRANCHE_XACT_BUFFER] = "XactBuffer",
- [LWTRANCHE_COMMITTS_BUFFER] = "CommitTsBuffer",
- [LWTRANCHE_SUBTRANS_BUFFER] = "SubtransBuffer",
- [LWTRANCHE_MULTIXACTOFFSET_BUFFER] = "MultiXactOffsetBuffer",
- [LWTRANCHE_MULTIXACTMEMBER_BUFFER] = "MultiXactMemberBuffer",
- [LWTRANCHE_NOTIFY_BUFFER] = "NotifyBuffer",
- [LWTRANCHE_SERIAL_BUFFER] = "SerialBuffer",
- [LWTRANCHE_WAL_INSERT] = "WALInsert",
- [LWTRANCHE_BUFFER_CONTENT] = "BufferContent",
- [LWTRANCHE_REPLICATION_ORIGIN_STATE] = "ReplicationOriginState",
- [LWTRANCHE_REPLICATION_SLOT_IO] = "ReplicationSlotIO",
- [LWTRANCHE_LOCK_FASTPATH] = "LockFastPath",
- [LWTRANCHE_BUFFER_MAPPING] = "BufferMapping",
- [LWTRANCHE_LOCK_MANAGER] = "LockManager",
- [LWTRANCHE_PREDICATE_LOCK_MANAGER] = "PredicateLockManager",
- [LWTRANCHE_PARALLEL_HASH_JOIN] = "ParallelHashJoin",
- [LWTRANCHE_PARALLEL_BTREE_SCAN] = "ParallelBtreeScan",
- [LWTRANCHE_PARALLEL_QUERY_DSA] = "ParallelQueryDSA",
- [LWTRANCHE_PER_SESSION_DSA] = "PerSessionDSA",
- [LWTRANCHE_PER_SESSION_RECORD_TYPE] = "PerSessionRecordType",
- [LWTRANCHE_PER_SESSION_RECORD_TYPMOD] = "PerSessionRecordTypmod",
- [LWTRANCHE_SHARED_TUPLESTORE] = "SharedTupleStore",
- [LWTRANCHE_SHARED_TIDBITMAP] = "SharedTidBitmap",
- [LWTRANCHE_PARALLEL_APPEND] = "ParallelAppend",
- [LWTRANCHE_PER_XACT_PREDICATE_LIST] = "PerXactPredicateList",
- [LWTRANCHE_PGSTATS_DSA] = "PgStatsDSA",
- [LWTRANCHE_PGSTATS_HASH] = "PgStatsHash",
- [LWTRANCHE_PGSTATS_DATA] = "PgStatsData",
- [LWTRANCHE_LAUNCHER_DSA] = "LogicalRepLauncherDSA",
- [LWTRANCHE_LAUNCHER_HASH] = "LogicalRepLauncherHash",
- [LWTRANCHE_DSM_REGISTRY_DSA] = "DSMRegistryDSA",
- [LWTRANCHE_DSM_REGISTRY_HASH] = "DSMRegistryHash",
- [LWTRANCHE_COMMITTS_SLRU] = "CommitTsSLRU",
- [LWTRANCHE_MULTIXACTOFFSET_SLRU] = "MultiXactOffsetSLRU",
- [LWTRANCHE_MULTIXACTMEMBER_SLRU] = "MultiXactMemberSLRU",
- [LWTRANCHE_NOTIFY_SLRU] = "NotifySLRU",
- [LWTRANCHE_SERIAL_SLRU] = "SerialSLRU",
- [LWTRANCHE_SUBTRANS_SLRU] = "SubtransSLRU",
- [LWTRANCHE_XACT_SLRU] = "XactSLRU",
- [LWTRANCHE_PARALLEL_VACUUM_DSA] = "ParallelVacuumDSA",
- [LWTRANCHE_AIO_URING_COMPLETION] = "AioUringCompletion",
+#undef PG_LWLOCKTRANCHE
};
StaticAssertDecl(lengthof(BuiltinTrancheNames) ==
diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt
index 4da68312b5f..0be307d2ca0 100644
--- a/src/backend/utils/activity/wait_event_names.txt
+++ b/src/backend/utils/activity/wait_event_names.txt
@@ -356,9 +356,13 @@ AioWorkerSubmissionQueue "Waiting to access AIO worker submission queue."
#
# END OF PREDEFINED LWLOCKS (DO NOT CHANGE THIS LINE)
#
-# Predefined LWLocks (i.e., those declared in lwlocknames.h) must be listed
-# in the section above and must be listed in the same order as in
-# lwlocknames.h. Other LWLocks must be listed in the section below.
+# Predefined LWLocks (i.e., those declared at the top of lwlocknames.h) must be
+# listed in the section above and must be listed in the same order as in
+# lwlocknames.h.
+#
+# Likewise, the built-in LWLock tranches (i.e., those declared at the bottom of
+# lwlocknames.h) must be listed in the section below and must be listed in the
+# same order as in lwlocknames.h.
#
XactBuffer "Waiting for I/O on a transaction status SLRU buffer."
diff --git a/src/backend/utils/adt/pg_upgrade_support.c b/src/backend/utils/adt/pg_upgrade_support.c
index d44f8c262ba..a4f8b4faa90 100644
--- a/src/backend/utils/adt/pg_upgrade_support.c
+++ b/src/backend/utils/adt/pg_upgrade_support.c
@@ -21,6 +21,7 @@
#include "commands/extension.h"
#include "miscadmin.h"
#include "replication/logical.h"
+#include "replication/logicallauncher.h"
#include "replication/origin.h"
#include "replication/worker_internal.h"
#include "storage/lmgr.h"
@@ -410,3 +411,21 @@ binary_upgrade_replorigin_advance(PG_FUNCTION_ARGS)
PG_RETURN_VOID();
}
+
+/*
+ * binary_upgrade_create_conflict_detection_slot
+ *
+ * Create a replication slot to retain information necessary for conflict
+ * detection such as dead tuples, commit timestamps, and origins.
+ */
+Datum
+binary_upgrade_create_conflict_detection_slot(PG_FUNCTION_ARGS)
+{
+ CHECK_IS_BINARY_UPGRADE;
+
+ CreateConflictDetectionSlot();
+
+ ReplicationSlotRelease();
+
+ PG_RETURN_VOID();
+}
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index ede10e5291e..6298edb26b5 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -5028,6 +5028,7 @@ getSubscriptions(Archive *fout)
int i_suboriginremotelsn;
int i_subenabled;
int i_subfailover;
+ int i_subretaindeadtuples;
int i,
ntups;
@@ -5100,10 +5101,17 @@ getSubscriptions(Archive *fout)
if (fout->remoteVersion >= 170000)
appendPQExpBufferStr(query,
- " s.subfailover\n");
+ " s.subfailover,\n");
else
appendPQExpBufferStr(query,
- " false AS subfailover\n");
+ " false AS subfailover,\n");
+
+ if (fout->remoteVersion >= 190000)
+ appendPQExpBufferStr(query,
+ " s.subretaindeadtuples\n");
+ else
+ appendPQExpBufferStr(query,
+ " false AS subretaindeadtuples\n");
appendPQExpBufferStr(query,
"FROM pg_subscription s\n");
@@ -5137,6 +5145,7 @@ getSubscriptions(Archive *fout)
i_subpasswordrequired = PQfnumber(res, "subpasswordrequired");
i_subrunasowner = PQfnumber(res, "subrunasowner");
i_subfailover = PQfnumber(res, "subfailover");
+ i_subretaindeadtuples = PQfnumber(res, "subretaindeadtuples");
i_subconninfo = PQfnumber(res, "subconninfo");
i_subslotname = PQfnumber(res, "subslotname");
i_subsynccommit = PQfnumber(res, "subsynccommit");
@@ -5170,6 +5179,8 @@ getSubscriptions(Archive *fout)
(strcmp(PQgetvalue(res, i, i_subrunasowner), "t") == 0);
subinfo[i].subfailover =
(strcmp(PQgetvalue(res, i, i_subfailover), "t") == 0);
+ subinfo[i].subretaindeadtuples =
+ (strcmp(PQgetvalue(res, i, i_subretaindeadtuples), "t") == 0);
subinfo[i].subconninfo =
pg_strdup(PQgetvalue(res, i, i_subconninfo));
if (PQgetisnull(res, i, i_subslotname))
@@ -5428,6 +5439,9 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo)
if (subinfo->subfailover)
appendPQExpBufferStr(query, ", failover = true");
+ if (subinfo->subretaindeadtuples)
+ appendPQExpBufferStr(query, ", retain_dead_tuples = true");
+
if (strcmp(subinfo->subsynccommit, "off") != 0)
appendPQExpBuffer(query, ", synchronous_commit = %s", fmtId(subinfo->subsynccommit));
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index 2370c98d192..93a4475d51b 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -711,6 +711,7 @@ typedef struct _SubscriptionInfo
bool subpasswordrequired;
bool subrunasowner;
bool subfailover;
+ bool subretaindeadtuples;
char *subconninfo;
char *subslotname;
char *subsynccommit;
diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c
index 30579ef2051..5e6403f0773 100644
--- a/src/bin/pg_upgrade/check.c
+++ b/src/bin/pg_upgrade/check.c
@@ -28,7 +28,7 @@ static void check_for_pg_role_prefix(ClusterInfo *cluster);
static void check_for_new_tablespace_dir(void);
static void check_for_user_defined_encoding_conversions(ClusterInfo *cluster);
static void check_for_unicode_update(ClusterInfo *cluster);
-static void check_new_cluster_logical_replication_slots(void);
+static void check_new_cluster_replication_slots(void);
static void check_new_cluster_subscription_configuration(void);
static void check_old_cluster_for_valid_slots(void);
static void check_old_cluster_subscription_state(void);
@@ -631,7 +631,7 @@ check_and_dump_old_cluster(void)
* Before that the logical slots are not upgraded, so we will not be
* able to upgrade the logical replication clusters completely.
*/
- get_subscription_count(&old_cluster);
+ get_subscription_info(&old_cluster);
check_old_cluster_subscription_state();
}
@@ -764,7 +764,7 @@ check_new_cluster(void)
check_for_new_tablespace_dir();
- check_new_cluster_logical_replication_slots();
+ check_new_cluster_replication_slots();
check_new_cluster_subscription_configuration();
}
@@ -2040,48 +2040,80 @@ check_for_unicode_update(ClusterInfo *cluster)
}
/*
- * check_new_cluster_logical_replication_slots()
+ * check_new_cluster_replication_slots()
*
- * Verify that there are no logical replication slots on the new cluster and
- * that the parameter settings necessary for creating slots are sufficient.
+ * Validate the new cluster's readiness for migrating replication slots:
+ * - Ensures no existing logical replication slots on the new cluster when
+ * migrating logical slots.
+ * - Ensure conflict detection slot does not exist on the new cluster when
+ * migrating subscriptions with retain_dead_tuples enabled.
+ * - Ensure that the parameter settings on the new cluster necessary for
+ * creating slots are sufficient.
*/
static void
-check_new_cluster_logical_replication_slots(void)
+check_new_cluster_replication_slots(void)
{
PGresult *res;
PGconn *conn;
int nslots_on_old;
int nslots_on_new;
+ int rdt_slot_on_new;
int max_replication_slots;
char *wal_level;
+ int i_nslots_on_new;
+ int i_rdt_slot_on_new;
- /* Logical slots can be migrated since PG17. */
+ /*
+ * Logical slots can be migrated since PG17 and a physical slot
+ * CONFLICT_DETECTION_SLOT can be migrated since PG19.
+ */
if (GET_MAJOR_VERSION(old_cluster.major_version) <= 1600)
return;
nslots_on_old = count_old_cluster_logical_slots();
- /* Quick return if there are no logical slots to be migrated. */
- if (nslots_on_old == 0)
+ /*
+ * Quick return if there are no slots to be migrated and no subscriptions
+ * have the retain_dead_tuples option enabled.
+ */
+ if (nslots_on_old == 0 && !old_cluster.sub_retain_dead_tuples)
return;
conn = connectToServer(&new_cluster, "template1");
- prep_status("Checking for new cluster logical replication slots");
+ prep_status("Checking for new cluster replication slots");
- res = executeQueryOrDie(conn, "SELECT count(*) "
- "FROM pg_catalog.pg_replication_slots "
- "WHERE slot_type = 'logical' AND "
- "temporary IS FALSE;");
+ res = executeQueryOrDie(conn, "SELECT %s AS nslots_on_new, %s AS rdt_slot_on_new "
+ "FROM pg_catalog.pg_replication_slots",
+ nslots_on_old > 0
+ ? "COUNT(*) FILTER (WHERE slot_type = 'logical' AND temporary IS FALSE)"
+ : "0",
+ old_cluster.sub_retain_dead_tuples
+ ? "COUNT(*) FILTER (WHERE slot_name = 'pg_conflict_detection')"
+ : "0");
if (PQntuples(res) != 1)
- pg_fatal("could not count the number of logical replication slots");
+ pg_fatal("could not count the number of replication slots");
- nslots_on_new = atoi(PQgetvalue(res, 0, 0));
+ i_nslots_on_new = PQfnumber(res, "nslots_on_new");
+ i_rdt_slot_on_new = PQfnumber(res, "rdt_slot_on_new");
+
+ nslots_on_new = atoi(PQgetvalue(res, 0, i_nslots_on_new));
if (nslots_on_new)
+ {
+ Assert(nslots_on_old);
pg_fatal("expected 0 logical replication slots but found %d",
nslots_on_new);
+ }
+
+ rdt_slot_on_new = atoi(PQgetvalue(res, 0, i_rdt_slot_on_new));
+
+ if (rdt_slot_on_new)
+ {
+ Assert(old_cluster.sub_retain_dead_tuples);
+ pg_fatal("The replication slot \"pg_conflict_detection\" already exists on the new cluster");
+ }
PQclear(res);
@@ -2094,12 +2126,24 @@ check_new_cluster_logical_replication_slots(void)
wal_level = PQgetvalue(res, 0, 0);
- if (strcmp(wal_level, "logical") != 0)
+ if (nslots_on_old > 0 && strcmp(wal_level, "logical") != 0)
pg_fatal("\"wal_level\" must be \"logical\" but is set to \"%s\"",
wal_level);
+ if (old_cluster.sub_retain_dead_tuples &&
+ strcmp(wal_level, "minimal") == 0)
+ pg_fatal("\"wal_level\" must be \"replica\" or \"logical\" but is set to \"%s\"",
+ wal_level);
+
max_replication_slots = atoi(PQgetvalue(res, 1, 0));
+ if (old_cluster.sub_retain_dead_tuples &&
+ nslots_on_old + 1 > max_replication_slots)
+ pg_fatal("\"max_replication_slots\" (%d) must be greater than or equal to the number of "
+ "logical replication slots on the old cluster plus one additional slot required "
+ "for retaining conflict detection information (%d)",
+ max_replication_slots, nslots_on_old + 1);
+
if (nslots_on_old > max_replication_slots)
pg_fatal("\"max_replication_slots\" (%d) must be greater than or equal to the number of "
"logical replication slots (%d) on the old cluster",
@@ -2211,6 +2255,22 @@ check_old_cluster_for_valid_slots(void)
"The slot \"%s\" has not consumed the WAL yet\n",
slot->slotname);
}
+
+ /*
+ * The name "pg_conflict_detection" (defined as
+ * CONFLICT_DETECTION_SLOT) has been reserved for logical
+ * replication conflict detection slot since PG19.
+ */
+ if (strcmp(slot->slotname, "pg_conflict_detection") == 0)
+ {
+ if (script == NULL &&
+ (script = fopen_priv(output_path, "w")) == NULL)
+ pg_fatal("could not open file \"%s\": %m", output_path);
+
+ fprintf(script,
+ "The slot name \"%s\" is reserved\n",
+ slot->slotname);
+ }
}
}
diff --git a/src/bin/pg_upgrade/info.c b/src/bin/pg_upgrade/info.c
index 4b7a56f5b3b..a437067cdca 100644
--- a/src/bin/pg_upgrade/info.c
+++ b/src/bin/pg_upgrade/info.c
@@ -752,20 +752,33 @@ count_old_cluster_logical_slots(void)
}
/*
- * get_subscription_count()
+ * get_subscription_info()
*
- * Gets the number of subscriptions in the cluster.
+ * Gets the information of subscriptions in the cluster.
*/
void
-get_subscription_count(ClusterInfo *cluster)
+get_subscription_info(ClusterInfo *cluster)
{
PGconn *conn;
PGresult *res;
+ int i_nsub;
+ int i_retain_dead_tuples;
conn = connectToServer(cluster, "template1");
- res = executeQueryOrDie(conn, "SELECT count(*) "
- "FROM pg_catalog.pg_subscription");
- cluster->nsubs = atoi(PQgetvalue(res, 0, 0));
+ if (GET_MAJOR_VERSION(cluster->major_version) >= 1900)
+ res = executeQueryOrDie(conn, "SELECT count(*) AS nsub,"
+ "COUNT(CASE WHEN subretaindeadtuples THEN 1 END) > 0 AS retain_dead_tuples "
+ "FROM pg_catalog.pg_subscription");
+ else
+ res = executeQueryOrDie(conn, "SELECT count(*) AS nsub,"
+ "'f' AS retain_dead_tuples "
+ "FROM pg_catalog.pg_subscription");
+
+ i_nsub = PQfnumber(res, "nsub");
+ i_retain_dead_tuples = PQfnumber(res, "retain_dead_tuples");
+
+ cluster->nsubs = atoi(PQgetvalue(res, 0, i_nsub));
+ cluster->sub_retain_dead_tuples = (strcmp(PQgetvalue(res, 0, i_retain_dead_tuples), "t") == 0);
PQclear(res);
PQfinish(conn);
diff --git a/src/bin/pg_upgrade/pg_upgrade.c b/src/bin/pg_upgrade/pg_upgrade.c
index 536e49d2616..d5cd5bf0b3a 100644
--- a/src/bin/pg_upgrade/pg_upgrade.c
+++ b/src/bin/pg_upgrade/pg_upgrade.c
@@ -67,6 +67,7 @@ static void set_frozenxids(bool minmxid_only);
static void make_outputdirs(char *pgdata);
static void setup(char *argv0);
static void create_logical_replication_slots(void);
+static void create_conflict_detection_slot(void);
ClusterInfo old_cluster,
new_cluster;
@@ -88,6 +89,7 @@ int
main(int argc, char **argv)
{
char *deletion_script_file_name = NULL;
+ bool migrate_logical_slots;
/*
* pg_upgrade doesn't currently use common/logging.c, but initialize it
@@ -198,18 +200,39 @@ main(int argc, char **argv)
new_cluster.pgdata);
check_ok();
+ migrate_logical_slots = count_old_cluster_logical_slots();
+
/*
- * Migrate the logical slots to the new cluster. Note that we need to do
- * this after resetting WAL because otherwise the required WAL would be
- * removed and slots would become unusable. There is a possibility that
- * background processes might generate some WAL before we could create the
- * slots in the new cluster but we can ignore that WAL as that won't be
- * required downstream.
+ * Migrate replication slots to the new cluster.
+ *
+ * Note that we must migrate logical slots after resetting WAL because
+ * otherwise the required WAL would be removed and slots would become
+ * unusable. There is a possibility that background processes might
+ * generate some WAL before we could create the slots in the new cluster
+ * but we can ignore that WAL as that won't be required downstream.
+ *
+ * The conflict detection slot is not affected by concerns related to WALs
+ * as it only retains the dead tuples. It is created here for consistency.
+ * Note that the new conflict detection slot uses the latest transaction
+ * ID as xmin, so it cannot protect dead tuples that existed before the
+ * upgrade. Additionally, commit timestamps and origin data are not
+ * preserved during the upgrade. So, even after creating the slot, the
+ * upgraded subscriber may be unable to detect conflicts or log relevant
+ * commit timestamps and origins when applying changes from the publisher
+ * occurred before the upgrade especially if those changes were not
+ * replicated. It can only protect tuples that might be deleted after the
+ * new cluster starts.
*/
- if (count_old_cluster_logical_slots())
+ if (migrate_logical_slots || old_cluster.sub_retain_dead_tuples)
{
start_postmaster(&new_cluster, true);
- create_logical_replication_slots();
+
+ if (migrate_logical_slots)
+ create_logical_replication_slots();
+
+ if (old_cluster.sub_retain_dead_tuples)
+ create_conflict_detection_slot();
+
stop_postmaster(false);
}
@@ -1025,3 +1048,24 @@ create_logical_replication_slots(void)
return;
}
+
+/*
+ * create_conflict_detection_slot()
+ *
+ * Create a replication slot to retain information necessary for conflict
+ * detection such as dead tuples, commit timestamps, and origins, for migrated
+ * subscriptions with retain_dead_tuples enabled.
+ */
+static void
+create_conflict_detection_slot(void)
+{
+ PGconn *conn_new_template1;
+
+ prep_status("Creating the replication conflict detection slot");
+
+ conn_new_template1 = connectToServer(&new_cluster, "template1");
+ PQclear(executeQueryOrDie(conn_new_template1, "SELECT pg_catalog.binary_upgrade_create_conflict_detection_slot()"));
+ PQfinish(conn_new_template1);
+
+ check_ok();
+}
diff --git a/src/bin/pg_upgrade/pg_upgrade.h b/src/bin/pg_upgrade/pg_upgrade.h
index 69c965bb7d0..e9401430e69 100644
--- a/src/bin/pg_upgrade/pg_upgrade.h
+++ b/src/bin/pg_upgrade/pg_upgrade.h
@@ -302,6 +302,8 @@ typedef struct
uint32 bin_version; /* version returned from pg_ctl */
const char *tablespace_suffix; /* directory specification */
int nsubs; /* number of subscriptions */
+ bool sub_retain_dead_tuples; /* whether a subscription enables
+ * retain_dead_tuples. */
} ClusterInfo;
@@ -441,7 +443,7 @@ FileNameMap *gen_db_file_maps(DbInfo *old_db,
const char *new_pgdata);
void get_db_rel_and_slot_infos(ClusterInfo *cluster);
int count_old_cluster_logical_slots(void);
-void get_subscription_count(ClusterInfo *cluster);
+void get_subscription_info(ClusterInfo *cluster);
/* option.c */
diff --git a/src/bin/pg_upgrade/t/004_subscription.pl b/src/bin/pg_upgrade/t/004_subscription.pl
index e46f02c6cc6..77387be0f9d 100644
--- a/src/bin/pg_upgrade/t/004_subscription.pl
+++ b/src/bin/pg_upgrade/t/004_subscription.pl
@@ -22,13 +22,13 @@ $publisher->start;
# Initialize the old subscriber node
my $old_sub = PostgreSQL::Test::Cluster->new('old_sub');
-$old_sub->init;
+$old_sub->init(allows_streaming => 'physical');
$old_sub->start;
my $oldbindir = $old_sub->config_data('--bindir');
# Initialize the new subscriber
my $new_sub = PostgreSQL::Test::Cluster->new('new_sub');
-$new_sub->init;
+$new_sub->init(allows_streaming => 'physical');
my $newbindir = $new_sub->config_data('--bindir');
# In a VPATH build, we'll be started in the source directory, but we want
@@ -90,6 +90,54 @@ $old_sub->start;
$old_sub->safe_psql('postgres', "DROP SUBSCRIPTION regress_sub1;");
# ------------------------------------------------------
+# Check that pg_upgrade fails when max_replication_slots configured in the new
+# cluster is less than the number of logical slots in the old cluster + 1 when
+# subscription's retain_dead_tuples option is enabled.
+# ------------------------------------------------------
+# It is sufficient to use disabled subscription to test upgrade failure.
+
+$publisher->safe_psql('postgres', "CREATE PUBLICATION regress_pub1");
+$old_sub->safe_psql('postgres',
+ "CREATE SUBSCRIPTION regress_sub1 CONNECTION '$connstr' PUBLICATION regress_pub1 WITH (enabled = false, retain_dead_tuples = true)"
+);
+
+$old_sub->stop;
+
+$new_sub->append_conf('postgresql.conf', 'max_replication_slots = 0');
+
+# pg_upgrade will fail because the new cluster has insufficient
+# max_replication_slots.
+command_checks_all(
+ [
+ 'pg_upgrade',
+ '--no-sync',
+ '--old-datadir' => $old_sub->data_dir,
+ '--new-datadir' => $new_sub->data_dir,
+ '--old-bindir' => $oldbindir,
+ '--new-bindir' => $newbindir,
+ '--socketdir' => $new_sub->host,
+ '--old-port' => $old_sub->port,
+ '--new-port' => $new_sub->port,
+ $mode,
+ '--check',
+ ],
+ 1,
+ [
+ qr/"max_replication_slots" \(0\) must be greater than or equal to the number of logical replication slots on the old cluster plus one additional slot required for retaining conflict detection information \(1\)/
+ ],
+ [qr//],
+ 'run of pg_upgrade where the new cluster has insufficient max_replication_slots'
+);
+
+# Reset max_replication_slots
+$new_sub->append_conf('postgresql.conf', 'max_replication_slots = 10');
+
+# Cleanup
+$publisher->safe_psql('postgres', "DROP PUBLICATION regress_pub1");
+$old_sub->start;
+$old_sub->safe_psql('postgres', "DROP SUBSCRIPTION regress_sub1;");
+
+# ------------------------------------------------------
# Check that pg_upgrade refuses to run if:
# a) there's a subscription with tables in a state other than 'r' (ready) or
# 'i' (init) and/or
@@ -200,8 +248,9 @@ $old_sub->safe_psql(
rmtree($new_sub->data_dir . "/pg_upgrade_output.d");
# Verify that the upgrade should be successful with tables in 'ready'/'init'
-# state along with retaining the replication origin's remote lsn, subscription's
-# running status, and failover option.
+# state along with retaining the replication origin's remote lsn,
+# subscription's running status, failover option, and retain_dead_tuples
+# option.
$publisher->safe_psql(
'postgres', qq[
CREATE TABLE tab_upgraded1(id int);
@@ -211,7 +260,7 @@ $publisher->safe_psql(
$old_sub->safe_psql(
'postgres', qq[
CREATE TABLE tab_upgraded1(id int);
- CREATE SUBSCRIPTION regress_sub4 CONNECTION '$connstr' PUBLICATION regress_pub4 WITH (failover = true);
+ CREATE SUBSCRIPTION regress_sub4 CONNECTION '$connstr' PUBLICATION regress_pub4 WITH (failover = true, retain_dead_tuples = true);
]);
# Wait till the table tab_upgraded1 reaches 'ready' state
@@ -270,7 +319,8 @@ $new_sub->append_conf('postgresql.conf',
# Check that pg_upgrade is successful when all tables are in ready or in
# init state (tab_upgraded1 table is in ready state and tab_upgraded2 table is
# in init state) along with retaining the replication origin's remote lsn,
-# subscription's running status, and failover option.
+# subscription's running status, failover option, and retain_dead_tuples
+# option.
# ------------------------------------------------------
command_ok(
[
@@ -293,7 +343,8 @@ ok( !-d $new_sub->data_dir . "/pg_upgrade_output.d",
# ------------------------------------------------------
# Check that the data inserted to the publisher when the new subscriber is down
# will be replicated once it is started. Also check that the old subscription
-# states and relations origins are all preserved.
+# states and relations origins are all preserved, and that the conflict
+# detection slot is created.
# ------------------------------------------------------
$publisher->safe_psql(
'postgres', qq[
@@ -303,15 +354,16 @@ $publisher->safe_psql(
$new_sub->start;
-# The subscription's running status and failover option should be preserved
-# in the upgraded instance. So regress_sub4 should still have subenabled and
-# subfailover set to true, while regress_sub5 should have both set to false.
+# The subscription's running status, failover option, and retain_dead_tuples
+# option should be preserved in the upgraded instance. So regress_sub4 should
+# still have subenabled, subfailover, and subretaindeadtuples set to true,
+# while regress_sub5 should have both set to false.
$result = $new_sub->safe_psql('postgres',
- "SELECT subname, subenabled, subfailover FROM pg_subscription ORDER BY subname"
+ "SELECT subname, subenabled, subfailover, subretaindeadtuples FROM pg_subscription ORDER BY subname"
);
-is( $result, qq(regress_sub4|t|t
-regress_sub5|f|f),
- "check that the subscription's running status and failover are preserved"
+is( $result, qq(regress_sub4|t|t|t
+regress_sub5|f|f|f),
+ "check that the subscription's running status, failover, and retain_dead_tuples are preserved"
);
# Subscription relations should be preserved
@@ -330,6 +382,11 @@ $result = $new_sub->safe_psql('postgres',
);
is($result, qq($remote_lsn), "remote_lsn should have been preserved");
+# The conflict detection slot should be created
+$result = $new_sub->safe_psql('postgres',
+ "SELECT xmin IS NOT NULL from pg_replication_slots WHERE slot_name = 'pg_conflict_detection'");
+is($result, qq(t), "conflict detection slot exists");
+
# Resume the initial sync and wait until all tables of subscription
# 'regress_sub5' are synchronized
$new_sub->append_conf('postgresql.conf',
diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c
index dd25d2fe7b8..7a06af48842 100644
--- a/src/bin/psql/describe.c
+++ b/src/bin/psql/describe.c
@@ -6746,7 +6746,7 @@ describeSubscriptions(const char *pattern, bool verbose)
printQueryOpt myopt = pset.popt;
static const bool translate_columns[] = {false, false, false, false,
false, false, false, false, false, false, false, false, false, false,
- false};
+ false, false};
if (pset.sversion < 100000)
{
@@ -6814,6 +6814,10 @@ describeSubscriptions(const char *pattern, bool verbose)
appendPQExpBuffer(&buf,
", subfailover AS \"%s\"\n",
gettext_noop("Failover"));
+ if (pset.sversion >= 190000)
+ appendPQExpBuffer(&buf,
+ ", subretaindeadtuples AS \"%s\"\n",
+ gettext_noop("Retain dead tuples"));
appendPQExpBuffer(&buf,
", subsynccommit AS \"%s\"\n"
diff --git a/src/bin/psql/tab-complete.in.c b/src/bin/psql/tab-complete.in.c
index 37524364290..dbc586c5bc3 100644
--- a/src/bin/psql/tab-complete.in.c
+++ b/src/bin/psql/tab-complete.in.c
@@ -2319,8 +2319,9 @@ match_previous_words(int pattern_id,
/* ALTER SUBSCRIPTION <name> SET ( */
else if (Matches("ALTER", "SUBSCRIPTION", MatchAny, MatchAnyN, "SET", "("))
COMPLETE_WITH("binary", "disable_on_error", "failover", "origin",
- "password_required", "run_as_owner", "slot_name",
- "streaming", "synchronous_commit", "two_phase");
+ "password_required", "retain_dead_tuples",
+ "run_as_owner", "slot_name", "streaming",
+ "synchronous_commit", "two_phase");
/* ALTER SUBSCRIPTION <name> SKIP ( */
else if (Matches("ALTER", "SUBSCRIPTION", MatchAny, MatchAnyN, "SKIP", "("))
COMPLETE_WITH("lsn");
@@ -3774,8 +3775,9 @@ match_previous_words(int pattern_id,
else if (Matches("CREATE", "SUBSCRIPTION", MatchAnyN, "WITH", "("))
COMPLETE_WITH("binary", "connect", "copy_data", "create_slot",
"disable_on_error", "enabled", "failover", "origin",
- "password_required", "run_as_owner", "slot_name",
- "streaming", "synchronous_commit", "two_phase");
+ "password_required", "retain_dead_tuples",
+ "run_as_owner", "slot_name", "streaming",
+ "synchronous_commit", "two_phase");
/* CREATE TRIGGER --- is allowed inside CREATE SCHEMA, so use TailMatches */
diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h
index 2cf8d55d706..cc06fc29ab2 100644
--- a/src/include/access/xlog_internal.h
+++ b/src/include/access/xlog_internal.h
@@ -316,16 +316,6 @@ typedef struct XLogRecData
uint32 len; /* length of rmgr data to include */
} XLogRecData;
-/*
- * Recovery target action.
- */
-typedef enum
-{
- RECOVERY_TARGET_ACTION_PAUSE,
- RECOVERY_TARGET_ACTION_PROMOTE,
- RECOVERY_TARGET_ACTION_SHUTDOWN,
-} RecoveryTargetAction;
-
struct LogicalDecodingContext;
struct XLogRecordBuffer;
diff --git a/src/include/access/xlogrecovery.h b/src/include/access/xlogrecovery.h
index 91446303024..8e475e266d1 100644
--- a/src/include/access/xlogrecovery.h
+++ b/src/include/access/xlogrecovery.h
@@ -40,6 +40,16 @@ typedef enum
RECOVERY_TARGET_TIMELINE_NUMERIC,
} RecoveryTargetTimeLineGoal;
+/*
+ * Recovery target action.
+ */
+typedef enum
+{
+ RECOVERY_TARGET_ACTION_PAUSE,
+ RECOVERY_TARGET_ACTION_PROMOTE,
+ RECOVERY_TARGET_ACTION_SHUTDOWN,
+} RecoveryTargetAction;
+
/* Recovery pause states */
typedef enum RecoveryPauseState
{
diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h
index a3f3315fed9..5173d422d46 100644
--- a/src/include/catalog/catversion.h
+++ b/src/include/catalog/catversion.h
@@ -57,6 +57,6 @@
*/
/* yyyymmddN */
-#define CATALOG_VERSION_NO 202507091
+#define CATALOG_VERSION_NO 202507231
#endif
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 1fc19146f46..3ee8fed7e53 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -11801,6 +11801,10 @@
proname => 'binary_upgrade_replorigin_advance', proisstrict => 'f',
provolatile => 'v', proparallel => 'u', prorettype => 'void',
proargtypes => 'text pg_lsn', prosrc => 'binary_upgrade_replorigin_advance' },
+{ oid => '9159', descr => 'for use by pg_upgrade (conflict detection slot)',
+ proname => 'binary_upgrade_create_conflict_detection_slot', proisstrict => 'f',
+ provolatile => 'v', proparallel => 'u', prorettype => 'void',
+ proargtypes => '', prosrc => 'binary_upgrade_create_conflict_detection_slot' },
# conversion functions
{ oid => '4302',
diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index 20fc329992d..231ef84ec9a 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -78,6 +78,9 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
* slots) in the upstream database are enabled
* to be synchronized to the standbys. */
+ bool subretaindeadtuples; /* True if dead tuples useful for
+ * conflict detection are retained */
+
#ifdef CATALOG_VARLEN /* variable-length fields start here */
/* Connection string to the publisher */
text subconninfo BKI_FORCE_NOT_NULL;
@@ -131,6 +134,8 @@ typedef struct Subscription
* (i.e. the main slot and the table sync
* slots) in the upstream database are enabled
* to be synchronized to the standbys. */
+ bool retaindeadtuples; /* True if dead tuples useful for conflict
+ * detection are retained */
char *conninfo; /* Connection string to the publisher */
char *slotname; /* Name of the replication slot */
char *synccommit; /* Synchronous commit setting for worker */
diff --git a/src/include/commands/subscriptioncmds.h b/src/include/commands/subscriptioncmds.h
index c2262e46a7f..9b288ad22a6 100644
--- a/src/include/commands/subscriptioncmds.h
+++ b/src/include/commands/subscriptioncmds.h
@@ -28,4 +28,9 @@ extern void AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId);
extern char defGetStreamingMode(DefElem *def);
+extern ObjectAddress AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, bool isTopLevel);
+
+extern void CheckSubDeadTupleRetention(bool check_guc, bool sub_disabled,
+ int elevel_for_sub_disabled);
+
#endif /* SUBSCRIPTIONCMDS_H */
diff --git a/src/include/libpq/libpq-be-fe-helpers.h b/src/include/libpq/libpq-be-fe-helpers.h
index 49137a0a570..af13bd6bf3d 100644
--- a/src/include/libpq/libpq-be-fe-helpers.h
+++ b/src/include/libpq/libpq-be-fe-helpers.h
@@ -481,7 +481,7 @@ libpqsrv_notice_receiver(void *arg, const PGresult *res)
len--;
ereport(LOG,
- errmsg_internal("%s: %.*s", _(prefix), len, message));
+ errmsg_internal("%s: %.*s", prefix, len, message));
}
#endif /* LIBPQ_BE_FE_HELPERS_H */
diff --git a/src/include/port/solaris.h b/src/include/port/solaris.h
index e63a3bd824d..8ff40007c7f 100644
--- a/src/include/port/solaris.h
+++ b/src/include/port/solaris.h
@@ -24,3 +24,12 @@
#if defined(__i386__)
#include <sys/isa_defs.h>
#endif
+
+/*
+ * On original Solaris, PAM conversation procs lack a "const" in their
+ * declaration; but recent OpenIndiana versions put it there by default.
+ * The least messy way to deal with this is to define _PAM_LEGACY_NONCONST,
+ * which causes OpenIndiana to declare pam_conv per the Solaris tradition,
+ * and also use that symbol to control omitting the "const" in our own code.
+ */
+#define _PAM_LEGACY_NONCONST 1
diff --git a/src/include/replication/logicallauncher.h b/src/include/replication/logicallauncher.h
index 82b202f3305..b29453e8e4f 100644
--- a/src/include/replication/logicallauncher.h
+++ b/src/include/replication/logicallauncher.h
@@ -25,8 +25,11 @@ extern void ApplyLauncherShmemInit(void);
extern void ApplyLauncherForgetWorkerStartTime(Oid subid);
extern void ApplyLauncherWakeupAtCommit(void);
+extern void ApplyLauncherWakeup(void);
extern void AtEOXact_ApplyLauncher(bool isCommit);
+extern void CreateConflictDetectionSlot(void);
+
extern bool IsLogicalLauncher(void);
extern pid_t GetLeaderApplyWorkerPid(pid_t pid);
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 19b4e8b6a03..e8fc342d1a9 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -21,6 +21,13 @@
#define PG_REPLSLOT_DIR "pg_replslot"
/*
+ * The reserved name for a replication slot used to retain dead tuples for
+ * conflict detection in logical replication. See
+ * maybe_advance_nonremovable_xid() for detail.
+ */
+#define CONFLICT_DETECTION_SLOT "pg_conflict_detection"
+
+/*
* Behaviour of replication slots, upon release or crash.
*
* Slots marked as PERSISTENT are crash-safe and will not be dropped when
@@ -311,7 +318,9 @@ extern void ReplicationSlotMarkDirty(void);
/* misc stuff */
extern void ReplicationSlotInitialize(void);
-extern bool ReplicationSlotValidateName(const char *name, int elevel);
+extern bool ReplicationSlotValidateName(const char *name,
+ bool allow_reserved_name,
+ int elevel);
extern void ReplicationSlotReserveWal(void);
extern void ReplicationSlotsComputeRequiredXmin(bool already_locked);
extern void ReplicationSlotsComputeRequiredLSN(void);
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 30b2775952c..0c7b8440a61 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -86,6 +86,16 @@ typedef struct LogicalRepWorker
/* Indicates whether apply can be performed in parallel. */
bool parallel_apply;
+ /*
+ * The changes made by this and later transactions must be retained to
+ * ensure reliable conflict detection during the apply phase.
+ *
+ * The logical replication launcher manages an internal replication slot
+ * named "pg_conflict_detection". It asynchronously collects this ID to
+ * decide when to advance the xmin value of the slot.
+ */
+ TransactionId oldest_nonremovable_xid;
+
/* Stats. */
XLogRecPtr last_lsn;
TimestampTz last_send_time;
@@ -245,7 +255,8 @@ extern List *logicalrep_workers_find(Oid subid, bool only_running,
extern bool logicalrep_worker_launch(LogicalRepWorkerType wtype,
Oid dbid, Oid subid, const char *subname,
Oid userid, Oid relid,
- dsm_handle subworker_dsm);
+ dsm_handle subworker_dsm,
+ bool retain_dead_tuples);
extern void logicalrep_worker_stop(Oid subid, Oid relid);
extern void logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo);
extern void logicalrep_worker_wakeup(Oid subid, Oid relid);
diff --git a/src/include/storage/aio.h b/src/include/storage/aio.h
index e7a0a234b6c..2933eea0649 100644
--- a/src/include/storage/aio.h
+++ b/src/include/storage/aio.h
@@ -201,7 +201,7 @@ typedef enum PgAioHandleCallbackID
} PgAioHandleCallbackID;
#define PGAIO_HCB_MAX PGAIO_HCB_LOCAL_BUFFER_READV
-StaticAssertDecl(PGAIO_HCB_MAX <= (1 << PGAIO_RESULT_ID_BITS),
+StaticAssertDecl(PGAIO_HCB_MAX < (1 << PGAIO_RESULT_ID_BITS),
"PGAIO_HCB_MAX is too big for PGAIO_RESULT_ID_BITS");
diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h
index 08a72569ae5..5e717765764 100644
--- a/src/include/storage/lwlock.h
+++ b/src/include/storage/lwlock.h
@@ -176,51 +176,23 @@ extern void LWLockInitialize(LWLock *lock, int tranche_id);
* Every tranche ID less than NUM_INDIVIDUAL_LWLOCKS is reserved; also,
* we reserve additional tranche IDs for builtin tranches not included in
* the set of individual LWLocks. A call to LWLockNewTrancheId will never
- * return a value less than LWTRANCHE_FIRST_USER_DEFINED.
+ * return a value less than LWTRANCHE_FIRST_USER_DEFINED. The actual list of
+ * built-in tranches is kept in lwlocklist.h.
*/
typedef enum BuiltinTrancheIds
{
- LWTRANCHE_XACT_BUFFER = NUM_INDIVIDUAL_LWLOCKS,
- LWTRANCHE_COMMITTS_BUFFER,
- LWTRANCHE_SUBTRANS_BUFFER,
- LWTRANCHE_MULTIXACTOFFSET_BUFFER,
- LWTRANCHE_MULTIXACTMEMBER_BUFFER,
- LWTRANCHE_NOTIFY_BUFFER,
- LWTRANCHE_SERIAL_BUFFER,
- LWTRANCHE_WAL_INSERT,
- LWTRANCHE_BUFFER_CONTENT,
- LWTRANCHE_REPLICATION_ORIGIN_STATE,
- LWTRANCHE_REPLICATION_SLOT_IO,
- LWTRANCHE_LOCK_FASTPATH,
- LWTRANCHE_BUFFER_MAPPING,
- LWTRANCHE_LOCK_MANAGER,
- LWTRANCHE_PREDICATE_LOCK_MANAGER,
- LWTRANCHE_PARALLEL_HASH_JOIN,
- LWTRANCHE_PARALLEL_BTREE_SCAN,
- LWTRANCHE_PARALLEL_QUERY_DSA,
- LWTRANCHE_PER_SESSION_DSA,
- LWTRANCHE_PER_SESSION_RECORD_TYPE,
- LWTRANCHE_PER_SESSION_RECORD_TYPMOD,
- LWTRANCHE_SHARED_TUPLESTORE,
- LWTRANCHE_SHARED_TIDBITMAP,
- LWTRANCHE_PARALLEL_APPEND,
- LWTRANCHE_PER_XACT_PREDICATE_LIST,
- LWTRANCHE_PGSTATS_DSA,
- LWTRANCHE_PGSTATS_HASH,
- LWTRANCHE_PGSTATS_DATA,
- LWTRANCHE_LAUNCHER_DSA,
- LWTRANCHE_LAUNCHER_HASH,
- LWTRANCHE_DSM_REGISTRY_DSA,
- LWTRANCHE_DSM_REGISTRY_HASH,
- LWTRANCHE_COMMITTS_SLRU,
- LWTRANCHE_MULTIXACTMEMBER_SLRU,
- LWTRANCHE_MULTIXACTOFFSET_SLRU,
- LWTRANCHE_NOTIFY_SLRU,
- LWTRANCHE_SERIAL_SLRU,
- LWTRANCHE_SUBTRANS_SLRU,
- LWTRANCHE_XACT_SLRU,
- LWTRANCHE_PARALLEL_VACUUM_DSA,
- LWTRANCHE_AIO_URING_COMPLETION,
+ /*
+ * LWTRANCHE_INVALID is an unused value that only exists to initialize the
+ * rest of the tranches to appropriate values.
+ */
+ LWTRANCHE_INVALID = NUM_INDIVIDUAL_LWLOCKS - 1,
+
+#define PG_LWLOCK(id, name)
+#define PG_LWLOCKTRANCHE(id, name) LWTRANCHE_##id,
+#include "storage/lwlocklist.h"
+#undef PG_LWLOCK
+#undef PG_LWLOCKTRANCHE
+
LWTRANCHE_FIRST_USER_DEFINED,
} BuiltinTrancheIds;
diff --git a/src/include/storage/lwlocklist.h b/src/include/storage/lwlocklist.h
index a9681738146..208d2e3a8ed 100644
--- a/src/include/storage/lwlocklist.h
+++ b/src/include/storage/lwlocklist.h
@@ -2,9 +2,10 @@
*
* lwlocklist.h
*
- * The predefined LWLock list is kept in its own source file for use by
- * automatic tools. The exact representation of a keyword is determined by
- * the PG_LWLOCK macro, which is not defined in this file; it can be
+ * The list of predefined LWLocks and built-in LWLock tranches is kept in
+ * its own source file for use by automatic tools. The exact
+ * representation of a keyword is determined by the PG_LWLOCK and
+ * PG_LWLOCKTRANCHE macros, which are not defined in this file; they can be
* defined by the caller for special purposes.
*
* Also, generate-lwlocknames.pl processes this file to create lwlocknames.h.
@@ -84,3 +85,53 @@ PG_LWLOCK(50, DSMRegistry)
PG_LWLOCK(51, InjectionPoint)
PG_LWLOCK(52, SerialControl)
PG_LWLOCK(53, AioWorkerSubmissionQueue)
+
+/*
+ * There also exist several built-in LWLock tranches. As with the predefined
+ * LWLocks, be sure to update the WaitEventLWLock section of
+ * src/backend/utils/activity/wait_event_names.txt when modifying this list.
+ *
+ * Note that the IDs here (the first value) don't include the LWTRANCHE_
+ * prefix. It's added elsewhere.
+ */
+PG_LWLOCKTRANCHE(XACT_BUFFER, XactBuffer)
+PG_LWLOCKTRANCHE(COMMITTS_BUFFER, CommitTsBuffer)
+PG_LWLOCKTRANCHE(SUBTRANS_BUFFER, SubtransBuffer)
+PG_LWLOCKTRANCHE(MULTIXACTOFFSET_BUFFER, MultiXactOffsetBuffer)
+PG_LWLOCKTRANCHE(MULTIXACTMEMBER_BUFFER, MultiXactMemberBuffer)
+PG_LWLOCKTRANCHE(NOTIFY_BUFFER, NotifyBuffer)
+PG_LWLOCKTRANCHE(SERIAL_BUFFER, SerialBuffer)
+PG_LWLOCKTRANCHE(WAL_INSERT, WALInsert)
+PG_LWLOCKTRANCHE(BUFFER_CONTENT, BufferContent)
+PG_LWLOCKTRANCHE(REPLICATION_ORIGIN_STATE, ReplicationOriginState)
+PG_LWLOCKTRANCHE(REPLICATION_SLOT_IO, ReplicationSlotIO)
+PG_LWLOCKTRANCHE(LOCK_FASTPATH, LockFastPath)
+PG_LWLOCKTRANCHE(BUFFER_MAPPING, BufferMapping)
+PG_LWLOCKTRANCHE(LOCK_MANAGER, LockManager)
+PG_LWLOCKTRANCHE(PREDICATE_LOCK_MANAGER, PredicateLockManager)
+PG_LWLOCKTRANCHE(PARALLEL_HASH_JOIN, ParallelHashJoin)
+PG_LWLOCKTRANCHE(PARALLEL_BTREE_SCAN, ParallelBtreeScan)
+PG_LWLOCKTRANCHE(PARALLEL_QUERY_DSA, ParallelQueryDSA)
+PG_LWLOCKTRANCHE(PER_SESSION_DSA, PerSessionDSA)
+PG_LWLOCKTRANCHE(PER_SESSION_RECORD_TYPE, PerSessionRecordType)
+PG_LWLOCKTRANCHE(PER_SESSION_RECORD_TYPMOD, PerSessionRecordTypmod)
+PG_LWLOCKTRANCHE(SHARED_TUPLESTORE, SharedTupleStore)
+PG_LWLOCKTRANCHE(SHARED_TIDBITMAP, SharedTidBitmap)
+PG_LWLOCKTRANCHE(PARALLEL_APPEND, ParallelAppend)
+PG_LWLOCKTRANCHE(PER_XACT_PREDICATE_LIST, PerXactPredicateList)
+PG_LWLOCKTRANCHE(PGSTATS_DSA, PgStatsDSA)
+PG_LWLOCKTRANCHE(PGSTATS_HASH, PgStatsHash)
+PG_LWLOCKTRANCHE(PGSTATS_DATA, PgStatsData)
+PG_LWLOCKTRANCHE(LAUNCHER_DSA, LogicalRepLauncherDSA)
+PG_LWLOCKTRANCHE(LAUNCHER_HASH, LogicalRepLauncherHash)
+PG_LWLOCKTRANCHE(DSM_REGISTRY_DSA, DSMRegistryDSA)
+PG_LWLOCKTRANCHE(DSM_REGISTRY_HASH, DSMRegistryHash)
+PG_LWLOCKTRANCHE(COMMITTS_SLRU, CommitTsSLRU)
+PG_LWLOCKTRANCHE(MULTIXACTOFFSET_SLRU, MultiXactOffsetSLRU)
+PG_LWLOCKTRANCHE(MULTIXACTMEMBER_SLRU, MultiXactMemberSLRU)
+PG_LWLOCKTRANCHE(NOTIFY_SLRU, NotifySLRU)
+PG_LWLOCKTRANCHE(SERIAL_SLRU, SerialSLRU)
+PG_LWLOCKTRANCHE(SUBTRANS_SLRU, SubtransSLRU)
+PG_LWLOCKTRANCHE(XACT_SLRU, XactSLRU)
+PG_LWLOCKTRANCHE(PARALLEL_VACUUM_DSA, ParallelVacuumDSA)
+PG_LWLOCKTRANCHE(AIO_URING_COMPLETION, AioUringCompletion)
diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h
index 9f9b3fcfbf1..c6f5ebceefd 100644
--- a/src/include/storage/proc.h
+++ b/src/include/storage/proc.h
@@ -130,9 +130,17 @@ extern PGDLLIMPORT int FastPathLockGroupsPerBackend;
* the checkpoint are actually destroyed on disk. Replay can cope with a file
* or block that doesn't exist, but not with a block that has the wrong
* contents.
+ *
+ * Setting DELAY_CHKPT_IN_COMMIT is similar to setting DELAY_CHKPT_START, but
+ * it explicitly indicates that the reason for delaying the checkpoint is due
+ * to a transaction being within a critical commit section. We need this new
+ * flag to ensure all the transactions that have acquired commit timestamp are
+ * finished before we allow the logical replication client to advance its xid
+ * which is used to hold back dead rows for conflict detection.
*/
#define DELAY_CHKPT_START (1<<0)
#define DELAY_CHKPT_COMPLETE (1<<1)
+#define DELAY_CHKPT_IN_COMMIT (DELAY_CHKPT_START | 1<<2)
typedef enum
{
diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h
index e4877d88e8f..2f4ae06c279 100644
--- a/src/include/storage/procarray.h
+++ b/src/include/storage/procarray.h
@@ -55,7 +55,8 @@ extern RunningTransactions GetRunningTransactionData(void);
extern bool TransactionIdIsInProgress(TransactionId xid);
extern TransactionId GetOldestNonRemovableTransactionId(Relation rel);
extern TransactionId GetOldestTransactionIdConsideredRunning(void);
-extern TransactionId GetOldestActiveTransactionId(void);
+extern TransactionId GetOldestActiveTransactionId(bool inCommitOnly,
+ bool allDbs);
extern TransactionId GetOldestSafeDecodingTransactionId(bool catalogOnly);
extern void GetReplicationHorizons(TransactionId *xmin, TransactionId *catalog_xmin);
diff --git a/src/interfaces/ecpg/ecpglib/connect.c b/src/interfaces/ecpg/ecpglib/connect.c
index 713cbbf6360..78de9f298ba 100644
--- a/src/interfaces/ecpg/ecpglib/connect.c
+++ b/src/interfaces/ecpg/ecpglib/connect.c
@@ -264,7 +264,8 @@ ECPGconnect(int lineno, int c, const char *name, const char *user, const char *p
struct connection *this;
int i,
connect_params = 0;
- char *dbname = name ? ecpg_strdup(name, lineno) : NULL,
+ bool alloc_failed = (sqlca == NULL);
+ char *dbname = name ? ecpg_strdup(name, lineno, &alloc_failed) : NULL,
*host = NULL,
*tmp,
*port = NULL,
@@ -273,11 +274,12 @@ ECPGconnect(int lineno, int c, const char *name, const char *user, const char *p
const char **conn_keywords;
const char **conn_values;
- if (sqlca == NULL)
+ if (alloc_failed)
{
ecpg_raise(lineno, ECPG_OUT_OF_MEMORY,
ECPG_SQLSTATE_ECPG_OUT_OF_MEMORY, NULL);
- ecpg_free(dbname);
+ if (dbname)
+ ecpg_free(dbname);
return false;
}
@@ -302,7 +304,7 @@ ECPGconnect(int lineno, int c, const char *name, const char *user, const char *p
if (envname)
{
ecpg_free(dbname);
- dbname = ecpg_strdup(envname, lineno);
+ dbname = ecpg_strdup(envname, lineno, &alloc_failed);
}
}
@@ -354,7 +356,7 @@ ECPGconnect(int lineno, int c, const char *name, const char *user, const char *p
tmp = strrchr(dbname + offset, '?');
if (tmp != NULL) /* options given */
{
- options = ecpg_strdup(tmp + 1, lineno);
+ options = ecpg_strdup(tmp + 1, lineno, &alloc_failed);
*tmp = '\0';
}
@@ -363,7 +365,7 @@ ECPGconnect(int lineno, int c, const char *name, const char *user, const char *p
{
if (tmp[1] != '\0') /* non-empty database name */
{
- realname = ecpg_strdup(tmp + 1, lineno);
+ realname = ecpg_strdup(tmp + 1, lineno, &alloc_failed);
connect_params++;
}
*tmp = '\0';
@@ -373,7 +375,7 @@ ECPGconnect(int lineno, int c, const char *name, const char *user, const char *p
if (tmp != NULL) /* port number given */
{
*tmp = '\0';
- port = ecpg_strdup(tmp + 1, lineno);
+ port = ecpg_strdup(tmp + 1, lineno, &alloc_failed);
connect_params++;
}
@@ -407,7 +409,7 @@ ECPGconnect(int lineno, int c, const char *name, const char *user, const char *p
{
if (*(dbname + offset) != '\0')
{
- host = ecpg_strdup(dbname + offset, lineno);
+ host = ecpg_strdup(dbname + offset, lineno, &alloc_failed);
connect_params++;
}
}
@@ -419,7 +421,7 @@ ECPGconnect(int lineno, int c, const char *name, const char *user, const char *p
tmp = strrchr(dbname, ':');
if (tmp != NULL) /* port number given */
{
- port = ecpg_strdup(tmp + 1, lineno);
+ port = ecpg_strdup(tmp + 1, lineno, &alloc_failed);
connect_params++;
*tmp = '\0';
}
@@ -427,14 +429,14 @@ ECPGconnect(int lineno, int c, const char *name, const char *user, const char *p
tmp = strrchr(dbname, '@');
if (tmp != NULL) /* host name given */
{
- host = ecpg_strdup(tmp + 1, lineno);
+ host = ecpg_strdup(tmp + 1, lineno, &alloc_failed);
connect_params++;
*tmp = '\0';
}
if (strlen(dbname) > 0)
{
- realname = ecpg_strdup(dbname, lineno);
+ realname = ecpg_strdup(dbname, lineno, &alloc_failed);
connect_params++;
}
else
@@ -465,7 +467,18 @@ ECPGconnect(int lineno, int c, const char *name, const char *user, const char *p
*/
conn_keywords = (const char **) ecpg_alloc((connect_params + 1) * sizeof(char *), lineno);
conn_values = (const char **) ecpg_alloc(connect_params * sizeof(char *), lineno);
- if (conn_keywords == NULL || conn_values == NULL)
+
+ /* Decide on a connection name */
+ if (connection_name != NULL || realname != NULL)
+ {
+ this->name = ecpg_strdup(connection_name ? connection_name : realname,
+ lineno, &alloc_failed);
+ }
+ else
+ this->name = NULL;
+
+ /* Deal with any failed allocations above */
+ if (conn_keywords == NULL || conn_values == NULL || alloc_failed)
{
if (host)
ecpg_free(host);
@@ -481,6 +494,8 @@ ECPGconnect(int lineno, int c, const char *name, const char *user, const char *p
ecpg_free(conn_keywords);
if (conn_values)
ecpg_free(conn_values);
+ if (this->name)
+ ecpg_free(this->name);
free(this);
return false;
}
@@ -515,17 +530,14 @@ ECPGconnect(int lineno, int c, const char *name, const char *user, const char *p
ecpg_free(conn_keywords);
if (conn_values)
ecpg_free(conn_values);
+ if (this->name)
+ ecpg_free(this->name);
free(this);
return false;
}
}
#endif
- if (connection_name != NULL)
- this->name = ecpg_strdup(connection_name, lineno);
- else
- this->name = ecpg_strdup(realname, lineno);
-
this->cache_head = NULL;
this->prep_stmts = NULL;
diff --git a/src/interfaces/ecpg/ecpglib/descriptor.c b/src/interfaces/ecpg/ecpglib/descriptor.c
index 651d5c8b2ed..466428edfeb 100644
--- a/src/interfaces/ecpg/ecpglib/descriptor.c
+++ b/src/interfaces/ecpg/ecpglib/descriptor.c
@@ -240,8 +240,9 @@ ECPGget_desc(int lineno, const char *desc_name, int index,...)
act_tuple;
struct variable data_var;
struct sqlca_t *sqlca = ECPGget_sqlca();
+ bool alloc_failed = (sqlca == NULL);
- if (sqlca == NULL)
+ if (alloc_failed)
{
ecpg_raise(lineno, ECPG_OUT_OF_MEMORY,
ECPG_SQLSTATE_ECPG_OUT_OF_MEMORY, NULL);
@@ -493,7 +494,14 @@ ECPGget_desc(int lineno, const char *desc_name, int index,...)
#ifdef WIN32
stmt.oldthreadlocale = _configthreadlocale(_ENABLE_PER_THREAD_LOCALE);
#endif
- stmt.oldlocale = ecpg_strdup(setlocale(LC_NUMERIC, NULL), lineno);
+ stmt.oldlocale = ecpg_strdup(setlocale(LC_NUMERIC, NULL),
+ lineno, &alloc_failed);
+ if (alloc_failed)
+ {
+ va_end(args);
+ return false;
+ }
+
setlocale(LC_NUMERIC, "C");
#endif
diff --git a/src/interfaces/ecpg/ecpglib/ecpglib_extern.h b/src/interfaces/ecpg/ecpglib/ecpglib_extern.h
index 75cc68275bd..949ff66cefc 100644
--- a/src/interfaces/ecpg/ecpglib/ecpglib_extern.h
+++ b/src/interfaces/ecpg/ecpglib/ecpglib_extern.h
@@ -175,7 +175,7 @@ void ecpg_free(void *ptr);
bool ecpg_init(const struct connection *con,
const char *connection_name,
const int lineno);
-char *ecpg_strdup(const char *string, int lineno);
+char *ecpg_strdup(const char *string, int lineno, bool *alloc_failed);
const char *ecpg_type_name(enum ECPGttype typ);
int ecpg_dynamic_type(Oid type);
int sqlda_dynamic_type(Oid type, enum COMPAT_MODE compat);
diff --git a/src/interfaces/ecpg/ecpglib/execute.c b/src/interfaces/ecpg/ecpglib/execute.c
index f52da06de9a..84a4a9fc578 100644
--- a/src/interfaces/ecpg/ecpglib/execute.c
+++ b/src/interfaces/ecpg/ecpglib/execute.c
@@ -860,9 +860,9 @@ ecpg_store_input(const int lineno, const bool force_indicator, const struct vari
numeric *nval;
if (var->arrsize > 1)
- mallocedval = ecpg_strdup("{", lineno);
+ mallocedval = ecpg_strdup("{", lineno, NULL);
else
- mallocedval = ecpg_strdup("", lineno);
+ mallocedval = ecpg_strdup("", lineno, NULL);
if (!mallocedval)
return false;
@@ -923,9 +923,9 @@ ecpg_store_input(const int lineno, const bool force_indicator, const struct vari
int slen;
if (var->arrsize > 1)
- mallocedval = ecpg_strdup("{", lineno);
+ mallocedval = ecpg_strdup("{", lineno, NULL);
else
- mallocedval = ecpg_strdup("", lineno);
+ mallocedval = ecpg_strdup("", lineno, NULL);
if (!mallocedval)
return false;
@@ -970,9 +970,9 @@ ecpg_store_input(const int lineno, const bool force_indicator, const struct vari
int slen;
if (var->arrsize > 1)
- mallocedval = ecpg_strdup("{", lineno);
+ mallocedval = ecpg_strdup("{", lineno, NULL);
else
- mallocedval = ecpg_strdup("", lineno);
+ mallocedval = ecpg_strdup("", lineno, NULL);
if (!mallocedval)
return false;
@@ -1017,9 +1017,9 @@ ecpg_store_input(const int lineno, const bool force_indicator, const struct vari
int slen;
if (var->arrsize > 1)
- mallocedval = ecpg_strdup("{", lineno);
+ mallocedval = ecpg_strdup("{", lineno, NULL);
else
- mallocedval = ecpg_strdup("", lineno);
+ mallocedval = ecpg_strdup("", lineno, NULL);
if (!mallocedval)
return false;
@@ -2001,7 +2001,8 @@ ecpg_do_prologue(int lineno, const int compat, const int force_indicator,
return false;
}
#endif
- stmt->oldlocale = ecpg_strdup(setlocale(LC_NUMERIC, NULL), lineno);
+ stmt->oldlocale = ecpg_strdup(setlocale(LC_NUMERIC, NULL), lineno,
+ NULL);
if (stmt->oldlocale == NULL)
{
ecpg_do_epilogue(stmt);
@@ -2030,7 +2031,14 @@ ecpg_do_prologue(int lineno, const int compat, const int force_indicator,
statement_type = ECPGst_execute;
}
else
- stmt->command = ecpg_strdup(query, lineno);
+ {
+ stmt->command = ecpg_strdup(query, lineno, NULL);
+ if (!stmt->command)
+ {
+ ecpg_do_epilogue(stmt);
+ return false;
+ }
+ }
stmt->name = NULL;
@@ -2042,7 +2050,12 @@ ecpg_do_prologue(int lineno, const int compat, const int force_indicator,
if (command)
{
stmt->name = stmt->command;
- stmt->command = ecpg_strdup(command, lineno);
+ stmt->command = ecpg_strdup(command, lineno, NULL);
+ if (!stmt->command)
+ {
+ ecpg_do_epilogue(stmt);
+ return false;
+ }
}
else
{
@@ -2175,7 +2188,12 @@ ecpg_do_prologue(int lineno, const int compat, const int force_indicator,
if (!is_prepared_name_set && stmt->statement_type == ECPGst_prepare)
{
- stmt->name = ecpg_strdup(var->value, lineno);
+ stmt->name = ecpg_strdup(var->value, lineno, NULL);
+ if (!stmt->name)
+ {
+ ecpg_do_epilogue(stmt);
+ return false;
+ }
is_prepared_name_set = true;
}
}
diff --git a/src/interfaces/ecpg/ecpglib/memory.c b/src/interfaces/ecpg/ecpglib/memory.c
index 6979be2c988..2112e55b6e4 100644
--- a/src/interfaces/ecpg/ecpglib/memory.c
+++ b/src/interfaces/ecpg/ecpglib/memory.c
@@ -43,8 +43,15 @@ ecpg_realloc(void *ptr, long size, int lineno)
return new;
}
+/*
+ * Wrapper for strdup(), with NULL in input treated as a correct case.
+ *
+ * "alloc_failed" can be optionally specified by the caller to check for
+ * allocation failures. The caller is responsible for its initialization,
+ * as ecpg_strdup() may be called repeatedly across multiple allocations.
+ */
char *
-ecpg_strdup(const char *string, int lineno)
+ecpg_strdup(const char *string, int lineno, bool *alloc_failed)
{
char *new;
@@ -54,6 +61,8 @@ ecpg_strdup(const char *string, int lineno)
new = strdup(string);
if (!new)
{
+ if (alloc_failed)
+ *alloc_failed = true;
ecpg_raise(lineno, ECPG_OUT_OF_MEMORY, ECPG_SQLSTATE_ECPG_OUT_OF_MEMORY, NULL);
return NULL;
}
diff --git a/src/interfaces/ecpg/ecpglib/prepare.c b/src/interfaces/ecpg/ecpglib/prepare.c
index ea1146f520f..dd6fd1fe7f4 100644
--- a/src/interfaces/ecpg/ecpglib/prepare.c
+++ b/src/interfaces/ecpg/ecpglib/prepare.c
@@ -85,9 +85,22 @@ ecpg_register_prepared_stmt(struct statement *stmt)
/* create statement */
prep_stmt->lineno = lineno;
prep_stmt->connection = con;
- prep_stmt->command = ecpg_strdup(stmt->command, lineno);
+ prep_stmt->command = ecpg_strdup(stmt->command, lineno, NULL);
+ if (!prep_stmt->command)
+ {
+ ecpg_free(prep_stmt);
+ ecpg_free(this);
+ return false;
+ }
prep_stmt->inlist = prep_stmt->outlist = NULL;
- this->name = ecpg_strdup(stmt->name, lineno);
+ this->name = ecpg_strdup(stmt->name, lineno, NULL);
+ if (!this->name)
+ {
+ ecpg_free(prep_stmt->command);
+ ecpg_free(prep_stmt);
+ ecpg_free(this);
+ return false;
+ }
this->stmt = prep_stmt;
this->prepared = true;
@@ -177,14 +190,27 @@ prepare_common(int lineno, struct connection *con, const char *name, const char
/* create statement */
stmt->lineno = lineno;
stmt->connection = con;
- stmt->command = ecpg_strdup(variable, lineno);
+ stmt->command = ecpg_strdup(variable, lineno, NULL);
+ if (!stmt->command)
+ {
+ ecpg_free(stmt);
+ ecpg_free(this);
+ return false;
+ }
stmt->inlist = stmt->outlist = NULL;
/* if we have C variables in our statement replace them with '?' */
replace_variables(&(stmt->command), lineno);
/* add prepared statement to our list */
- this->name = ecpg_strdup(name, lineno);
+ this->name = ecpg_strdup(name, lineno, NULL);
+ if (!this->name)
+ {
+ ecpg_free(stmt->command);
+ ecpg_free(stmt);
+ ecpg_free(this);
+ return false;
+ }
this->stmt = stmt;
/* and finally really prepare the statement */
@@ -540,7 +566,9 @@ AddStmtToCache(int lineno, /* line # of statement */
/* add the query to the entry */
entry = &stmtCacheEntries[entNo];
entry->lineno = lineno;
- entry->ecpgQuery = ecpg_strdup(ecpgQuery, lineno);
+ entry->ecpgQuery = ecpg_strdup(ecpgQuery, lineno, NULL);
+ if (!entry->ecpgQuery)
+ return -1;
entry->connection = connection;
entry->execs = 0;
memcpy(entry->stmtID, stmtID, sizeof(entry->stmtID));
@@ -567,6 +595,9 @@ ecpg_auto_prepare(int lineno, const char *connection_name, const int compat, cha
ecpg_log("ecpg_auto_prepare on line %d: statement found in cache; entry %d\n", lineno, entNo);
stmtID = stmtCacheEntries[entNo].stmtID;
+ *name = ecpg_strdup(stmtID, lineno, NULL);
+ if (*name == NULL)
+ return false;
con = ecpg_get_connection(connection_name);
prep = ecpg_find_prepared_statement(stmtID, con, NULL);
@@ -574,7 +605,6 @@ ecpg_auto_prepare(int lineno, const char *connection_name, const int compat, cha
if (!prep && !prepare_common(lineno, con, stmtID, query))
return false;
- *name = ecpg_strdup(stmtID, lineno);
}
else
{
@@ -584,6 +614,9 @@ ecpg_auto_prepare(int lineno, const char *connection_name, const int compat, cha
/* generate a statement ID */
sprintf(stmtID, "ecpg%d", nextStmtID++);
+ *name = ecpg_strdup(stmtID, lineno, NULL);
+ if (*name == NULL)
+ return false;
if (!ECPGprepare(lineno, connection_name, 0, stmtID, query))
return false;
@@ -591,8 +624,6 @@ ecpg_auto_prepare(int lineno, const char *connection_name, const int compat, cha
entNo = AddStmtToCache(lineno, stmtID, connection_name, compat, query);
if (entNo < 0)
return false;
-
- *name = ecpg_strdup(stmtID, lineno);
}
/* increase usage counter */
diff --git a/src/port/pgmkdirp.c b/src/port/pgmkdirp.c
index d943559760d..7d7cea4dd0e 100644
--- a/src/port/pgmkdirp.c
+++ b/src/port/pgmkdirp.c
@@ -73,7 +73,7 @@ pg_mkdir_p(char *path, int omode)
if (p[0] == '/' && p[1] == '/')
{
/* network drive */
- p = strstr(p + 2, "/");
+ p = strchr(p + 2, '/');
if (p == NULL)
{
errno = EINVAL;
diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out
index 529b2241731..a98c97f7616 100644
--- a/src/test/regress/expected/subscription.out
+++ b/src/test/regress/expected/subscription.out
@@ -116,18 +116,18 @@ CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PU
WARNING: subscription was created, but is not connected
HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
\dRs+ regress_testsub4
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN
-------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+------------
- regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | none | t | f | f | off | dbname=regress_doesnotexist | 0/00000000
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN
+------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------
+ regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | none | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000
(1 row)
ALTER SUBSCRIPTION regress_testsub4 SET (origin = any);
\dRs+ regress_testsub4
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN
-------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+------------
- regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/00000000
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN
+------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------
+ regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000
(1 row)
DROP SUBSCRIPTION regress_testsub3;
@@ -145,10 +145,10 @@ ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar';
ERROR: invalid connection string syntax: missing "=" after "foobar" in connection info string
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/00000000
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000
(1 row)
ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false);
@@ -157,10 +157,10 @@ ALTER SUBSCRIPTION regress_testsub SET (slot_name = 'newname');
ALTER SUBSCRIPTION regress_testsub SET (password_required = false);
ALTER SUBSCRIPTION regress_testsub SET (run_as_owner = true);
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+------------
- regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | f | t | f | off | dbname=regress_doesnotexist2 | 0/00000000
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+------------------------------+------------
+ regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | f | t | f | f | off | dbname=regress_doesnotexist2 | 0/00000000
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (password_required = true);
@@ -176,10 +176,10 @@ ERROR: unrecognized subscription parameter: "create_slot"
-- ok
ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/12345');
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+------------
- regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | off | dbname=regress_doesnotexist2 | 0/00012345
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+------------------------------+------------
+ regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist2 | 0/00012345
(1 row)
-- ok - with lsn = NONE
@@ -188,10 +188,10 @@ ALTER SUBSCRIPTION regress_testsub SKIP (lsn = NONE);
ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/0');
ERROR: invalid WAL location (LSN): 0/0
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+------------
- regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | off | dbname=regress_doesnotexist2 | 0/00000000
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+------------------------------+------------
+ regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist2 | 0/00000000
(1 row)
BEGIN;
@@ -223,10 +223,10 @@ ALTER SUBSCRIPTION regress_testsub_foo SET (synchronous_commit = foobar);
ERROR: invalid value for parameter "synchronous_commit": "foobar"
HINT: Available values: local, remote_write, remote_apply, on, off.
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN
----------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+------------
- regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | local | dbname=regress_doesnotexist2 | 0/00000000
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN
+---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+------------------------------+------------
+ regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | local | dbname=regress_doesnotexist2 | 0/00000000
(1 row)
-- rename back to keep the rest simple
@@ -255,19 +255,19 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
WARNING: subscription was created, but is not connected
HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f | {testpub} | t | parallel | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/00000000
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------
+ regress_testsub | regress_subscription_user | f | {testpub} | t | parallel | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (binary = false);
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/00000000
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000
(1 row)
DROP SUBSCRIPTION regress_testsub;
@@ -279,27 +279,27 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
WARNING: subscription was created, but is not connected
HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | on | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/00000000
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | on | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (streaming = parallel);
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/00000000
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (streaming = false);
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/00000000
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000
(1 row)
-- fail - publication already exists
@@ -314,10 +314,10 @@ ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refr
ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false);
ERROR: publication "testpub1" is already in subscription "regress_testsub"
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/00000000
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------
+ regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | off | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000
(1 row)
-- fail - publication used more than once
@@ -332,10 +332,10 @@ ERROR: publication "testpub3" is not in subscription "regress_testsub"
-- ok - delete publications
ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (refresh = false);
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/00000000
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000
(1 row)
DROP SUBSCRIPTION regress_testsub;
@@ -371,19 +371,19 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
WARNING: subscription was created, but is not connected
HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | p | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/00000000
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | p | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000
(1 row)
-- we can alter streaming when two_phase enabled
ALTER SUBSCRIPTION regress_testsub SET (streaming = true);
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/00000000
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@@ -393,10 +393,10 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
WARNING: subscription was created, but is not connected
HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/00000000
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@@ -409,18 +409,34 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
WARNING: subscription was created, but is not connected
HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/00000000
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true);
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | t | any | t | f | f | off | dbname=regress_doesnotexist | 0/00000000
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | t | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000
+(1 row)
+
+ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
+DROP SUBSCRIPTION regress_testsub;
+-- fail - retain_dead_tuples must be boolean
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, retain_dead_tuples = foo);
+ERROR: retain_dead_tuples requires a Boolean value
+-- ok
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, retain_dead_tuples = false);
+WARNING: subscription was created, but is not connected
+HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
+\dRs+
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql
index 007c9e70374..f0f714fe747 100644
--- a/src/test/regress/sql/subscription.sql
+++ b/src/test/regress/sql/subscription.sql
@@ -287,6 +287,17 @@ ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true);
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
DROP SUBSCRIPTION regress_testsub;
+-- fail - retain_dead_tuples must be boolean
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, retain_dead_tuples = foo);
+
+-- ok
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, retain_dead_tuples = false);
+
+\dRs+
+
+ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
+DROP SUBSCRIPTION regress_testsub;
+
-- let's do some tests with pg_create_subscription rather than superuser
SET SESSION AUTHORIZATION regress_subscription_user3;
diff --git a/src/test/subscription/t/035_conflicts.pl b/src/test/subscription/t/035_conflicts.pl
index d78a6bac16a..7458d7fba7e 100644
--- a/src/test/subscription/t/035_conflicts.pl
+++ b/src/test/subscription/t/035_conflicts.pl
@@ -1,6 +1,6 @@
# Copyright (c) 2025, PostgreSQL Global Development Group
-# Test the conflict detection of conflict type 'multiple_unique_conflicts'.
+# Test conflicts in logical replication
use strict;
use warnings FATAL => 'all';
use PostgreSQL::Test::Cluster;
@@ -18,7 +18,7 @@ $node_publisher->start;
# Create a subscriber node
my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
-$node_subscriber->init;
+$node_subscriber->init(allows_streaming => 'logical');
$node_subscriber->start;
# Create a table on publisher
@@ -146,4 +146,195 @@ $node_subscriber->wait_for_log(
pass('multiple_unique_conflicts detected on a leaf partition during insert');
+###############################################################################
+# Setup a bidirectional logical replication between node_A & node_B
+###############################################################################
+
+# Initialize nodes.
+
+# node_A. Increase the log_min_messages setting to DEBUG2 to debug test
+# failures. Disable autovacuum to avoid generating xid that could affect the
+# replication slot's xmin value.
+my $node_A = $node_publisher;
+$node_A->append_conf(
+ 'postgresql.conf',
+ qq{autovacuum = off
+ log_min_messages = 'debug2'});
+$node_A->restart;
+
+# node_B
+my $node_B = $node_subscriber;
+$node_B->append_conf('postgresql.conf', "track_commit_timestamp = on");
+$node_B->restart;
+
+# Create table on node_A
+$node_A->safe_psql('postgres', "CREATE TABLE tab (a int PRIMARY KEY, b int)");
+
+# Create the same table on node_B
+$node_B->safe_psql('postgres', "CREATE TABLE tab (a int PRIMARY KEY, b int)");
+
+my $subname_AB = 'tap_sub_a_b';
+my $subname_BA = 'tap_sub_b_a';
+
+# Setup logical replication
+# node_A (pub) -> node_B (sub)
+my $node_A_connstr = $node_A->connstr . ' dbname=postgres';
+$node_A->safe_psql('postgres', "CREATE PUBLICATION tap_pub_A FOR TABLE tab");
+$node_B->safe_psql(
+ 'postgres', "
+ CREATE SUBSCRIPTION $subname_BA
+ CONNECTION '$node_A_connstr application_name=$subname_BA'
+ PUBLICATION tap_pub_A
+ WITH (origin = none, retain_dead_tuples = true)");
+
+# node_B (pub) -> node_A (sub)
+my $node_B_connstr = $node_B->connstr . ' dbname=postgres';
+$node_B->safe_psql('postgres', "CREATE PUBLICATION tap_pub_B FOR TABLE tab");
+$node_A->safe_psql(
+ 'postgres', "
+ CREATE SUBSCRIPTION $subname_AB
+ CONNECTION '$node_B_connstr application_name=$subname_AB'
+ PUBLICATION tap_pub_B
+ WITH (origin = none, copy_data = off)");
+
+# Wait for initial table sync to finish
+$node_A->wait_for_subscription_sync($node_B, $subname_AB);
+$node_B->wait_for_subscription_sync($node_A, $subname_BA);
+
+is(1, 1, 'Bidirectional replication setup is complete');
+
+# Confirm that the conflict detection slot is created on Node B and the xmin
+# value is valid.
+ok( $node_B->poll_query_until(
+ 'postgres',
+ "SELECT xmin IS NOT NULL from pg_replication_slots WHERE slot_name = 'pg_conflict_detection'"
+ ),
+ "the xmin value of slot 'pg_conflict_detection' is valid on Node B");
+
+##################################################
+# Check that the retain_dead_tuples option can be enabled only for disabled
+# subscriptions. Validate the NOTICE message during the subscription DDL, and
+# ensure the conflict detection slot is created upon enabling the
+# retain_dead_tuples option.
+##################################################
+
+# Alter retain_dead_tuples for enabled subscription
+my ($cmdret, $stdout, $stderr) = $node_A->psql('postgres',
+ "ALTER SUBSCRIPTION $subname_AB SET (retain_dead_tuples = true)");
+ok( $stderr =~
+ /ERROR: cannot set option \"retain_dead_tuples\" for enabled subscription/,
+ "altering retain_dead_tuples is not allowed for enabled subscription");
+
+# Disable the subscription
+$node_A->psql('postgres', "ALTER SUBSCRIPTION $subname_AB DISABLE;");
+
+# Enable retain_dead_tuples for disabled subscription
+($cmdret, $stdout, $stderr) = $node_A->psql('postgres',
+ "ALTER SUBSCRIPTION $subname_AB SET (retain_dead_tuples = true);");
+ok( $stderr =~
+ /NOTICE: deleted rows to detect conflicts would not be removed until the subscription is enabled/,
+ "altering retain_dead_tuples is allowed for disabled subscription");
+
+# Re-enable the subscription
+$node_A->safe_psql('postgres', "ALTER SUBSCRIPTION $subname_AB ENABLE;");
+
+# Confirm that the conflict detection slot is created on Node A and the xmin
+# value is valid.
+ok( $node_A->poll_query_until(
+ 'postgres',
+ "SELECT xmin IS NOT NULL from pg_replication_slots WHERE slot_name = 'pg_conflict_detection'"
+ ),
+ "the xmin value of slot 'pg_conflict_detection' is valid on Node A");
+
+##################################################
+# Check the WARNING when changing the origin to ANY, if retain_dead_tuples is
+# enabled. This warns of the possibility of receiving changes from origins
+# other than the publisher.
+##################################################
+
+($cmdret, $stdout, $stderr) = $node_A->psql('postgres',
+ "ALTER SUBSCRIPTION $subname_AB SET (origin = any);");
+ok( $stderr =~
+ /WARNING: subscription "tap_sub_a_b" enabled retain_dead_tuples but might not reliably detect conflicts for changes from different origins/,
+ "warn of the possibility of receiving changes from origins other than the publisher");
+
+# Reset the origin to none
+$node_A->psql('postgres',
+ "ALTER SUBSCRIPTION $subname_AB SET (origin = none);");
+
+###############################################################################
+# Check that dead tuples on node A cannot be cleaned by VACUUM until the
+# concurrent transactions on Node B have been applied and flushed on Node A.
+###############################################################################
+
+# Insert a record
+$node_A->safe_psql('postgres', "INSERT INTO tab VALUES (1, 1), (2, 2);");
+$node_A->wait_for_catchup($subname_BA);
+
+my $result = $node_B->safe_psql('postgres', "SELECT * FROM tab;");
+is($result, qq(1|1
+2|2), 'check replicated insert on node B');
+
+# Disable the logical replication from node B to node A
+$node_A->safe_psql('postgres', "ALTER SUBSCRIPTION $subname_AB DISABLE");
+
+$node_B->safe_psql('postgres', "UPDATE tab SET b = 3 WHERE a = 1;");
+$node_A->safe_psql('postgres', "DELETE FROM tab WHERE a = 1;");
+
+($cmdret, $stdout, $stderr) = $node_A->psql(
+ 'postgres', qq(VACUUM (verbose) public.tab;)
+);
+
+ok( $stderr =~
+ qr/1 are dead but not yet removable/,
+ 'the deleted column is non-removable');
+
+$node_A->safe_psql(
+ 'postgres', "ALTER SUBSCRIPTION $subname_AB ENABLE;");
+$node_B->wait_for_catchup($subname_AB);
+
+# Remember the next transaction ID to be assigned
+my $next_xid = $node_A->safe_psql('postgres', "SELECT txid_current() + 1;");
+
+# Confirm that the xmin value is advanced to the latest nextXid. If no
+# transactions are running, the apply worker selects nextXid as the candidate
+# for the non-removable xid. See GetOldestActiveTransactionId().
+ok( $node_A->poll_query_until(
+ 'postgres',
+ "SELECT xmin = $next_xid from pg_replication_slots WHERE slot_name = 'pg_conflict_detection'"
+ ),
+ "the xmin value of slot 'pg_conflict_detection' is updated on Node A");
+
+# Confirm that the dead tuple can be removed now
+($cmdret, $stdout, $stderr) = $node_A->psql(
+ 'postgres', qq(VACUUM (verbose) public.tab;)
+);
+
+ok( $stderr =~
+ qr/1 removed, 1 remain, 0 are dead but not yet removable/,
+ 'the deleted column is removed');
+
+###############################################################################
+# Check that the replication slot pg_conflict_detection is dropped after
+# removing all the subscriptions.
+###############################################################################
+
+$node_B->safe_psql(
+ 'postgres', "DROP SUBSCRIPTION $subname_BA");
+
+ok( $node_B->poll_query_until(
+ 'postgres',
+ "SELECT count(*) = 0 FROM pg_replication_slots WHERE slot_name = 'pg_conflict_detection'"
+ ),
+ "the slot 'pg_conflict_detection' has been dropped on Node B");
+
+$node_A->safe_psql(
+ 'postgres', "DROP SUBSCRIPTION $subname_AB");
+
+ok( $node_A->poll_query_until(
+ 'postgres',
+ "SELECT count(*) = 0 FROM pg_replication_slots WHERE slot_name = 'pg_conflict_detection'"
+ ),
+ "the slot 'pg_conflict_detection' has been dropped on Node A");
+
done_testing();
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index cd897467088..a8656419cb6 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2566,6 +2566,8 @@ RestrictInfo
Result
ResultRelInfo
ResultState
+RetainDeadTuplesData
+RetainDeadTuplesPhase
ReturnSetInfo
ReturnStmt
ReturningClause