diff options
86 files changed, 2973 insertions, 517 deletions
diff --git a/contrib/basic_archive/basic_archive.c b/contrib/basic_archive/basic_archive.c index 4a8b8c7ac29..8fc633d2cbf 100644 --- a/contrib/basic_archive/basic_archive.c +++ b/contrib/basic_archive/basic_archive.c @@ -65,7 +65,7 @@ void _PG_init(void) { DefineCustomStringVariable("basic_archive.archive_directory", - gettext_noop("Archive file destination directory."), + "Archive file destination directory.", NULL, &archive_directory, "", diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c index 8a0b112a7ff..de5bed282f3 100644 --- a/contrib/dblink/dblink.c +++ b/contrib/dblink/dblink.c @@ -240,6 +240,10 @@ dblink_get_conn(char *conname_or_str, errmsg("could not establish connection"), errdetail_internal("%s", msg))); } + + PQsetNoticeReceiver(conn, libpqsrv_notice_receiver, + "received message via remote connection"); + dblink_security_check(conn, NULL, connstr); if (PQclientEncoding(conn) != GetDatabaseEncoding()) PQsetClientEncoding(conn, GetDatabaseEncodingName()); @@ -338,6 +342,9 @@ dblink_connect(PG_FUNCTION_ARGS) errdetail_internal("%s", msg))); } + PQsetNoticeReceiver(conn, libpqsrv_notice_receiver, + "received message via remote connection"); + /* check password actually used if not superuser */ dblink_security_check(conn, connname, connstr); diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c index 304f3c20f83..c1ce6f33436 100644 --- a/contrib/postgres_fdw/connection.c +++ b/contrib/postgres_fdw/connection.c @@ -625,6 +625,9 @@ connect_pg_server(ForeignServer *server, UserMapping *user) server->servername), errdetail_internal("%s", pchomp(PQerrorMessage(conn))))); + PQsetNoticeReceiver(conn, libpqsrv_notice_receiver, + "received message via remote connection"); + /* Perform post-connection security checks. */ pgfdw_security_check(keywords, values, user, conn); diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out index ff2b30cc912..4b6e49a5d95 100644 --- a/contrib/postgres_fdw/expected/postgres_fdw.out +++ b/contrib/postgres_fdw/expected/postgres_fdw.out @@ -710,12 +710,12 @@ EXPLAIN (VERBOSE, COSTS OFF) SELECT * FROM ft1 t1 WHERE c1 = -c1; -- Op Remote SQL: SELECT "C 1", c2, c3, c4, c5, c6, c7, c8 FROM "S 1"."T 1" WHERE (("C 1" = (- "C 1"))) (3 rows) -EXPLAIN (VERBOSE, COSTS OFF) SELECT * FROM ft1 t1 WHERE (c1 IS NOT NULL) IS DISTINCT FROM (c1 IS NOT NULL); -- DistinctExpr - QUERY PLAN --------------------------------------------------------------------------------------------------------------------------------------------- +EXPLAIN (VERBOSE, COSTS OFF) SELECT * FROM ft1 t1 WHERE (c3 IS NOT NULL) IS DISTINCT FROM (c3 IS NOT NULL); -- DistinctExpr + QUERY PLAN +-------------------------------------------------------------------------------------------------------------------------------------- Foreign Scan on public.ft1 t1 Output: c1, c2, c3, c4, c5, c6, c7, c8 - Remote SQL: SELECT "C 1", c2, c3, c4, c5, c6, c7, c8 FROM "S 1"."T 1" WHERE ((("C 1" IS NOT NULL) IS DISTINCT FROM ("C 1" IS NOT NULL))) + Remote SQL: SELECT "C 1", c2, c3, c4, c5, c6, c7, c8 FROM "S 1"."T 1" WHERE (((c3 IS NOT NULL) IS DISTINCT FROM (c3 IS NOT NULL))) (3 rows) EXPLAIN (VERBOSE, COSTS OFF) SELECT * FROM ft1 t1 WHERE c1 = ANY(ARRAY[c2, 1, c1 + 0]); -- ScalarArrayOpExpr diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql index 7267732f569..31b6c685b55 100644 --- a/contrib/postgres_fdw/sql/postgres_fdw.sql +++ b/contrib/postgres_fdw/sql/postgres_fdw.sql @@ -352,7 +352,7 @@ EXPLAIN (VERBOSE, COSTS OFF) SELECT * FROM ft1 t1 WHERE c3 IS NULL; -- Nu EXPLAIN (VERBOSE, COSTS OFF) SELECT * FROM ft1 t1 WHERE c3 IS NOT NULL; -- NullTest EXPLAIN (VERBOSE, COSTS OFF) SELECT * FROM ft1 t1 WHERE round(abs(c1), 0) = 1; -- FuncExpr EXPLAIN (VERBOSE, COSTS OFF) SELECT * FROM ft1 t1 WHERE c1 = -c1; -- OpExpr(l) -EXPLAIN (VERBOSE, COSTS OFF) SELECT * FROM ft1 t1 WHERE (c1 IS NOT NULL) IS DISTINCT FROM (c1 IS NOT NULL); -- DistinctExpr +EXPLAIN (VERBOSE, COSTS OFF) SELECT * FROM ft1 t1 WHERE (c3 IS NOT NULL) IS DISTINCT FROM (c3 IS NOT NULL); -- DistinctExpr EXPLAIN (VERBOSE, COSTS OFF) SELECT * FROM ft1 t1 WHERE c1 = ANY(ARRAY[c2, 1, c1 + 0]); -- ScalarArrayOpExpr EXPLAIN (VERBOSE, COSTS OFF) SELECT * FROM ft1 t1 WHERE c1 = (ARRAY[c1,c2,3])[1]; -- SubscriptingRef EXPLAIN (VERBOSE, COSTS OFF) SELECT * FROM ft1 t1 WHERE c6 = E'foo''s\\bar'; -- check special chars diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml index 0d23bc1b122..97f547b3cc4 100644 --- a/doc/src/sgml/catalogs.sgml +++ b/doc/src/sgml/catalogs.sgml @@ -8084,6 +8084,17 @@ SCRAM-SHA-256$<replaceable><iteration count></replaceable>:<replaceable>&l <row> <entry role="catalog_table_entry"><para role="column_definition"> + <structfield>subretaindeadtuples</structfield> <type>bool</type> + </para> + <para> + If true, the information (e.g., dead tuples, commit timestamps, and + origins) on the subscriber that is useful for conflict detection is + retained. + </para></entry> + </row> + + <row> + <entry role="catalog_table_entry"><para role="column_definition"> <structfield>subconninfo</structfield> <type>text</type> </para> <para> diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index c7acc0f182f..20ccb2d6b54 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -4965,6 +4965,8 @@ ANY <replaceable class="parameter">num_sync</replaceable> ( <replaceable class=" new setting. This setting has no effect if <varname>primary_conninfo</varname> is not set or the server is not in standby mode. + The name cannot be <literal>pg_conflict_detection</literal> as it is + reserved for the conflict detection slot. </para> </listitem> </varlistentry> diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml index f5a0e0954a1..de5b5929ee0 100644 --- a/doc/src/sgml/func.sgml +++ b/doc/src/sgml/func.sgml @@ -29592,7 +29592,9 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset </para> <para> Creates a new physical replication slot named - <parameter>slot_name</parameter>. The optional second parameter, + <parameter>slot_name</parameter>. The name cannot be + <literal>pg_conflict_detection</literal> as it is reserved for the + conflict detection slot. The optional second parameter, when <literal>true</literal>, specifies that the <acronym>LSN</acronym> for this replication slot be reserved immediately; otherwise the <acronym>LSN</acronym> is reserved on first connection from a streaming @@ -29636,7 +29638,9 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset <para> Creates a new logical (decoding) replication slot named <parameter>slot_name</parameter> using the output plugin - <parameter>plugin</parameter>. The optional third + <parameter>plugin</parameter>. The name cannot be + <literal>pg_conflict_detection</literal> as it is reserved for + the conflict detection slot. The optional third parameter, <parameter>temporary</parameter>, when set to true, specifies that the slot should not be permanently stored to disk and is only meant for use by the current session. Temporary slots are also @@ -29666,6 +29670,8 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset <para> Copies an existing physical replication slot named <parameter>src_slot_name</parameter> to a physical replication slot named <parameter>dst_slot_name</parameter>. + The new slot name cannot be <literal>pg_conflict_detection</literal>, + as it is reserved for the conflict detection. The copied physical slot starts to reserve WAL from the same <acronym>LSN</acronym> as the source slot. <parameter>temporary</parameter> is optional. If <parameter>temporary</parameter> @@ -29688,8 +29694,10 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset Copies an existing logical replication slot named <parameter>src_slot_name</parameter> to a logical replication slot named <parameter>dst_slot_name</parameter>, optionally changing - the output plugin and persistence. The copied logical slot starts - from the same <acronym>LSN</acronym> as the source logical slot. Both + the output plugin and persistence. The new slot name cannot be + <literal>pg_conflict_detection</literal> as it is reserved for + the conflict detection. The copied logical slot starts from the same + <acronym>LSN</acronym> as the source logical slot. Both <parameter>temporary</parameter> and <parameter>plugin</parameter> are optional; if they are omitted, the values of the source slot are used. The <literal>failover</literal> option of the source logical slot diff --git a/doc/src/sgml/indexam.sgml b/doc/src/sgml/indexam.sgml index 1aa4741a8ea..63d7e376f19 100644 --- a/doc/src/sgml/indexam.sgml +++ b/doc/src/sgml/indexam.sgml @@ -147,7 +147,7 @@ typedef struct IndexAmRoutine ambuild_function ambuild; ambuildempty_function ambuildempty; aminsert_function aminsert; - aminsertcleanup_function aminsertcleanup; + aminsertcleanup_function aminsertcleanup; /* can be NULL */ ambulkdelete_function ambulkdelete; amvacuumcleanup_function amvacuumcleanup; amcanreturn_function amcanreturn; /* can be NULL */ diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml index e26f7f59d4a..fcac55aefe6 100644 --- a/doc/src/sgml/logical-replication.sgml +++ b/doc/src/sgml/logical-replication.sgml @@ -1048,28 +1048,28 @@ HINT: To initiate replication, you must manually create the replication slot, e defined) for each publication. <programlisting><![CDATA[ /* pub # */ \dRp+ - Publication p1 - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root -----------+------------+---------+---------+---------+-----------+---------- - postgres | f | t | t | t | t | f + Publication p1 + Owner | All tables | Inserts | Updates | Deletes | Truncates | Generated columns | Via root +----------+------------+---------+---------+---------+-----------+-------------------+---------- + postgres | f | t | t | t | t | none | f Tables: - "public.t1" WHERE ((a > 5) AND (c = 'NSW'::text)) + "public.t1" WHERE ((a > 5) AND (c = 'NSW'::text)) - Publication p2 - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root -----------+------------+---------+---------+---------+-----------+---------- - postgres | f | t | t | t | t | f + Publication p2 + Owner | All tables | Inserts | Updates | Deletes | Truncates | Generated columns | Via root +----------+------------+---------+---------+---------+-----------+-------------------+---------- + postgres | f | t | t | t | t | none | f Tables: - "public.t1" - "public.t2" WHERE (e = 99) + "public.t1" + "public.t2" WHERE (e = 99) - Publication p3 - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root -----------+------------+---------+---------+---------+-----------+---------- - postgres | f | t | t | t | t | f + Publication p3 + Owner | All tables | Inserts | Updates | Deletes | Truncates | Generated columns | Via root +----------+------------+---------+---------+---------+-----------+-------------------+---------- + postgres | f | t | t | t | t | none | f Tables: - "public.t2" WHERE (d = 10) - "public.t3" WHERE (g = 10) + "public.t2" WHERE (d = 10) + "public.t3" WHERE (g = 10) ]]></programlisting></para> <para> @@ -1491,10 +1491,10 @@ Publications: for each publication. <programlisting> /* pub # */ \dRp+ - Publication p1 - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root -----------+------------+---------+---------+---------+-----------+---------- - postgres | f | t | t | t | t | f + Publication p1 + Owner | All tables | Inserts | Updates | Deletes | Truncates | Generated columns | Via root +----------+------------+---------+---------+---------+-----------+-------------------+---------- + postgres | f | t | t | t | t | none | f Tables: "public.t1" (id, a, b, d) </programlisting></para> @@ -2397,6 +2397,12 @@ CONTEXT: processing remote data for replication origin "pg_16395" during "INSER </para> <para> + <link linkend="guc-max-replication-slots"><varname>max_replication_slots</varname></link> + must be set to at least 1 when <link linkend="sql-createsubscription-params-with-retain-dead-tuples"><literal>retain_dead_tuples</literal></link> + is enabled for any subscription. + </para> + + <para> <link linkend="guc-max-logical-replication-workers"><varname>max_logical_replication_workers</varname></link> must be set to at least the number of subscriptions (for leader apply workers), plus some reserve for the table synchronization workers and @@ -2532,6 +2538,22 @@ CONTEXT: processing remote data for replication origin "pg_16395" during "INSER dependencies on clusters before version 17.0 will silently be ignored. </para> + <note> + <para> + Commit timestamps and origin data are not preserved during the upgrade. + As a result, even if + <link linkend="sql-createsubscription-params-with-retain-dead-tuples"><literal>retain_dead_tuples</literal></link> + is enabled, the upgraded subscriber may be unable to detect conflicts or + log relevant commit timestamps and origins when applying changes from the + publisher occurred before the upgrade. Additionally, immediately after the + upgrade, the vacuum may remove the deleted rows that are required for + conflict detection. This can affect the changes that were not replicated + before the upgrade. To ensure consistent conflict tracking, users should + ensure that all potentially conflicting changes are replicated to the + subscriber before initiating the upgrade. + </para> + </note> + <para> There are some prerequisites for <application>pg_upgrade</application> to be able to upgrade the subscriptions. If these are not met an error @@ -2563,6 +2585,16 @@ CONTEXT: processing remote data for replication origin "pg_16395" during "INSER subscriptions present in the old cluster. </para> </listitem> + <listitem> + <para> + If there are subscriptions with retain_dead_tuples enabled, the reserved + replication slot <quote><literal>pg_conflict_detection</literal></quote> + must not exist on the new cluster. Additionally, the + <link linkend="guc-wal-level"><varname>wal_level</varname></link> on the + new cluster must be set to <literal>replica</literal> or + <literal>logical</literal>. + </para> + </listitem> </itemizedlist> </sect2> diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml index e74b5be1eff..b115884acb3 100644 --- a/doc/src/sgml/protocol.sgml +++ b/doc/src/sgml/protocol.sgml @@ -2235,6 +2235,8 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;" <para> The name of the slot to create. Must be a valid replication slot name (see <xref linkend="streaming-replication-slots-manipulation"/>). + The name cannot be <literal>pg_conflict_detection</literal> as it + is reserved for the conflict detection. </para> </listitem> </varlistentry> @@ -2653,6 +2655,65 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;" </variablelist> </listitem> </varlistentry> + + <varlistentry id="protocol-replication-primary-status-update"> + <term>Primary status update (B)</term> + <listitem> + <variablelist> + <varlistentry> + <term>Byte1('s')</term> + <listitem> + <para> + Identifies the message as a primary status update. + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term>Int64</term> + <listitem> + <para> + The latest WAL write position on the server. + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term>Int64</term> + <listitem> + <para> + The oldest transaction ID that is currently in the commit phase on + the server, along with its epoch. The most significant 32 bits are + the epoch. The least significant 32 bits are the transaction ID. + If no transactions are active on the server, this number will be + the next transaction ID to be assigned. + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term>Int64</term> + <listitem> + <para> + The next transaction ID to be assigned on the server, along with + its epoch. The most significant 32 bits are the epoch. The least + significant 32 bits are the transaction ID. + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term>Int64</term> + <listitem> + <para> + The server's system clock at the time of transmission, as + microseconds since midnight on 2000-01-01. + </para> + </listitem> + </varlistentry> + </variablelist> + </listitem> + </varlistentry> </variablelist> <para> @@ -2797,6 +2858,33 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;" </variablelist> </listitem> </varlistentry> + + <varlistentry id="protocol-replication-standby-wal-status-request"> + <term>Request primary status update (F)</term> + <listitem> + <variablelist> + <varlistentry> + <term>Byte1('p')</term> + <listitem> + <para> + Identifies the message as a request for a primary status update. + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term>Int64</term> + <listitem> + <para> + The client's system clock at the time of transmission, as + microseconds since midnight on 2000-01-01. + </para> + </listitem> + </varlistentry> + </variablelist> + </listitem> + </varlistentry> + </variablelist> </listitem> </varlistentry> diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml index fdc648d007f..d48cdc76bd3 100644 --- a/doc/src/sgml/ref/alter_subscription.sgml +++ b/doc/src/sgml/ref/alter_subscription.sgml @@ -235,8 +235,9 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO < <link linkend="sql-createsubscription-params-with-password-required"><literal>password_required</literal></link>, <link linkend="sql-createsubscription-params-with-run-as-owner"><literal>run_as_owner</literal></link>, <link linkend="sql-createsubscription-params-with-origin"><literal>origin</literal></link>, - <link linkend="sql-createsubscription-params-with-failover"><literal>failover</literal></link>, and - <link linkend="sql-createsubscription-params-with-two-phase"><literal>two_phase</literal></link>. + <link linkend="sql-createsubscription-params-with-failover"><literal>failover</literal></link>, + <link linkend="sql-createsubscription-params-with-two-phase"><literal>two_phase</literal></link>, and + <link linkend="sql-createsubscription-params-with-retain-dead-tuples"><literal>retain_dead_tuples</literal></link>. Only a superuser can set <literal>password_required = false</literal>. </para> @@ -261,8 +262,9 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO < </para> <para> - The <link linkend="sql-createsubscription-params-with-failover"><literal>failover</literal></link> - and <link linkend="sql-createsubscription-params-with-two-phase"><literal>two_phase</literal></link> + The <link linkend="sql-createsubscription-params-with-failover"><literal>failover</literal></link>, + <link linkend="sql-createsubscription-params-with-two-phase"><literal>two_phase</literal></link>, and + <link linkend="sql-createsubscription-params-with-retain-dead-tuples"><literal>retain_dead_tuples</literal></link> parameters can only be altered when the subscription is disabled. </para> @@ -285,6 +287,14 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO < option is changed from <literal>true</literal> to <literal>false</literal>, the publisher will replicate the transactions again when they are committed. </para> + + <para> + If the <link linkend="sql-createsubscription-params-with-retain-dead-tuples"><literal>retain_dead_tuples</literal></link> + option is altered to <literal>false</literal> and no other subscription + has this option enabled, the replication slot named + <quote><literal>pg_conflict_detection</literal></quote>, created to retain + dead tuples for conflict detection, will be dropped. + </para> </listitem> </varlistentry> diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml index 57dec28a5df..b8cd15f3280 100644 --- a/doc/src/sgml/ref/create_subscription.sgml +++ b/doc/src/sgml/ref/create_subscription.sgml @@ -169,7 +169,9 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl <listitem> <para> Name of the publisher's replication slot to use. The default is - to use the name of the subscription for the slot name. + to use the name of the subscription for the slot name. The name cannot + be <literal>pg_conflict_detection</literal> as it is reserved for the + conflict detection. </para> <para> @@ -435,6 +437,89 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl </para> </listitem> </varlistentry> + + <varlistentry id="sql-createsubscription-params-with-retain-dead-tuples"> + <term><literal>retain_dead_tuples</literal> (<type>boolean</type>)</term> + <listitem> + <para> + Specifies whether the information (e.g., dead tuples, commit + timestamps, and origins) required for conflict detection on the + subscriber is retained. The default is <literal>false</literal>. + If set to <literal>true</literal>, a physical replication slot named + <quote><literal>pg_conflict_detection</literal></quote> will be + created on the subscriber to prevent the conflict information from + being removed. + </para> + + <para> + Note that the information useful for conflict detection is retained + only after the creation of the slot. You can verify the existence of + this slot by querying <link linkend="view-pg-replication-slots">pg_replication_slots</link>. + And even if multiple subscriptions on one node enable this option, + only one replication slot will be created. Also, + <varname>wal_level</varname> must be set to <literal>replica</literal> + or higher to allow the replication slot to be used. + </para> + + <caution> + <para> + Note that the information for conflict detection cannot be purged if + the subscription is disabled; thus, the information will accumulate + until the subscription is enabled. To prevent excessive accumulation, + it is recommended to disable <literal>retain_dead_tuples</literal> + if the subscription will be inactive for an extended period. + </para> + + <para> + Additionally when enabling <literal>retain_dead_tuples</literal> for + conflict detection in logical replication, it is important to design the + replication topology to balance data retention requirements with + overall system performance. This option provides minimal performance + overhead when applied appropriately. The following scenarios illustrate + effective usage patterns when enabling this option. + </para> + + <para> + a. Large Tables with Bidirectional Writes: + For large tables subject to concurrent writes on both publisher and + subscriber nodes, publishers can define row filters when creating + publications to segment data. This allows multiple subscriptions + to replicate exclusive subsets of the table in parallel, optimizing + the throughput. + </para> + + <para> + b. Write-Enabled Subscribers: + If a subscriber node is expected to perform write operations, replication + can be structured using multiple publications and subscriptions. By + distributing tables across these publications, the workload is spread among + several apply workers, improving concurrency and reducing contention. + </para> + + <para> + c. Read-Only Subscribers: + In configurations involving single or multiple publisher nodes + performing concurrent write operations, read-only subscriber nodes may + replicate changes without seeing a performance impact if it does index + scan. However, if the subscriber is impacted due to replication lag or + scan performance (say due to sequential scans), it needs to follow one + of the two previous strategies to distribute the workload on the + subscriber. + </para> + </caution> + + <para> + This option cannot be enabled if the publisher is a physical standby. + </para> + + <para> + Enabling this option ensures retention of information useful for + conflict detection solely for changes occurring locally on the + publisher. For the changes originating from different origins, + reliable conflict detection cannot be guaranteed. + </para> + </listitem> + </varlistentry> </variablelist></para> </listitem> diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index 85cbe397cb2..7918176fc58 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -1183,7 +1183,11 @@ EndPrepare(GlobalTransaction gxact) * starting immediately after the WAL record is inserted could complete * without fsync'ing our state file. (This is essentially the same kind * of race condition as the COMMIT-to-clog-write case that - * RecordTransactionCommit uses DELAY_CHKPT_START for; see notes there.) + * RecordTransactionCommit uses DELAY_CHKPT_IN_COMMIT for; see notes + * there.) Note that DELAY_CHKPT_IN_COMMIT is used to find transactions in + * the critical commit section. We need to know about such transactions + * for conflict detection in logical replication. See + * GetOldestActiveTransactionId(true, false) and its use. * * We save the PREPARE record's location in the gxact for later use by * CheckPointTwoPhase. @@ -2298,7 +2302,7 @@ ProcessTwoPhaseBuffer(FullTransactionId fxid, * RecordTransactionCommitPrepared * * This is basically the same as RecordTransactionCommit (q.v. if you change - * this function): in particular, we must set DELAY_CHKPT_START to avoid a + * this function): in particular, we must set DELAY_CHKPT_IN_COMMIT to avoid a * race condition. * * We know the transaction made at least one XLOG entry (its PREPARE), @@ -2318,7 +2322,7 @@ RecordTransactionCommitPrepared(TransactionId xid, const char *gid) { XLogRecPtr recptr; - TimestampTz committs = GetCurrentTimestamp(); + TimestampTz committs; bool replorigin; /* @@ -2331,8 +2335,24 @@ RecordTransactionCommitPrepared(TransactionId xid, START_CRIT_SECTION(); /* See notes in RecordTransactionCommit */ - Assert((MyProc->delayChkptFlags & DELAY_CHKPT_START) == 0); - MyProc->delayChkptFlags |= DELAY_CHKPT_START; + Assert((MyProc->delayChkptFlags & DELAY_CHKPT_IN_COMMIT) == 0); + MyProc->delayChkptFlags |= DELAY_CHKPT_IN_COMMIT; + + /* + * Ensures the DELAY_CHKPT_IN_COMMIT flag write is globally visible before + * commit time is written. + */ + pg_write_barrier(); + + /* + * Note it is important to set committs value after marking ourselves as + * in the commit critical section (DELAY_CHKPT_IN_COMMIT). This is because + * we want to ensure all transactions that have acquired commit timestamp + * are finished before we allow the logical replication client to advance + * its xid which is used to hold back dead rows for conflict detection. + * See comments atop worker.c. + */ + committs = GetCurrentTimestamp(); /* * Emit the XLOG commit record. Note that we mark 2PC commits as @@ -2381,7 +2401,7 @@ RecordTransactionCommitPrepared(TransactionId xid, TransactionIdCommitTree(xid, nchildren, children); /* Checkpoint can proceed now */ - MyProc->delayChkptFlags &= ~DELAY_CHKPT_START; + MyProc->delayChkptFlags &= ~DELAY_CHKPT_IN_COMMIT; END_CRIT_SECTION(); diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 41601fcb280..b46e7e9c2a6 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -1431,10 +1431,22 @@ RecordTransactionCommit(void) * without holding the ProcArrayLock, since we're the only one * modifying it. This makes checkpoint's determination of which xacts * are delaying the checkpoint a bit fuzzy, but it doesn't matter. + * + * Note, it is important to get the commit timestamp after marking the + * transaction in the commit critical section. See + * RecordTransactionCommitPrepared. */ - Assert((MyProc->delayChkptFlags & DELAY_CHKPT_START) == 0); + Assert((MyProc->delayChkptFlags & DELAY_CHKPT_IN_COMMIT) == 0); START_CRIT_SECTION(); - MyProc->delayChkptFlags |= DELAY_CHKPT_START; + MyProc->delayChkptFlags |= DELAY_CHKPT_IN_COMMIT; + + Assert(xactStopTimestamp == 0); + + /* + * Ensures the DELAY_CHKPT_IN_COMMIT flag write is globally visible + * before commit time is written. + */ + pg_write_barrier(); /* * Insert the commit XLOG record. @@ -1537,7 +1549,7 @@ RecordTransactionCommit(void) */ if (markXidCommitted) { - MyProc->delayChkptFlags &= ~DELAY_CHKPT_START; + MyProc->delayChkptFlags &= ~DELAY_CHKPT_IN_COMMIT; END_CRIT_SECTION(); } diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 8e7827c6ed9..eefffc4277a 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -7121,7 +7121,7 @@ CreateCheckPoint(int flags) * starting snapshot of locks and transactions. */ if (!shutdown && XLogStandbyInfoActive()) - checkPoint.oldestActiveXid = GetOldestActiveTransactionId(); + checkPoint.oldestActiveXid = GetOldestActiveTransactionId(false, true); else checkPoint.oldestActiveXid = InvalidTransactionId; diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c index 23878b2dd91..e8f3ba00caa 100644 --- a/src/backend/access/transam/xlogrecovery.c +++ b/src/backend/access/transam/xlogrecovery.c @@ -4760,7 +4760,7 @@ bool check_primary_slot_name(char **newval, void **extra, GucSource source) { if (*newval && strcmp(*newval, "") != 0 && - !ReplicationSlotValidateName(*newval, WARNING)) + !ReplicationSlotValidateName(*newval, false, WARNING)) return false; return true; diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index 1395032413e..63c2992d19f 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -103,6 +103,7 @@ GetSubscription(Oid subid, bool missing_ok) sub->passwordrequired = subform->subpasswordrequired; sub->runasowner = subform->subrunasowner; sub->failover = subform->subfailover; + sub->retaindeadtuples = subform->subretaindeadtuples; /* Get conninfo */ datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index b2d5332effc..f6eca09ee15 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1386,7 +1386,8 @@ REVOKE ALL ON pg_subscription FROM public; GRANT SELECT (oid, subdbid, subskiplsn, subname, subowner, subenabled, subbinary, substream, subtwophasestate, subdisableonerr, subpasswordrequired, subrunasowner, subfailover, - subslotname, subsynccommit, subpublications, suborigin) + subretaindeadtuples, subslotname, subsynccommit, + subpublications, suborigin) ON pg_subscription TO public; CREATE VIEW pg_stat_subscription_stats AS diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index e23b0de7242..cd6c3684482 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -14,6 +14,7 @@ #include "postgres.h" +#include "access/commit_ts.h" #include "access/htup_details.h" #include "access/table.h" #include "access/twophase.h" @@ -71,8 +72,9 @@ #define SUBOPT_PASSWORD_REQUIRED 0x00000800 #define SUBOPT_RUN_AS_OWNER 0x00001000 #define SUBOPT_FAILOVER 0x00002000 -#define SUBOPT_LSN 0x00004000 -#define SUBOPT_ORIGIN 0x00008000 +#define SUBOPT_RETAIN_DEAD_TUPLES 0x00004000 +#define SUBOPT_LSN 0x00008000 +#define SUBOPT_ORIGIN 0x00010000 /* check if the 'val' has 'bits' set */ #define IsSet(val, bits) (((val) & (bits)) == (bits)) @@ -98,6 +100,7 @@ typedef struct SubOpts bool passwordrequired; bool runasowner; bool failover; + bool retaindeadtuples; char *origin; XLogRecPtr lsn; } SubOpts; @@ -105,8 +108,10 @@ typedef struct SubOpts static List *fetch_table_list(WalReceiverConn *wrconn, List *publications); static void check_publications_origin(WalReceiverConn *wrconn, List *publications, bool copydata, - char *origin, Oid *subrel_local_oids, - int subrel_count, char *subname); + bool retain_dead_tuples, char *origin, + Oid *subrel_local_oids, int subrel_count, + char *subname); +static void check_pub_dead_tuple_retention(WalReceiverConn *wrconn); static void check_duplicates_in_publist(List *publist, Datum *datums); static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname); static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err); @@ -162,6 +167,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->runasowner = false; if (IsSet(supported_opts, SUBOPT_FAILOVER)) opts->failover = false; + if (IsSet(supported_opts, SUBOPT_RETAIN_DEAD_TUPLES)) + opts->retaindeadtuples = false; if (IsSet(supported_opts, SUBOPT_ORIGIN)) opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY); @@ -210,7 +217,7 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, if (strcmp(opts->slot_name, "none") == 0) opts->slot_name = NULL; else - ReplicationSlotValidateName(opts->slot_name, ERROR); + ReplicationSlotValidateName(opts->slot_name, false, ERROR); } else if (IsSet(supported_opts, SUBOPT_COPY_DATA) && strcmp(defel->defname, "copy_data") == 0) @@ -307,6 +314,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->specified_opts |= SUBOPT_FAILOVER; opts->failover = defGetBoolean(defel); } + else if (IsSet(supported_opts, SUBOPT_RETAIN_DEAD_TUPLES) && + strcmp(defel->defname, "retain_dead_tuples") == 0) + { + if (IsSet(opts->specified_opts, SUBOPT_RETAIN_DEAD_TUPLES)) + errorConflictingDefElem(defel, pstate); + + opts->specified_opts |= SUBOPT_RETAIN_DEAD_TUPLES; + opts->retaindeadtuples = defGetBoolean(defel); + } else if (IsSet(supported_opts, SUBOPT_ORIGIN) && strcmp(defel->defname, "origin") == 0) { @@ -563,7 +579,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY | SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT | SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED | - SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER | SUBOPT_ORIGIN); + SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER | + SUBOPT_RETAIN_DEAD_TUPLES | SUBOPT_ORIGIN); parse_subscription_options(pstate, stmt->options, supported_opts, &opts); /* @@ -630,6 +647,10 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, stmt->subname))); } + /* Ensure that we can enable retain_dead_tuples */ + if (opts.retaindeadtuples) + CheckSubDeadTupleRetention(true, !opts.enabled, WARNING); + if (!IsSet(opts.specified_opts, SUBOPT_SLOT_NAME) && opts.slot_name == NULL) opts.slot_name = stmt->subname; @@ -670,6 +691,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, values[Anum_pg_subscription_subpasswordrequired - 1] = BoolGetDatum(opts.passwordrequired); values[Anum_pg_subscription_subrunasowner - 1] = BoolGetDatum(opts.runasowner); values[Anum_pg_subscription_subfailover - 1] = BoolGetDatum(opts.failover); + values[Anum_pg_subscription_subretaindeadtuples - 1] = + BoolGetDatum(opts.retaindeadtuples); values[Anum_pg_subscription_subconninfo - 1] = CStringGetTextDatum(conninfo); if (opts.slot_name) @@ -722,7 +745,11 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, { check_publications(wrconn, publications); check_publications_origin(wrconn, publications, opts.copy_data, - opts.origin, NULL, 0, stmt->subname); + opts.retaindeadtuples, opts.origin, + NULL, 0, stmt->subname); + + if (opts.retaindeadtuples) + check_pub_dead_tuple_retention(wrconn); /* * Set sync state based on if we were asked to do data copy or @@ -881,8 +908,8 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, sizeof(Oid), oid_cmp); check_publications_origin(wrconn, sub->publications, copy_data, - sub->origin, subrel_local_oids, - subrel_count, sub->name); + sub->retaindeadtuples, sub->origin, + subrel_local_oids, subrel_count, sub->name); /* * Rels that we want to remove from subscription and drop any slots @@ -1040,18 +1067,22 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, } /* - * Common checks for altering failover and two_phase options. + * Common checks for altering failover, two_phase, and retain_dead_tuples + * options. */ static void CheckAlterSubOption(Subscription *sub, const char *option, bool slot_needs_update, bool isTopLevel) { + Assert(strcmp(option, "failover") == 0 || + strcmp(option, "two_phase") == 0 || + strcmp(option, "retain_dead_tuples") == 0); + /* - * The checks in this function are required only for failover and - * two_phase options. + * Altering the retain_dead_tuples option does not update the slot on the + * publisher. */ - Assert(strcmp(option, "failover") == 0 || - strcmp(option, "two_phase") == 0); + Assert(!slot_needs_update || strcmp(option, "retain_dead_tuples") != 0); /* * Do not allow changing the option if the subscription is enabled. This @@ -1063,6 +1094,39 @@ CheckAlterSubOption(Subscription *sub, const char *option, * the publisher by the existing walsender, so we could have allowed that * even when the subscription is enabled. But we kept this restriction for * the sake of consistency and simplicity. + * + * Additionally, do not allow changing the retain_dead_tuples option when + * the subscription is enabled to prevent race conditions arising from the + * new option value being acknowledged asynchronously by the launcher and + * apply workers. + * + * Without the restriction, a race condition may arise when a user + * disables and immediately re-enables the retain_dead_tuples option. In + * this case, the launcher might drop the slot upon noticing the disabled + * action, while the apply worker may keep maintaining + * oldest_nonremovable_xid without noticing the option change. During this + * period, a transaction ID wraparound could falsely make this ID appear + * as if it originates from the future w.r.t the transaction ID stored in + * the slot maintained by launcher. + * + * Similarly, if the user enables retain_dead_tuples concurrently with the + * launcher starting the worker, the apply worker may start calculating + * oldest_nonremovable_xid before the launcher notices the enable action. + * Consequently, the launcher may update slot.xmin to a newer value than + * that maintained by the worker. In subsequent cycles, upon integrating + * the worker's oldest_nonremovable_xid, the launcher might detect a + * retreat in the calculated xmin, necessitating additional handling. + * + * XXX To address the above race conditions, we can define + * oldest_nonremovable_xid as FullTransactionID and adds the check to + * disallow retreating the conflict slot's xmin. For now, we kept the + * implementation simple by disallowing change to the retain_dead_tuples, + * but in the future we can change this after some more analysis. + * + * Note that we could restrict only the enabling of retain_dead_tuples to + * avoid the race conditions described above, but we maintain the + * restriction for both enable and disable operations for the sake of + * consistency. */ if (sub->enabled) ereport(ERROR, @@ -1110,6 +1174,9 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, bool update_tuple = false; bool update_failover = false; bool update_two_phase = false; + bool check_pub_rdt = false; + bool retain_dead_tuples; + char *origin; Subscription *sub; Form_pg_subscription form; bits32 supported_opts; @@ -1137,6 +1204,9 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, sub = GetSubscription(subid, false); + retain_dead_tuples = sub->retaindeadtuples; + origin = sub->origin; + /* * Don't allow non-superuser modification of a subscription with * password_required=false. @@ -1165,7 +1235,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED | SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER | - SUBOPT_ORIGIN); + SUBOPT_RETAIN_DEAD_TUPLES | SUBOPT_ORIGIN); parse_subscription_options(pstate, stmt->options, supported_opts, &opts); @@ -1325,11 +1395,62 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, replaces[Anum_pg_subscription_subfailover - 1] = true; } + if (IsSet(opts.specified_opts, SUBOPT_RETAIN_DEAD_TUPLES)) + { + values[Anum_pg_subscription_subretaindeadtuples - 1] = + BoolGetDatum(opts.retaindeadtuples); + replaces[Anum_pg_subscription_subretaindeadtuples - 1] = true; + + CheckAlterSubOption(sub, "retain_dead_tuples", false, isTopLevel); + + /* + * Workers may continue running even after the + * subscription has been disabled. + * + * To prevent race conditions (as described in + * CheckAlterSubOption()), ensure that all worker + * processes have already exited before proceeding. + */ + if (logicalrep_workers_find(subid, true, true)) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot alter retain_dead_tuples when logical replication worker is still running"), + errhint("Try again after some time."))); + + /* + * Remind the user that enabling subscription will prevent + * the accumulation of dead tuples. + */ + if (opts.retaindeadtuples) + CheckSubDeadTupleRetention(true, !sub->enabled, NOTICE); + + /* + * Notify the launcher to manage the replication slot for + * conflict detection. This ensures that replication slot + * is efficiently handled (created, updated, or dropped) + * in response to any configuration changes. + */ + ApplyLauncherWakeupAtCommit(); + + check_pub_rdt = opts.retaindeadtuples; + retain_dead_tuples = opts.retaindeadtuples; + } + if (IsSet(opts.specified_opts, SUBOPT_ORIGIN)) { values[Anum_pg_subscription_suborigin - 1] = CStringGetTextDatum(opts.origin); replaces[Anum_pg_subscription_suborigin - 1] = true; + + /* + * Check if changes from different origins may be received + * from the publisher when the origin is changed to ANY + * and retain_dead_tuples is enabled. + */ + check_pub_rdt = retain_dead_tuples && + pg_strcasecmp(opts.origin, LOGICALREP_ORIGIN_ANY) == 0; + + origin = opts.origin; } update_tuple = true; @@ -1347,6 +1468,15 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("cannot enable subscription that does not have a slot name"))); + /* + * Check track_commit_timestamp only when enabling the + * subscription in case it was disabled after creation. See + * comments atop CheckSubDeadTupleRetention() for details. + */ + if (sub->retaindeadtuples) + CheckSubDeadTupleRetention(opts.enabled, !opts.enabled, + WARNING); + values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(opts.enabled); replaces[Anum_pg_subscription_subenabled - 1] = true; @@ -1355,6 +1485,14 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, ApplyLauncherWakeupAtCommit(); update_tuple = true; + + /* + * The subscription might be initially created with + * connect=false and retain_dead_tuples=true, meaning the + * remote server's status may not be checked. Ensure this + * check is conducted now. + */ + check_pub_rdt = sub->retaindeadtuples && opts.enabled; break; } @@ -1369,6 +1507,13 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, CStringGetTextDatum(stmt->conninfo); replaces[Anum_pg_subscription_subconninfo - 1] = true; update_tuple = true; + + /* + * Since the remote server configuration might have changed, + * perform a check to ensure it permits enabling + * retain_dead_tuples. + */ + check_pub_rdt = sub->retaindeadtuples; break; case ALTER_SUBSCRIPTION_SET_PUBLICATION: @@ -1568,14 +1713,15 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, } /* - * Try to acquire the connection necessary for altering the slot, if - * needed. + * Try to acquire the connection necessary either for modifying the slot + * or for checking if the remote server permits enabling + * retain_dead_tuples. * * This has to be at the end because otherwise if there is an error while * doing the database operations we won't be able to rollback altered * slot. */ - if (update_failover || update_two_phase) + if (update_failover || update_two_phase || check_pub_rdt) { bool must_use_password; char *err; @@ -1584,10 +1730,14 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, /* Load the library providing us libpq calls. */ load_file("libpqwalreceiver", false); - /* Try to connect to the publisher. */ + /* + * Try to connect to the publisher, using the new connection string if + * available. + */ must_use_password = sub->passwordrequired && !sub->ownersuperuser; - wrconn = walrcv_connect(sub->conninfo, true, true, must_use_password, - sub->name, &err); + wrconn = walrcv_connect(stmt->conninfo ? stmt->conninfo : sub->conninfo, + true, true, must_use_password, sub->name, + &err); if (!wrconn) ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), @@ -1596,9 +1746,17 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, PG_TRY(); { - walrcv_alter_slot(wrconn, sub->slotname, - update_failover ? &opts.failover : NULL, - update_two_phase ? &opts.twophase : NULL); + if (retain_dead_tuples) + check_pub_dead_tuple_retention(wrconn); + + check_publications_origin(wrconn, sub->publications, false, + retain_dead_tuples, origin, NULL, 0, + sub->name); + + if (update_failover || update_two_phase) + walrcv_alter_slot(wrconn, sub->slotname, + update_failover ? &opts.failover : NULL, + update_two_phase ? &opts.twophase : NULL); } PG_FINALLY(); { @@ -2086,20 +2244,29 @@ AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId) * Check and log a warning if the publisher has subscribed to the same table, * its partition ancestors (if it's a partition), or its partition children (if * it's a partitioned table), from some other publishers. This check is - * required only if "copy_data = true" and "origin = none" for CREATE - * SUBSCRIPTION and ALTER SUBSCRIPTION ... REFRESH statements to notify the - * user that data having origin might have been copied. + * required in the following scenarios: * - * This check need not be performed on the tables that are already added - * because incremental sync for those tables will happen through WAL and the - * origin of the data can be identified from the WAL records. + * 1) For CREATE SUBSCRIPTION and ALTER SUBSCRIPTION ... REFRESH statements + * with "copy_data = true" and "origin = none": + * - Warn the user that data with an origin might have been copied. + * - This check is skipped for tables already added, as incremental sync via + * WAL allows origin tracking. The list of such tables is in + * subrel_local_oids. * - * subrel_local_oids contains the list of relation oids that are already - * present on the subscriber. + * 2) For CREATE SUBSCRIPTION and ALTER SUBSCRIPTION ... REFRESH statements + * with "retain_dead_tuples = true" and "origin = any", and for ALTER + * SUBSCRIPTION statements that modify retain_dead_tuples or origin, or + * when the publisher's status changes (e.g., due to a connection string + * update): + * - Warn the user that only conflict detection info for local changes on + * the publisher is retained. Data from other origins may lack sufficient + * details for reliable conflict detection. + * - See comments atop worker.c for more details. */ static void check_publications_origin(WalReceiverConn *wrconn, List *publications, - bool copydata, char *origin, Oid *subrel_local_oids, + bool copydata, bool retain_dead_tuples, + char *origin, Oid *subrel_local_oids, int subrel_count, char *subname) { WalRcvExecResult *res; @@ -2108,9 +2275,29 @@ check_publications_origin(WalReceiverConn *wrconn, List *publications, Oid tableRow[1] = {TEXTOID}; List *publist = NIL; int i; + bool check_rdt; + bool check_table_sync; + bool origin_none = origin && + pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) == 0; + + /* + * Enable retain_dead_tuples checks only when origin is set to 'any', + * since with origin='none' only local changes are replicated to the + * subscriber. + */ + check_rdt = retain_dead_tuples && !origin_none; + + /* + * Enable table synchronization checks only when origin is 'none', to + * ensure that data from other origins is not inadvertently copied. + */ + check_table_sync = copydata && origin_none; - if (!copydata || !origin || - (pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) != 0)) + /* retain_dead_tuples and table sync checks occur separately */ + Assert(!(check_rdt && check_table_sync)); + + /* Return if no checks are required */ + if (!check_rdt && !check_table_sync) return; initStringInfo(&cmd); @@ -2129,16 +2316,23 @@ check_publications_origin(WalReceiverConn *wrconn, List *publications, /* * In case of ALTER SUBSCRIPTION ... REFRESH, subrel_local_oids contains * the list of relation oids that are already present on the subscriber. - * This check should be skipped for these tables. + * This check should be skipped for these tables if checking for table + * sync scenario. However, when handling the retain_dead_tuples scenario, + * ensure all tables are checked, as some existing tables may now include + * changes from other origins due to newly created subscriptions on the + * publisher. */ - for (i = 0; i < subrel_count; i++) + if (check_table_sync) { - Oid relid = subrel_local_oids[i]; - char *schemaname = get_namespace_name(get_rel_namespace(relid)); - char *tablename = get_rel_name(relid); + for (i = 0; i < subrel_count; i++) + { + Oid relid = subrel_local_oids[i]; + char *schemaname = get_namespace_name(get_rel_namespace(relid)); + char *tablename = get_rel_name(relid); - appendStringInfo(&cmd, "AND NOT (N.nspname = '%s' AND C.relname = '%s')\n", - schemaname, tablename); + appendStringInfo(&cmd, "AND NOT (N.nspname = '%s' AND C.relname = '%s')\n", + schemaname, tablename); + } } res = walrcv_exec(wrconn, cmd.data, 1, tableRow); @@ -2173,22 +2367,37 @@ check_publications_origin(WalReceiverConn *wrconn, List *publications, * XXX: For simplicity, we don't check whether the table has any data or * not. If the table doesn't have any data then we don't need to * distinguish between data having origin and data not having origin so we - * can avoid logging a warning in that case. + * can avoid logging a warning for table sync scenario. */ if (publist) { StringInfo pubnames = makeStringInfo(); + StringInfo err_msg = makeStringInfo(); + StringInfo err_hint = makeStringInfo(); /* Prepare the list of publication(s) for warning message. */ GetPublicationsStr(publist, pubnames, false); + + if (check_table_sync) + { + appendStringInfo(err_msg, _("subscription \"%s\" requested copy_data with origin = NONE but might copy data that had a different origin"), + subname); + appendStringInfoString(err_hint, _("Verify that initial data copied from the publisher tables did not come from other origins.")); + } + else + { + appendStringInfo(err_msg, _("subscription \"%s\" enabled retain_dead_tuples but might not reliably detect conflicts for changes from different origins"), + subname); + appendStringInfoString(err_hint, _("Consider using origin = NONE or disabling retain_dead_tuples.")); + } + ereport(WARNING, errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("subscription \"%s\" requested copy_data with origin = NONE but might copy data that had a different origin", - subname), - errdetail_plural("The subscription being created subscribes to a publication (%s) that contains tables that are written to by other subscriptions.", - "The subscription being created subscribes to publications (%s) that contain tables that are written to by other subscriptions.", + errmsg_internal("%s", err_msg->data), + errdetail_plural("The subscription subscribes to a publication (%s) that contains tables that are written to by other subscriptions.", + "The subscription subscribes to publications (%s) that contain tables that are written to by other subscriptions.", list_length(publist), pubnames->data), - errhint("Verify that initial data copied from the publisher tables did not come from other origins.")); + errhint_internal("%s", err_hint->data)); } ExecDropSingleTupleTableSlot(slot); @@ -2197,6 +2406,101 @@ check_publications_origin(WalReceiverConn *wrconn, List *publications, } /* + * Determine whether the retain_dead_tuples can be enabled based on the + * publisher's status. + * + * This option is disallowed if the publisher is running a version earlier + * than the PG19, or if the publisher is in recovery (i.e., it is a standby + * server). + * + * See comments atop worker.c for a detailed explanation. + */ +static void +check_pub_dead_tuple_retention(WalReceiverConn *wrconn) +{ + WalRcvExecResult *res; + Oid RecoveryRow[1] = {BOOLOID}; + TupleTableSlot *slot; + bool isnull; + bool remote_in_recovery; + + if (walrcv_server_version(wrconn) < 19000) + ereport(ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot enable retain_dead_tuples if the publisher is running a version earlier than PostgreSQL 19")); + + res = walrcv_exec(wrconn, "SELECT pg_is_in_recovery()", 1, RecoveryRow); + + if (res->status != WALRCV_OK_TUPLES) + ereport(ERROR, + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not obtain recovery progress from the publisher: %s", + res->err))); + + slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); + if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot)) + elog(ERROR, "failed to fetch tuple for the recovery progress"); + + remote_in_recovery = DatumGetBool(slot_getattr(slot, 1, &isnull)); + + if (remote_in_recovery) + ereport(ERROR, + errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot enable retain_dead_tuples if the publisher is in recovery.")); + + ExecDropSingleTupleTableSlot(slot); + + walrcv_clear_result(res); +} + +/* + * Check if the subscriber's configuration is adequate to enable the + * retain_dead_tuples option. + * + * Issue an ERROR if the wal_level does not support the use of replication + * slots when check_guc is set to true. + * + * Issue a WARNING if track_commit_timestamp is not enabled when check_guc is + * set to true. This is only to highlight the importance of enabling + * track_commit_timestamp instead of catching all the misconfigurations, as + * this setting can be adjusted after subscription creation. Without it, the + * apply worker will simply skip conflict detection. + * + * Issue a WARNING or NOTICE if the subscription is disabled. Do not raise an + * ERROR since users can only modify retain_dead_tuples for disabled + * subscriptions. And as long as the subscription is enabled promptly, it will + * not pose issues. + */ +void +CheckSubDeadTupleRetention(bool check_guc, bool sub_disabled, + int elevel_for_sub_disabled) +{ + Assert(elevel_for_sub_disabled == NOTICE || + elevel_for_sub_disabled == WARNING); + + if (check_guc && wal_level < WAL_LEVEL_REPLICA) + ereport(ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("\"wal_level\" is insufficient to create the replication slot required by retain_dead_tuples"), + errhint("\"wal_level\" must be set to \"replica\" or \"logical\" at server start.")); + + if (check_guc && !track_commit_timestamp) + ereport(WARNING, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("commit timestamp and origin data required for detecting conflicts won't be retained"), + errhint("Consider setting \"%s\" to true.", + "track_commit_timestamp")); + + if (sub_disabled) + ereport(elevel_for_sub_disabled, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("deleted rows to detect conflicts would not be removed until the subscription is enabled"), + (elevel_for_sub_disabled > NOTICE) + ? errhint("Consider setting %s to false.", + "retain_dead_tuples") : 0); +} + +/* * Get the list of tables which belong to specified publications on the * publisher connection. * diff --git a/src/backend/libpq/auth.c b/src/backend/libpq/auth.c index 9f4d05ffbd4..4da46666439 100644 --- a/src/backend/libpq/auth.c +++ b/src/backend/libpq/auth.c @@ -94,8 +94,16 @@ static int auth_peer(hbaPort *port); #define PGSQL_PAM_SERVICE "postgresql" /* Service name passed to PAM */ +/* Work around original Solaris' lack of "const" in the conv_proc signature */ +#ifdef _PAM_LEGACY_NONCONST +#define PG_PAM_CONST +#else +#define PG_PAM_CONST const +#endif + static int CheckPAMAuth(Port *port, const char *user, const char *password); -static int pam_passwd_conv_proc(int num_msg, const struct pam_message **msg, +static int pam_passwd_conv_proc(int num_msg, + PG_PAM_CONST struct pam_message **msg, struct pam_response **resp, void *appdata_ptr); static struct pam_conv pam_passw_conv = { @@ -1917,7 +1925,7 @@ auth_peer(hbaPort *port) */ static int -pam_passwd_conv_proc(int num_msg, const struct pam_message **msg, +pam_passwd_conv_proc(int num_msg, PG_PAM_CONST struct pam_message **msg, struct pam_response **resp, void *appdata_ptr) { const char *passwd; diff --git a/src/backend/optimizer/plan/initsplan.c b/src/backend/optimizer/plan/initsplan.c index 01804b085b3..3e3fec89252 100644 --- a/src/backend/optimizer/plan/initsplan.c +++ b/src/backend/optimizer/plan/initsplan.c @@ -3048,36 +3048,16 @@ add_base_clause_to_rel(PlannerInfo *root, Index relid, * expr_is_nonnullable * Check to see if the Expr cannot be NULL * - * If the Expr is a simple Var that is defined NOT NULL and meanwhile is not - * nulled by any outer joins, then we can know that it cannot be NULL. + * Currently we only support simple Vars. */ static bool expr_is_nonnullable(PlannerInfo *root, Expr *expr) { - RelOptInfo *rel; - Var *var; - /* For now only check simple Vars */ if (!IsA(expr, Var)) return false; - var = (Var *) expr; - - /* could the Var be nulled by any outer joins? */ - if (!bms_is_empty(var->varnullingrels)) - return false; - - /* system columns cannot be NULL */ - if (var->varattno < 0) - return true; - - /* is the column defined NOT NULL? */ - rel = find_base_rel(root, var->varno); - if (var->varattno > 0 && - bms_is_member(var->varattno, rel->notnullattnums)) - return true; - - return false; + return var_is_nonnullable(root, (Var *) expr, true); } /* diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c index fc13d921d0c..c989e72cac5 100644 --- a/src/backend/optimizer/plan/planner.c +++ b/src/backend/optimizer/plan/planner.c @@ -342,6 +342,7 @@ standard_planner(Query *parse, const char *query_string, int cursorOptions, glob->transientPlan = false; glob->dependsOnRole = false; glob->partition_directory = NULL; + glob->rel_notnullatts_hash = NULL; /* * Assess whether it's feasible to use parallel mode for this query. We @@ -723,11 +724,12 @@ subquery_planner(PlannerGlobal *glob, Query *parse, PlannerInfo *parent_root, /* * Scan the rangetable for relation RTEs and retrieve the necessary * catalog information for each relation. Using this information, clear - * the inh flag for any relation that has no children, and expand virtual - * generated columns for any relation that contains them. Note that this - * step does not descend into sublinks and subqueries; if we pull up any - * sublinks or subqueries below, their relation RTEs are processed just - * before pulling them up. + * the inh flag for any relation that has no children, collect not-null + * attribute numbers for any relation that has column not-null + * constraints, and expand virtual generated columns for any relation that + * contains them. Note that this step does not descend into sublinks and + * subqueries; if we pull up any sublinks or subqueries below, their + * relation RTEs are processed just before pulling them up. */ parse = root->parse = preprocess_relation_rtes(root); diff --git a/src/backend/optimizer/plan/subselect.c b/src/backend/optimizer/plan/subselect.c index 4bdca59df64..d71ed958e31 100644 --- a/src/backend/optimizer/plan/subselect.c +++ b/src/backend/optimizer/plan/subselect.c @@ -1519,8 +1519,10 @@ convert_EXISTS_sublink_to_join(PlannerInfo *root, SubLink *sublink, /* * Scan the rangetable for relation RTEs and retrieve the necessary * catalog information for each relation. Using this information, clear - * the inh flag for any relation that has no children, and expand virtual - * generated columns for any relation that contains them. + * the inh flag for any relation that has no children, collect not-null + * attribute numbers for any relation that has column not-null + * constraints, and expand virtual generated columns for any relation that + * contains them. * * Note: we construct up an entirely dummy PlannerInfo for use here. This * is fine because only the "glob" and "parse" links will be used in this @@ -1760,6 +1762,7 @@ convert_EXISTS_to_ANY(PlannerInfo *root, Query *subselect, Node **testexpr, List **paramIds) { Node *whereClause; + PlannerInfo subroot; List *leftargs, *rightargs, *opids, @@ -1819,12 +1822,15 @@ convert_EXISTS_to_ANY(PlannerInfo *root, Query *subselect, * parent aliases were flattened already, and we're not going to pull any * child Vars (of any description) into the parent. * - * Note: passing the parent's root to eval_const_expressions is - * technically wrong, but we can get away with it since only the - * boundParams (if any) are used, and those would be the same in a - * subroot. + * Note: we construct up an entirely dummy PlannerInfo to pass to + * eval_const_expressions. This is fine because only the "glob" and + * "parse" links are used by eval_const_expressions. */ - whereClause = eval_const_expressions(root, whereClause); + MemSet(&subroot, 0, sizeof(subroot)); + subroot.type = T_PlannerInfo; + subroot.glob = root->glob; + subroot.parse = subselect; + whereClause = eval_const_expressions(&subroot, whereClause); whereClause = (Node *) canonicalize_qual((Expr *) whereClause, false); whereClause = (Node *) make_ands_implicit((Expr *) whereClause); diff --git a/src/backend/optimizer/prep/prepjointree.c b/src/backend/optimizer/prep/prepjointree.c index 4b38851bd42..35e8d3c183b 100644 --- a/src/backend/optimizer/prep/prepjointree.c +++ b/src/backend/optimizer/prep/prepjointree.c @@ -36,6 +36,7 @@ #include "optimizer/clauses.h" #include "optimizer/optimizer.h" #include "optimizer/placeholder.h" +#include "optimizer/plancat.h" #include "optimizer/prep.h" #include "optimizer/subselect.h" #include "optimizer/tlist.h" @@ -401,8 +402,9 @@ transform_MERGE_to_join(Query *parse) * * This scans the rangetable for relation RTEs and retrieves the necessary * catalog information for each relation. Using this information, it clears - * the inh flag for any relation that has no children, and expands virtual - * generated columns for any relation that contains them. + * the inh flag for any relation that has no children, collects not-null + * attribute numbers for any relation that has column not-null constraints, and + * expands virtual generated columns for any relation that contains them. * * Note that expanding virtual generated columns may cause the query tree to * have new copies of rangetable entries. Therefore, we have to use list_nth @@ -448,6 +450,13 @@ preprocess_relation_rtes(PlannerInfo *root) rte->inh = relation->rd_rel->relhassubclass; /* + * Check to see if the relation has any column not-null constraints; + * if so, retrieve the constraint information and store it in a + * relation OID based hash table. + */ + get_relation_notnullatts(root, relation); + + /* * Check to see if the relation has any virtual generated columns; if * so, replace all Var nodes in the query that reference these columns * with the generation expressions. @@ -1384,8 +1393,10 @@ pull_up_simple_subquery(PlannerInfo *root, Node *jtnode, RangeTblEntry *rte, /* * Scan the rangetable for relation RTEs and retrieve the necessary * catalog information for each relation. Using this information, clear - * the inh flag for any relation that has no children, and expand virtual - * generated columns for any relation that contains them. + * the inh flag for any relation that has no children, collect not-null + * attribute numbers for any relation that has column not-null + * constraints, and expand virtual generated columns for any relation that + * contains them. */ subquery = subroot->parse = preprocess_relation_rtes(subroot); diff --git a/src/backend/optimizer/util/clauses.c b/src/backend/optimizer/util/clauses.c index f45131c34c5..6f0b338d2cd 100644 --- a/src/backend/optimizer/util/clauses.c +++ b/src/backend/optimizer/util/clauses.c @@ -20,6 +20,7 @@ #include "postgres.h" #include "access/htup_details.h" +#include "catalog/pg_class.h" #include "catalog/pg_language.h" #include "catalog/pg_operator.h" #include "catalog/pg_proc.h" @@ -36,6 +37,7 @@ #include "optimizer/clauses.h" #include "optimizer/cost.h" #include "optimizer/optimizer.h" +#include "optimizer/pathnode.h" #include "optimizer/plancat.h" #include "optimizer/planmain.h" #include "parser/analyze.h" @@ -43,6 +45,7 @@ #include "parser/parse_collate.h" #include "parser/parse_func.h" #include "parser/parse_oper.h" +#include "parser/parsetree.h" #include "rewrite/rewriteHandler.h" #include "rewrite/rewriteManip.h" #include "tcop/tcopprot.h" @@ -2242,7 +2245,8 @@ rowtype_field_matches(Oid rowtypeid, int fieldnum, * only operators and functions that are reasonable to try to execute. * * NOTE: "root" can be passed as NULL if the caller never wants to do any - * Param substitutions nor receive info about inlined functions. + * Param substitutions nor receive info about inlined functions nor reduce + * NullTest for Vars to constant true or constant false. * * NOTE: the planner assumes that this will always flatten nested AND and * OR clauses into N-argument form. See comments in prepqual.c. @@ -3544,6 +3548,31 @@ eval_const_expressions_mutator(Node *node, return makeBoolConst(result, false); } + if (!ntest->argisrow && arg && IsA(arg, Var) && context->root) + { + Var *varg = (Var *) arg; + bool result; + + if (var_is_nonnullable(context->root, varg, false)) + { + switch (ntest->nulltesttype) + { + case IS_NULL: + result = false; + break; + case IS_NOT_NULL: + result = true; + break; + default: + elog(ERROR, "unrecognized nulltesttype: %d", + (int) ntest->nulltesttype); + result = false; /* keep compiler quiet */ + break; + } + + return makeBoolConst(result, false); + } + } newntest = makeNode(NullTest); newntest->arg = (Expr *) arg; @@ -4163,6 +4192,67 @@ simplify_function(Oid funcid, Oid result_type, int32 result_typmod, } /* + * var_is_nonnullable: check to see if the Var cannot be NULL + * + * If the Var is defined NOT NULL and meanwhile is not nulled by any outer + * joins or grouping sets, then we can know that it cannot be NULL. + * + * use_rel_info indicates whether the corresponding RelOptInfo is available for + * use. + */ +bool +var_is_nonnullable(PlannerInfo *root, Var *var, bool use_rel_info) +{ + Relids notnullattnums = NULL; + + Assert(IsA(var, Var)); + + /* skip upper-level Vars */ + if (var->varlevelsup != 0) + return false; + + /* could the Var be nulled by any outer joins or grouping sets? */ + if (!bms_is_empty(var->varnullingrels)) + return false; + + /* system columns cannot be NULL */ + if (var->varattno < 0) + return true; + + /* + * Check if the Var is defined as NOT NULL. We retrieve the column NOT + * NULL constraint information from the corresponding RelOptInfo if it is + * available; otherwise, we search the hash table for this information. + */ + if (use_rel_info) + { + RelOptInfo *rel = find_base_rel(root, var->varno); + + notnullattnums = rel->notnullattnums; + } + else + { + RangeTblEntry *rte = planner_rt_fetch(var->varno, root); + + /* + * We must skip inheritance parent tables, as some child tables may + * have a NOT NULL constraint for a column while others may not. This + * cannot happen with partitioned tables, though. + */ + if (rte->inh && rte->relkind != RELKIND_PARTITIONED_TABLE) + return false; + + notnullattnums = find_relation_notnullatts(root, rte->relid); + } + + if (var->varattno > 0 && + bms_is_member(var->varattno, notnullattnums)) + return true; + + return false; +} + +/* * expand_function_arguments: convert named-notation args to positional args * and/or insert default args, as needed * diff --git a/src/backend/optimizer/util/inherit.c b/src/backend/optimizer/util/inherit.c index 17e51cd75d7..30d158069e3 100644 --- a/src/backend/optimizer/util/inherit.c +++ b/src/backend/optimizer/util/inherit.c @@ -466,8 +466,7 @@ expand_single_inheritance_child(PlannerInfo *root, RangeTblEntry *parentrte, Index *childRTindex_p) { Query *parse = root->parse; - Oid parentOID PG_USED_FOR_ASSERTS_ONLY = - RelationGetRelid(parentrel); + Oid parentOID = RelationGetRelid(parentrel); Oid childOID = RelationGetRelid(childrel); RangeTblEntry *childrte; Index childRTindex; @@ -514,6 +513,13 @@ expand_single_inheritance_child(PlannerInfo *root, RangeTblEntry *parentrte, *childRTindex_p = childRTindex; /* + * Retrieve column not-null constraint information for the child relation + * if its relation OID is different from the parent's. + */ + if (childOID != parentOID) + get_relation_notnullatts(root, childrel); + + /* * Build an AppendRelInfo struct for each parent/child pair. */ appinfo = make_append_rel_info(parentrel, childrel, diff --git a/src/backend/optimizer/util/plancat.c b/src/backend/optimizer/util/plancat.c index 59233b64730..c6a58afc5e5 100644 --- a/src/backend/optimizer/util/plancat.c +++ b/src/backend/optimizer/util/plancat.c @@ -59,6 +59,12 @@ int constraint_exclusion = CONSTRAINT_EXCLUSION_PARTITION; /* Hook for plugins to get control in get_relation_info() */ get_relation_info_hook_type get_relation_info_hook = NULL; +typedef struct NotnullHashEntry +{ + Oid relid; /* OID of the relation */ + Relids notnullattnums; /* attnums of NOT NULL columns */ +} NotnullHashEntry; + static void get_relation_foreign_keys(PlannerInfo *root, RelOptInfo *rel, Relation relation, bool inhparent); @@ -172,27 +178,7 @@ get_relation_info(PlannerInfo *root, Oid relationObjectId, bool inhparent, * RangeTblEntry does get populated. */ if (!inhparent || relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) - { - for (int i = 0; i < relation->rd_att->natts; i++) - { - CompactAttribute *attr = TupleDescCompactAttr(relation->rd_att, i); - - Assert(attr->attnullability != ATTNULLABLE_UNKNOWN); - - if (attr->attnullability == ATTNULLABLE_VALID) - { - rel->notnullattnums = bms_add_member(rel->notnullattnums, - i + 1); - - /* - * Per RemoveAttributeById(), dropped columns will have their - * attnotnull unset, so we needn't check for dropped columns - * in the above condition. - */ - Assert(!attr->attisdropped); - } - } - } + rel->notnullattnums = find_relation_notnullatts(root, relationObjectId); /* * Estimate relation size --- unless it's an inheritance parent, in which @@ -684,6 +670,105 @@ get_relation_foreign_keys(PlannerInfo *root, RelOptInfo *rel, } /* + * get_relation_notnullatts - + * Retrieves column not-null constraint information for a given relation. + * + * We do this while we have the relcache entry open, and store the column + * not-null constraint information in a hash table based on the relation OID. + */ +void +get_relation_notnullatts(PlannerInfo *root, Relation relation) +{ + Oid relid = RelationGetRelid(relation); + NotnullHashEntry *hentry; + bool found; + Relids notnullattnums = NULL; + + /* bail out if the relation has no not-null constraints */ + if (relation->rd_att->constr == NULL || + !relation->rd_att->constr->has_not_null) + return; + + /* create the hash table if it hasn't been created yet */ + if (root->glob->rel_notnullatts_hash == NULL) + { + HTAB *hashtab; + HASHCTL hash_ctl; + + hash_ctl.keysize = sizeof(Oid); + hash_ctl.entrysize = sizeof(NotnullHashEntry); + hash_ctl.hcxt = CurrentMemoryContext; + + hashtab = hash_create("Relation NOT NULL attnums", + 64L, /* arbitrary initial size */ + &hash_ctl, + HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); + + root->glob->rel_notnullatts_hash = hashtab; + } + + /* + * Create a hash entry for this relation OID, if we don't have one + * already. + */ + hentry = (NotnullHashEntry *) hash_search(root->glob->rel_notnullatts_hash, + &relid, + HASH_ENTER, + &found); + + /* bail out if a hash entry already exists for this relation OID */ + if (found) + return; + + /* collect the column not-null constraint information for this relation */ + for (int i = 0; i < relation->rd_att->natts; i++) + { + CompactAttribute *attr = TupleDescCompactAttr(relation->rd_att, i); + + Assert(attr->attnullability != ATTNULLABLE_UNKNOWN); + + if (attr->attnullability == ATTNULLABLE_VALID) + { + notnullattnums = bms_add_member(notnullattnums, i + 1); + + /* + * Per RemoveAttributeById(), dropped columns will have their + * attnotnull unset, so we needn't check for dropped columns in + * the above condition. + */ + Assert(!attr->attisdropped); + } + } + + /* ... and initialize the new hash entry */ + hentry->notnullattnums = notnullattnums; +} + +/* + * find_relation_notnullatts - + * Searches the hash table and returns the column not-null constraint + * information for a given relation. + */ +Relids +find_relation_notnullatts(PlannerInfo *root, Oid relid) +{ + NotnullHashEntry *hentry; + bool found; + + if (root->glob->rel_notnullatts_hash == NULL) + return NULL; + + hentry = (NotnullHashEntry *) hash_search(root->glob->rel_notnullatts_hash, + &relid, + HASH_FIND, + &found); + if (!found) + return NULL; + + return hentry->notnullattnums; +} + +/* * infer_arbiter_indexes - * Determine the unique indexes used to arbitrate speculative insertion. * diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index f7b5d093681..886d99951dd 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -232,6 +232,9 @@ libpqrcv_connect(const char *conninfo, bool replication, bool logical, errhint("Target server's authentication method must be changed, or set password_required=false in the subscription parameters."))); } + PQsetNoticeReceiver(conn->streamConn, libpqsrv_notice_receiver, + "received message via replication"); + /* * Set always-secure search path for the cases where the connection is * used to run SQL queries, so malicious users can't get control. diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c index d25085d3515..1fa931a7422 100644 --- a/src/backend/replication/logical/applyparallelworker.c +++ b/src/backend/replication/logical/applyparallelworker.c @@ -441,7 +441,8 @@ pa_launch_parallel_worker(void) MySubscription->name, MyLogicalRepWorker->userid, InvalidOid, - dsm_segment_handle(winfo->dsm_seg)); + dsm_segment_handle(winfo->dsm_seg), + false); if (launched) { diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 4aed0dfcebb..742d9ba68e9 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -32,6 +32,7 @@ #include "postmaster/interrupt.h" #include "replication/logicallauncher.h" #include "replication/origin.h" +#include "replication/slot.h" #include "replication/walreceiver.h" #include "replication/worker_internal.h" #include "storage/ipc.h" @@ -91,7 +92,6 @@ static dshash_table *last_start_times = NULL; static bool on_commit_launcher_wakeup = false; -static void ApplyLauncherWakeup(void); static void logicalrep_launcher_onexit(int code, Datum arg); static void logicalrep_worker_onexit(int code, Datum arg); static void logicalrep_worker_detach(void); @@ -100,6 +100,9 @@ static int logicalrep_pa_worker_count(Oid subid); static void logicalrep_launcher_attach_dshmem(void); static void ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time); static TimestampTz ApplyLauncherGetWorkerStartTime(Oid subid); +static void compute_min_nonremovable_xid(LogicalRepWorker *worker, TransactionId *xmin); +static bool acquire_conflict_slot_if_exists(void); +static void advance_conflict_slot_xmin(TransactionId new_xmin); /* @@ -148,6 +151,7 @@ get_subscription_list(void) sub->owner = subform->subowner; sub->enabled = subform->subenabled; sub->name = pstrdup(NameStr(subform->subname)); + sub->retaindeadtuples = subform->subretaindeadtuples; /* We don't fill fields we are not interested in. */ res = lappend(res, sub); @@ -309,7 +313,8 @@ logicalrep_workers_find(Oid subid, bool only_running, bool acquire_lock) bool logicalrep_worker_launch(LogicalRepWorkerType wtype, Oid dbid, Oid subid, const char *subname, Oid userid, - Oid relid, dsm_handle subworker_dsm) + Oid relid, dsm_handle subworker_dsm, + bool retain_dead_tuples) { BackgroundWorker bgw; BackgroundWorkerHandle *bgw_handle; @@ -328,10 +333,13 @@ logicalrep_worker_launch(LogicalRepWorkerType wtype, * - must be valid worker type * - tablesync workers are only ones to have relid * - parallel apply worker is the only kind of subworker + * - The replication slot used in conflict detection is created when + * retain_dead_tuples is enabled */ Assert(wtype != WORKERTYPE_UNKNOWN); Assert(is_tablesync_worker == OidIsValid(relid)); Assert(is_parallel_apply_worker == (subworker_dsm != DSM_HANDLE_INVALID)); + Assert(!retain_dead_tuples || MyReplicationSlot); ereport(DEBUG1, (errmsg_internal("starting logical replication worker for subscription \"%s\"", @@ -454,6 +462,9 @@ retry: worker->stream_fileset = NULL; worker->leader_pid = is_parallel_apply_worker ? MyProcPid : InvalidPid; worker->parallel_apply = is_parallel_apply_worker; + worker->oldest_nonremovable_xid = retain_dead_tuples + ? MyReplicationSlot->data.xmin + : InvalidTransactionId; worker->last_lsn = InvalidXLogRecPtr; TIMESTAMP_NOBEGIN(worker->last_send_time); TIMESTAMP_NOBEGIN(worker->last_recv_time); @@ -1118,7 +1129,10 @@ ApplyLauncherWakeupAtCommit(void) on_commit_launcher_wakeup = true; } -static void +/* + * Wakeup the launcher immediately. + */ +void ApplyLauncherWakeup(void) { if (LogicalRepCtx->launcher_pid != 0) @@ -1150,6 +1164,12 @@ ApplyLauncherMain(Datum main_arg) */ BackgroundWorkerInitializeConnection(NULL, NULL, 0); + /* + * Acquire the conflict detection slot at startup to ensure it can be + * dropped if no longer needed after a restart. + */ + acquire_conflict_slot_if_exists(); + /* Enter main loop */ for (;;) { @@ -1159,6 +1179,9 @@ ApplyLauncherMain(Datum main_arg) MemoryContext subctx; MemoryContext oldctx; long wait_time = DEFAULT_NAPTIME_PER_CYCLE; + bool can_advance_xmin = true; + bool retain_dead_tuples = false; + TransactionId xmin = InvalidTransactionId; CHECK_FOR_INTERRUPTS(); @@ -1168,7 +1191,14 @@ ApplyLauncherMain(Datum main_arg) ALLOCSET_DEFAULT_SIZES); oldctx = MemoryContextSwitchTo(subctx); - /* Start any missing workers for enabled subscriptions. */ + /* + * Start any missing workers for enabled subscriptions. + * + * Also, during the iteration through all subscriptions, we compute + * the minimum XID required to protect deleted tuples for conflict + * detection if one of the subscription enables retain_dead_tuples + * option. + */ sublist = get_subscription_list(); foreach(lc, sublist) { @@ -1178,6 +1208,38 @@ ApplyLauncherMain(Datum main_arg) TimestampTz now; long elapsed; + if (sub->retaindeadtuples) + { + retain_dead_tuples = true; + + /* + * Can't advance xmin of the slot unless all the subscriptions + * with retain_dead_tuples are enabled. This is required to + * ensure that we don't advance the xmin of + * CONFLICT_DETECTION_SLOT if one of the subscriptions is not + * enabled. Otherwise, we won't be able to detect conflicts + * reliably for such a subscription even though it has set the + * retain_dead_tuples option. + */ + can_advance_xmin &= sub->enabled; + + /* + * Create a replication slot to retain information necessary + * for conflict detection such as dead tuples, commit + * timestamps, and origins. + * + * The slot is created before starting the apply worker to + * prevent it from unnecessarily maintaining its + * oldest_nonremovable_xid. + * + * The slot is created even for a disabled subscription to + * ensure that conflict-related information is available when + * applying remote changes that occurred before the + * subscription was enabled. + */ + CreateConflictDetectionSlot(); + } + if (!sub->enabled) continue; @@ -1186,7 +1248,27 @@ ApplyLauncherMain(Datum main_arg) LWLockRelease(LogicalRepWorkerLock); if (w != NULL) - continue; /* worker is running already */ + { + /* + * Compute the minimum xmin required to protect dead tuples + * required for conflict detection among all running apply + * workers that enables retain_dead_tuples. + */ + if (sub->retaindeadtuples && can_advance_xmin) + compute_min_nonremovable_xid(w, &xmin); + + /* worker is running already */ + continue; + } + + /* + * Can't advance xmin of the slot unless all the workers + * corresponding to subscriptions with retain_dead_tuples are + * running, disabling the further computation of the minimum + * nonremovable xid. + */ + if (sub->retaindeadtuples) + can_advance_xmin = false; /* * If the worker is eligible to start now, launch it. Otherwise, @@ -1210,7 +1292,8 @@ ApplyLauncherMain(Datum main_arg) if (!logicalrep_worker_launch(WORKERTYPE_APPLY, sub->dbid, sub->oid, sub->name, sub->owner, InvalidOid, - DSM_HANDLE_INVALID)) + DSM_HANDLE_INVALID, + sub->retaindeadtuples)) { /* * We get here either if we failed to launch a worker @@ -1230,6 +1313,20 @@ ApplyLauncherMain(Datum main_arg) } } + /* + * Drop the CONFLICT_DETECTION_SLOT slot if there is no subscription + * that requires us to retain dead tuples. Otherwise, if required, + * advance the slot's xmin to protect dead tuples required for the + * conflict detection. + */ + if (MyReplicationSlot) + { + if (!retain_dead_tuples) + ReplicationSlotDropAcquired(); + else if (can_advance_xmin) + advance_conflict_slot_xmin(xmin); + } + /* Switch back to original memory context. */ MemoryContextSwitchTo(oldctx); /* Clean the temporary memory. */ @@ -1258,6 +1355,125 @@ ApplyLauncherMain(Datum main_arg) } /* + * Determine the minimum non-removable transaction ID across all apply workers + * for subscriptions that have retain_dead_tuples enabled. Store the result + * in *xmin. + */ +static void +compute_min_nonremovable_xid(LogicalRepWorker *worker, TransactionId *xmin) +{ + TransactionId nonremovable_xid; + + Assert(worker != NULL); + + /* + * The replication slot for conflict detection must be created before the + * worker starts. + */ + Assert(MyReplicationSlot); + + SpinLockAcquire(&worker->relmutex); + nonremovable_xid = worker->oldest_nonremovable_xid; + SpinLockRelease(&worker->relmutex); + + Assert(TransactionIdIsValid(nonremovable_xid)); + + if (!TransactionIdIsValid(*xmin) || + TransactionIdPrecedes(nonremovable_xid, *xmin)) + *xmin = nonremovable_xid; +} + +/* + * Acquire the replication slot used to retain information for conflict + * detection, if it exists. + * + * Return true if successfully acquired, otherwise return false. + */ +static bool +acquire_conflict_slot_if_exists(void) +{ + if (!SearchNamedReplicationSlot(CONFLICT_DETECTION_SLOT, true)) + return false; + + ReplicationSlotAcquire(CONFLICT_DETECTION_SLOT, true, false); + return true; +} + +/* + * Advance the xmin the replication slot used to retain information required + * for conflict detection. + */ +static void +advance_conflict_slot_xmin(TransactionId new_xmin) +{ + Assert(MyReplicationSlot); + Assert(TransactionIdIsValid(new_xmin)); + Assert(TransactionIdPrecedesOrEquals(MyReplicationSlot->data.xmin, new_xmin)); + + /* Return if the xmin value of the slot cannot be advanced */ + if (TransactionIdEquals(MyReplicationSlot->data.xmin, new_xmin)) + return; + + SpinLockAcquire(&MyReplicationSlot->mutex); + MyReplicationSlot->effective_xmin = new_xmin; + MyReplicationSlot->data.xmin = new_xmin; + SpinLockRelease(&MyReplicationSlot->mutex); + + elog(DEBUG1, "updated xmin: %u", MyReplicationSlot->data.xmin); + + ReplicationSlotMarkDirty(); + ReplicationSlotsComputeRequiredXmin(false); + + /* + * Like PhysicalConfirmReceivedLocation(), do not save slot information + * each time. This is acceptable because all concurrent transactions on + * the publisher that require the data preceding the slot's xmin should + * have already been applied and flushed on the subscriber before the xmin + * is advanced. So, even if the slot's xmin regresses after a restart, it + * will be advanced again in the next cycle. Therefore, no data required + * for conflict detection will be prematurely removed. + */ + return; +} + +/* + * Create and acquire the replication slot used to retain information for + * conflict detection, if not yet. + */ +void +CreateConflictDetectionSlot(void) +{ + TransactionId xmin_horizon; + + /* Exit early, if the replication slot is already created and acquired */ + if (MyReplicationSlot) + return; + + ereport(LOG, + errmsg("creating replication conflict detection slot")); + + ReplicationSlotCreate(CONFLICT_DETECTION_SLOT, false, RS_PERSISTENT, false, + false, false); + + LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); + + xmin_horizon = GetOldestSafeDecodingTransactionId(false); + + SpinLockAcquire(&MyReplicationSlot->mutex); + MyReplicationSlot->effective_xmin = xmin_horizon; + MyReplicationSlot->data.xmin = xmin_horizon; + SpinLockRelease(&MyReplicationSlot->mutex); + + ReplicationSlotsComputeRequiredXmin(true); + + LWLockRelease(ProcArrayLock); + + /* Write this slot to disk */ + ReplicationSlotMarkDirty(); + ReplicationSlotSave(); +} + +/* * Is current process the logical replication launcher? */ bool diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 7b4e8629553..5febd154b6b 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -4917,7 +4917,7 @@ StartupReorderBuffer(void) continue; /* if it cannot be a slot, skip the directory */ - if (!ReplicationSlotValidateName(logical_de->d_name, DEBUG2)) + if (!ReplicationSlotValidateName(logical_de->d_name, true, DEBUG2)) continue; /* diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index e4fd6347fd1..3fea0a0206e 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -615,7 +615,8 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) MySubscription->name, MyLogicalRepWorker->userid, rstate->relid, - DSM_HANDLE_INVALID); + DSM_HANDLE_INVALID, + false); } } } diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index c5fb627aa56..b59221c4d06 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -132,6 +132,96 @@ * failover = true when creating the subscription. Enabling failover allows us * to smoothly transition to the promoted standby, ensuring that we can * subscribe to the new primary without losing any data. + * + * RETAIN DEAD TUPLES + * ---------------------- + * Each apply worker that enabled retain_dead_tuples option maintains a + * non-removable transaction ID (oldest_nonremovable_xid) in shared memory to + * prevent dead rows from being removed prematurely when the apply worker still + * needs them to detect conflicts reliably. This helps to retain the required + * commit_ts module information, which further helps to detect + * update_origin_differs and delete_origin_differs conflicts reliably, as + * otherwise, vacuum freeze could remove the required information. + * + * The logical replication launcher manages an internal replication slot named + * "pg_conflict_detection". It asynchronously aggregates the non-removable + * transaction ID from all apply workers to determine the appropriate xmin for + * the slot, thereby retaining necessary tuples. + * + * The non-removable transaction ID in the apply worker is advanced to the + * oldest running transaction ID once all concurrent transactions on the + * publisher have been applied and flushed locally. The process involves: + * + * - RDT_GET_CANDIDATE_XID: + * Call GetOldestActiveTransactionId() to take oldestRunningXid as the + * candidate xid. + * + * - RDT_REQUEST_PUBLISHER_STATUS: + * Send a message to the walsender requesting the publisher status, which + * includes the latest WAL write position and information about transactions + * that are in the commit phase. + * + * - RDT_WAIT_FOR_PUBLISHER_STATUS: + * Wait for the status from the walsender. After receiving the first status, + * do not proceed if there are concurrent remote transactions that are still + * in the commit phase. These transactions might have been assigned an + * earlier commit timestamp but have not yet written the commit WAL record. + * Continue to request the publisher status (RDT_REQUEST_PUBLISHER_STATUS) + * until all these transactions have completed. + * + * - RDT_WAIT_FOR_LOCAL_FLUSH: + * Advance the non-removable transaction ID if the current flush location has + * reached or surpassed the last received WAL position. + * + * The overall state progression is: GET_CANDIDATE_XID -> + * REQUEST_PUBLISHER_STATUS -> WAIT_FOR_PUBLISHER_STATUS -> (loop to + * REQUEST_PUBLISHER_STATUS till concurrent remote transactions end) -> + * WAIT_FOR_LOCAL_FLUSH -> loop back to GET_CANDIDATE_XID. + * + * Retaining the dead tuples for this period is sufficient for ensuring + * eventual consistency using last-update-wins strategy, as dead tuples are + * useful for detecting conflicts only during the application of concurrent + * transactions from remote nodes. After applying and flushing all remote + * transactions that occurred concurrently with the tuple DELETE, any + * subsequent UPDATE from a remote node should have a later timestamp. In such + * cases, it is acceptable to detect an update_missing scenario and convert the + * UPDATE to an INSERT when applying it. But, detecting concurrent remote + * transactions with earlier timestamps than the DELETE is necessary, as the + * UPDATEs in remote transactions should be ignored if their timestamp is + * earlier than that of the dead tuples. + * + * Note that advancing the non-removable transaction ID is not supported if the + * publisher is also a physical standby. This is because the logical walsender + * on the standby can only get the WAL replay position but there may be more + * WALs that are being replicated from the primary and those WALs could have + * earlier commit timestamp. + * + * Similarly, when the publisher has subscribed to another publisher, + * information necessary for conflict detection cannot be retained for + * changes from origins other than the publisher. This is because publisher + * lacks the information on concurrent transactions of other publishers to + * which it subscribes. As the information on concurrent transactions is + * unavailable beyond subscriber's immediate publishers, the non-removable + * transaction ID might be advanced prematurely before changes from other + * origins have been fully applied. + * + * XXX Retaining information for changes from other origins might be possible + * by requesting the subscription on that origin to enable retain_dead_tuples + * and fetching the conflict detection slot.xmin along with the publisher's + * status. In the RDT_WAIT_FOR_PUBLISHER_STATUS phase, the apply worker could + * wait for the remote slot's xmin to reach the oldest active transaction ID, + * ensuring that all transactions from other origins have been applied on the + * publisher, thereby getting the latest WAL position that includes all + * concurrent changes. However, this approach may impact performance, so it + * might not worth the effort. + * + * XXX It seems feasible to get the latest commit's WAL location from the + * publisher and wait till that is applied. However, we can't do that + * because commit timestamps can regress as a commit with a later LSN is not + * guaranteed to have a later timestamp than those with earlier LSNs. Having + * said that, even if that is possible, it won't improve performance much as + * the apply always lag and moves slowly as compared with the transactions + * on the publisher. *------------------------------------------------------------------------- */ @@ -140,6 +230,7 @@ #include <sys/stat.h> #include <unistd.h> +#include "access/commit_ts.h" #include "access/table.h" #include "access/tableam.h" #include "access/twophase.h" @@ -148,6 +239,7 @@ #include "catalog/pg_inherits.h" #include "catalog/pg_subscription.h" #include "catalog/pg_subscription_rel.h" +#include "commands/subscriptioncmds.h" #include "commands/tablecmds.h" #include "commands/trigger.h" #include "executor/executor.h" @@ -166,12 +258,14 @@ #include "replication/logicalrelation.h" #include "replication/logicalworker.h" #include "replication/origin.h" +#include "replication/slot.h" #include "replication/walreceiver.h" #include "replication/worker_internal.h" #include "rewrite/rewriteHandler.h" #include "storage/buffile.h" #include "storage/ipc.h" #include "storage/lmgr.h" +#include "storage/procarray.h" #include "tcop/tcopprot.h" #include "utils/acl.h" #include "utils/dynahash.h" @@ -268,6 +362,78 @@ typedef enum TRANS_PARALLEL_APPLY, } TransApplyAction; +/* + * The phases involved in advancing the non-removable transaction ID. + * + * See comments atop worker.c for details of the transition between these + * phases. + */ +typedef enum +{ + RDT_GET_CANDIDATE_XID, + RDT_REQUEST_PUBLISHER_STATUS, + RDT_WAIT_FOR_PUBLISHER_STATUS, + RDT_WAIT_FOR_LOCAL_FLUSH +} RetainDeadTuplesPhase; + +/* + * Critical information for managing phase transitions within the + * RetainDeadTuplesPhase. + */ +typedef struct RetainDeadTuplesData +{ + RetainDeadTuplesPhase phase; /* current phase */ + XLogRecPtr remote_lsn; /* WAL write position on the publisher */ + + /* + * Oldest transaction ID that was in the commit phase on the publisher. + * Use FullTransactionId to prevent issues with transaction ID wraparound, + * where a new remote_oldestxid could falsely appear to originate from the + * past and block advancement. + */ + FullTransactionId remote_oldestxid; + + /* + * Next transaction ID to be assigned on the publisher. Use + * FullTransactionId for consistency and to allow straightforward + * comparisons with remote_oldestxid. + */ + FullTransactionId remote_nextxid; + + TimestampTz reply_time; /* when the publisher responds with status */ + + /* + * Publisher transaction ID that must be awaited to complete before + * entering the final phase (RDT_WAIT_FOR_LOCAL_FLUSH). Use + * FullTransactionId for the same reason as remote_nextxid. + */ + FullTransactionId remote_wait_for; + + TransactionId candidate_xid; /* candidate for the non-removable + * transaction ID */ + TimestampTz flushpos_update_time; /* when the remote flush position was + * updated in final phase + * (RDT_WAIT_FOR_LOCAL_FLUSH) */ + + /* + * The following fields are used to determine the timing for the next + * round of transaction ID advancement. + */ + TimestampTz last_recv_time; /* when the last message was received */ + TimestampTz candidate_xid_time; /* when the candidate_xid is decided */ + int xid_advance_interval; /* how much time (ms) to wait before + * attempting to advance the + * non-removable transaction ID */ +} RetainDeadTuplesData; + +/* + * The minimum (100ms) and maximum (3 minutes) intervals for advancing + * non-removable transaction IDs. The maximum interval is a bit arbitrary but + * is sufficient to not cause any undue network traffic. + */ +#define MIN_XID_ADVANCE_INTERVAL 100 +#define MAX_XID_ADVANCE_INTERVAL 180000 + /* errcontext tracker */ static ApplyErrorCallbackArg apply_error_callback_arg = { @@ -332,6 +498,13 @@ static XLogRecPtr skip_xact_finish_lsn = InvalidXLogRecPtr; /* BufFile handle of the current streaming file */ static BufFile *stream_fd = NULL; +/* + * The remote WAL position that has been applied and flushed locally. We record + * and use this information both while sending feedback to the server and + * advancing oldest_nonremovable_xid. + */ +static XLogRecPtr last_flushpos = InvalidXLogRecPtr; + typedef struct SubXactInfo { TransactionId xid; /* XID of the subxact */ @@ -372,6 +545,19 @@ static void stream_close_file(void); static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply); +static void maybe_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data, + bool status_received); +static bool can_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data); +static void process_rdt_phase_transition(RetainDeadTuplesData *rdt_data, + bool status_received); +static void get_candidate_xid(RetainDeadTuplesData *rdt_data); +static void request_publisher_status(RetainDeadTuplesData *rdt_data); +static void wait_for_publisher_status(RetainDeadTuplesData *rdt_data, + bool status_received); +static void wait_for_local_flush(RetainDeadTuplesData *rdt_data); +static void adjust_xid_advance_interval(RetainDeadTuplesData *rdt_data, + bool new_xid_found); + static void apply_handle_commit_internal(LogicalRepCommitData *commit_data); static void apply_handle_insert_internal(ApplyExecutionData *edata, ResultRelInfo *relinfo, @@ -3577,6 +3763,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received) bool ping_sent = false; TimeLineID tli; ErrorContextCallback errcallback; + RetainDeadTuplesData rdt_data = {0}; /* * Init the ApplyMessageContext which we clean up after each replication @@ -3655,6 +3842,8 @@ LogicalRepApplyLoop(XLogRecPtr last_received) last_recv_timestamp = GetCurrentTimestamp(); ping_sent = false; + rdt_data.last_recv_time = last_recv_timestamp; + /* Ensure we are reading the data into our memory context. */ MemoryContextSwitchTo(ApplyMessageContext); @@ -3681,6 +3870,8 @@ LogicalRepApplyLoop(XLogRecPtr last_received) UpdateWorkerStats(last_received, send_time, false); apply_dispatch(&s); + + maybe_advance_nonremovable_xid(&rdt_data, false); } else if (c == 'k') { @@ -3696,8 +3887,31 @@ LogicalRepApplyLoop(XLogRecPtr last_received) last_received = end_lsn; send_feedback(last_received, reply_requested, false); + + maybe_advance_nonremovable_xid(&rdt_data, false); + UpdateWorkerStats(last_received, timestamp, true); } + else if (c == 's') /* Primary status update */ + { + rdt_data.remote_lsn = pq_getmsgint64(&s); + rdt_data.remote_oldestxid = FullTransactionIdFromU64((uint64) pq_getmsgint64(&s)); + rdt_data.remote_nextxid = FullTransactionIdFromU64((uint64) pq_getmsgint64(&s)); + rdt_data.reply_time = pq_getmsgint64(&s); + + /* + * This should never happen, see + * ProcessStandbyPSRequestMessage. But if it happens + * due to a bug, we don't want to proceed as it can + * incorrectly advance oldest_nonremovable_xid. + */ + if (XLogRecPtrIsInvalid(rdt_data.remote_lsn)) + elog(ERROR, "cannot get the latest WAL position from the publisher"); + + maybe_advance_nonremovable_xid(&rdt_data, true); + + UpdateWorkerStats(last_received, rdt_data.reply_time, false); + } /* other message types are purposefully ignored */ MemoryContextReset(ApplyMessageContext); @@ -3710,6 +3924,11 @@ LogicalRepApplyLoop(XLogRecPtr last_received) /* confirm all writes so far */ send_feedback(last_received, false, false); + /* Reset the timestamp if no message was received */ + rdt_data.last_recv_time = 0; + + maybe_advance_nonremovable_xid(&rdt_data, false); + if (!in_remote_transaction && !in_streamed_transaction) { /* @@ -3744,6 +3963,14 @@ LogicalRepApplyLoop(XLogRecPtr last_received) else wait_time = NAPTIME_PER_CYCLE; + /* + * Ensure to wake up when it's possible to advance the non-removable + * transaction ID. + */ + if (rdt_data.phase == RDT_GET_CANDIDATE_XID && + rdt_data.xid_advance_interval) + wait_time = Min(wait_time, rdt_data.xid_advance_interval); + rc = WaitLatchOrSocket(MyLatch, WL_SOCKET_READABLE | WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, @@ -3807,6 +4034,8 @@ LogicalRepApplyLoop(XLogRecPtr last_received) send_feedback(last_received, requestReply, requestReply); + maybe_advance_nonremovable_xid(&rdt_data, false); + /* * Force reporting to ensure long idle periods don't lead to * arbitrarily delayed stats. Stats can only be reported outside @@ -3842,7 +4071,6 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply) static XLogRecPtr last_recvpos = InvalidXLogRecPtr; static XLogRecPtr last_writepos = InvalidXLogRecPtr; - static XLogRecPtr last_flushpos = InvalidXLogRecPtr; XLogRecPtr writepos; XLogRecPtr flushpos; @@ -3921,6 +4149,367 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply) } /* + * Attempt to advance the non-removable transaction ID. + * + * See comments atop worker.c for details. + */ +static void +maybe_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data, + bool status_received) +{ + if (!can_advance_nonremovable_xid(rdt_data)) + return; + + process_rdt_phase_transition(rdt_data, status_received); +} + +/* + * Preliminary check to determine if advancing the non-removable transaction ID + * is allowed. + */ +static bool +can_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data) +{ + /* + * It is sufficient to manage non-removable transaction ID for a + * subscription by the main apply worker to detect conflicts reliably even + * for table sync or parallel apply workers. + */ + if (!am_leader_apply_worker()) + return false; + + /* No need to advance if retaining dead tuples is not required */ + if (!MySubscription->retaindeadtuples) + return false; + + return true; +} + +/* + * Process phase transitions during the non-removable transaction ID + * advancement. See comments atop worker.c for details of the transition. + */ +static void +process_rdt_phase_transition(RetainDeadTuplesData *rdt_data, + bool status_received) +{ + switch (rdt_data->phase) + { + case RDT_GET_CANDIDATE_XID: + get_candidate_xid(rdt_data); + break; + case RDT_REQUEST_PUBLISHER_STATUS: + request_publisher_status(rdt_data); + break; + case RDT_WAIT_FOR_PUBLISHER_STATUS: + wait_for_publisher_status(rdt_data, status_received); + break; + case RDT_WAIT_FOR_LOCAL_FLUSH: + wait_for_local_flush(rdt_data); + break; + } +} + +/* + * Workhorse for the RDT_GET_CANDIDATE_XID phase. + */ +static void +get_candidate_xid(RetainDeadTuplesData *rdt_data) +{ + TransactionId oldest_running_xid; + TimestampTz now; + + /* + * Use last_recv_time when applying changes in the loop to avoid + * unnecessary system time retrieval. If last_recv_time is not available, + * obtain the current timestamp. + */ + now = rdt_data->last_recv_time ? rdt_data->last_recv_time : GetCurrentTimestamp(); + + /* + * Compute the candidate_xid and request the publisher status at most once + * per xid_advance_interval. Refer to adjust_xid_advance_interval() for + * details on how this value is dynamically adjusted. This is to avoid + * using CPU and network resources without making much progress. + */ + if (!TimestampDifferenceExceeds(rdt_data->candidate_xid_time, now, + rdt_data->xid_advance_interval)) + return; + + /* + * Immediately update the timer, even if the function returns later + * without setting candidate_xid due to inactivity on the subscriber. This + * avoids frequent calls to GetOldestActiveTransactionId. + */ + rdt_data->candidate_xid_time = now; + + /* + * Consider transactions in the current database, as only dead tuples from + * this database are required for conflict detection. + */ + oldest_running_xid = GetOldestActiveTransactionId(false, false); + + /* + * Oldest active transaction ID (oldest_running_xid) can't be behind any + * of its previously computed value. + */ + Assert(TransactionIdPrecedesOrEquals(MyLogicalRepWorker->oldest_nonremovable_xid, + oldest_running_xid)); + + /* Return if the oldest_nonremovable_xid cannot be advanced */ + if (TransactionIdEquals(MyLogicalRepWorker->oldest_nonremovable_xid, + oldest_running_xid)) + { + adjust_xid_advance_interval(rdt_data, false); + return; + } + + adjust_xid_advance_interval(rdt_data, true); + + rdt_data->candidate_xid = oldest_running_xid; + rdt_data->phase = RDT_REQUEST_PUBLISHER_STATUS; + + /* process the next phase */ + process_rdt_phase_transition(rdt_data, false); +} + +/* + * Workhorse for the RDT_REQUEST_PUBLISHER_STATUS phase. + */ +static void +request_publisher_status(RetainDeadTuplesData *rdt_data) +{ + static StringInfo request_message = NULL; + + if (!request_message) + { + MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext); + + request_message = makeStringInfo(); + MemoryContextSwitchTo(oldctx); + } + else + resetStringInfo(request_message); + + /* + * Send the current time to update the remote walsender's latest reply + * message received time. + */ + pq_sendbyte(request_message, 'p'); + pq_sendint64(request_message, GetCurrentTimestamp()); + + elog(DEBUG2, "sending publisher status request message"); + + /* Send a request for the publisher status */ + walrcv_send(LogRepWorkerWalRcvConn, + request_message->data, request_message->len); + + rdt_data->phase = RDT_WAIT_FOR_PUBLISHER_STATUS; + + /* + * Skip calling maybe_advance_nonremovable_xid() since further transition + * is possible only once we receive the publisher status message. + */ +} + +/* + * Workhorse for the RDT_WAIT_FOR_PUBLISHER_STATUS phase. + */ +static void +wait_for_publisher_status(RetainDeadTuplesData *rdt_data, + bool status_received) +{ + /* + * Return if we have requested but not yet received the publisher status. + */ + if (!status_received) + return; + + if (!FullTransactionIdIsValid(rdt_data->remote_wait_for)) + rdt_data->remote_wait_for = rdt_data->remote_nextxid; + + /* + * Check if all remote concurrent transactions that were active at the + * first status request have now completed. If completed, proceed to the + * next phase; otherwise, continue checking the publisher status until + * these transactions finish. + * + * It's possible that transactions in the commit phase during the last + * cycle have now finished committing, but remote_oldestxid remains older + * than remote_wait_for. This can happen if some old transaction came in + * the commit phase when we requested status in this cycle. We do not + * handle this case explicitly as it's rare and the benefit doesn't + * justify the required complexity. Tracking would require either caching + * all xids at the publisher or sending them to subscribers. The condition + * will resolve naturally once the remaining transactions are finished. + * + * Directly advancing the non-removable transaction ID is possible if + * there are no activities on the publisher since the last advancement + * cycle. However, it requires maintaining two fields, last_remote_nextxid + * and last_remote_lsn, within the structure for comparison with the + * current cycle's values. Considering the minimal cost of continuing in + * RDT_WAIT_FOR_LOCAL_FLUSH without awaiting changes, we opted not to + * advance the transaction ID here. + */ + if (FullTransactionIdPrecedesOrEquals(rdt_data->remote_wait_for, + rdt_data->remote_oldestxid)) + rdt_data->phase = RDT_WAIT_FOR_LOCAL_FLUSH; + else + rdt_data->phase = RDT_REQUEST_PUBLISHER_STATUS; + + /* process the next phase */ + process_rdt_phase_transition(rdt_data, false); +} + +/* + * Workhorse for the RDT_WAIT_FOR_LOCAL_FLUSH phase. + */ +static void +wait_for_local_flush(RetainDeadTuplesData *rdt_data) +{ + Assert(!XLogRecPtrIsInvalid(rdt_data->remote_lsn) && + TransactionIdIsValid(rdt_data->candidate_xid)); + + /* + * We expect the publisher and subscriber clocks to be in sync using time + * sync service like NTP. Otherwise, we will advance this worker's + * oldest_nonremovable_xid prematurely, leading to the removal of rows + * required to detect conflicts reliably. This check primarily addresses + * scenarios where the publisher's clock falls behind; if the publisher's + * clock is ahead, subsequent transactions will naturally bear later + * commit timestamps, conforming to the design outlined atop worker.c. + * + * XXX Consider waiting for the publisher's clock to catch up with the + * subscriber's before proceeding to the next phase. + */ + if (TimestampDifferenceExceeds(rdt_data->reply_time, + rdt_data->candidate_xid_time, 0)) + ereport(ERROR, + errmsg_internal("oldest_nonremovable_xid transaction ID could be advanced prematurely"), + errdetail_internal("The clock on the publisher is behind that of the subscriber.")); + + /* + * Do not attempt to advance the non-removable transaction ID when table + * sync is in progress. During this time, changes from a single + * transaction may be applied by multiple table sync workers corresponding + * to the target tables. So, it's necessary for all table sync workers to + * apply and flush the corresponding changes before advancing the + * transaction ID, otherwise, dead tuples that are still needed for + * conflict detection in table sync workers could be removed prematurely. + * However, confirming the apply and flush progress across all table sync + * workers is complex and not worth the effort, so we simply return if not + * all tables are in the READY state. + * + * It is safe to add new tables with initial states to the subscription + * after this check because any changes applied to these tables should + * have a WAL position greater than the rdt_data->remote_lsn. + */ + if (!AllTablesyncsReady()) + return; + + /* + * Update and check the remote flush position if we are applying changes + * in a loop. This is done at most once per WalWriterDelay to avoid + * performing costly operations in get_flush_position() too frequently + * during change application. + */ + if (last_flushpos < rdt_data->remote_lsn && rdt_data->last_recv_time && + TimestampDifferenceExceeds(rdt_data->flushpos_update_time, + rdt_data->last_recv_time, WalWriterDelay)) + { + XLogRecPtr writepos; + XLogRecPtr flushpos; + bool have_pending_txes; + + /* Fetch the latest remote flush position */ + get_flush_position(&writepos, &flushpos, &have_pending_txes); + + if (flushpos > last_flushpos) + last_flushpos = flushpos; + + rdt_data->flushpos_update_time = rdt_data->last_recv_time; + } + + /* Return to wait for the changes to be applied */ + if (last_flushpos < rdt_data->remote_lsn) + return; + + /* + * Reaching here means the remote WAL position has been received, and all + * transactions up to that position on the publisher have been applied and + * flushed locally. So, we can advance the non-removable transaction ID. + */ + SpinLockAcquire(&MyLogicalRepWorker->relmutex); + MyLogicalRepWorker->oldest_nonremovable_xid = rdt_data->candidate_xid; + SpinLockRelease(&MyLogicalRepWorker->relmutex); + + elog(DEBUG2, "confirmed flush up to remote lsn %X/%X: new oldest_nonremovable_xid %u", + LSN_FORMAT_ARGS(rdt_data->remote_lsn), + rdt_data->candidate_xid); + + /* Notify launcher to update the xmin of the conflict slot */ + ApplyLauncherWakeup(); + + /* + * Reset all data fields except those used to determine the timing for the + * next round of transaction ID advancement. We can even use + * flushpos_update_time in the next round to decide whether to get the + * latest flush position. + */ + rdt_data->phase = RDT_GET_CANDIDATE_XID; + rdt_data->remote_lsn = InvalidXLogRecPtr; + rdt_data->remote_oldestxid = InvalidFullTransactionId; + rdt_data->remote_nextxid = InvalidFullTransactionId; + rdt_data->reply_time = 0; + rdt_data->remote_wait_for = InvalidFullTransactionId; + rdt_data->candidate_xid = InvalidTransactionId; + + /* process the next phase */ + process_rdt_phase_transition(rdt_data, false); +} + +/* + * Adjust the interval for advancing non-removable transaction IDs. + * + * We double the interval to try advancing the non-removable transaction IDs + * if there is no activity on the node. The maximum value of the interval is + * capped by wal_receiver_status_interval if it is not zero, otherwise to a + * 3 minutes which should be sufficient to avoid using CPU or network + * resources without much benefit. + * + * The interval is reset to a minimum value of 100ms once there is some + * activity on the node. + * + * XXX The use of wal_receiver_status_interval is a bit arbitrary so we can + * consider the other interval or a separate GUC if the need arises. + */ +static void +adjust_xid_advance_interval(RetainDeadTuplesData *rdt_data, bool new_xid_found) +{ + if (!new_xid_found && rdt_data->xid_advance_interval) + { + int max_interval = wal_receiver_status_interval + ? wal_receiver_status_interval * 1000 + : MAX_XID_ADVANCE_INTERVAL; + + /* + * No new transaction ID has been assigned since the last check, so + * double the interval, but not beyond the maximum allowable value. + */ + rdt_data->xid_advance_interval = Min(rdt_data->xid_advance_interval * 2, + max_interval); + } + else + { + /* + * A new transaction ID was found or the interval is not yet + * initialized, so set the interval to the minimum value. + */ + rdt_data->xid_advance_interval = MIN_XID_ADVANCE_INTERVAL; + } +} + +/* * Exit routine for apply workers due to subscription parameter changes. */ static void @@ -4708,6 +5297,30 @@ InitializeLogRepWorker(void) apply_worker_exit(); } + /* + * Restart the worker if retain_dead_tuples was enabled during startup. + * + * At this point, the replication slot used for conflict detection might + * not exist yet, or could be dropped soon if the launcher perceives + * retain_dead_tuples as disabled. To avoid unnecessary tracking of + * oldest_nonremovable_xid when the slot is absent or at risk of being + * dropped, a restart is initiated. + * + * The oldest_nonremovable_xid should be initialized only when the + * retain_dead_tuples is enabled before launching the worker. See + * logicalrep_worker_launch. + */ + if (am_leader_apply_worker() && + MySubscription->retaindeadtuples && + !TransactionIdIsValid(MyLogicalRepWorker->oldest_nonremovable_xid)) + { + ereport(LOG, + errmsg("logical replication worker for subscription \"%s\" will restart because the option %s was enabled during startup", + MySubscription->name, "retain_dead_tuples")); + + apply_worker_exit(); + } + /* Setup synchronous commit according to the user's wishes */ SetConfigOption("synchronous_commit", MySubscription->synccommit, PGC_BACKEND, PGC_S_OVERRIDE); @@ -4864,6 +5477,14 @@ DisableSubscriptionAndExit(void) errmsg("subscription \"%s\" has been disabled because of an error", MySubscription->name)); + /* + * Skip the track_commit_timestamp check when disabling the worker due to + * an error, as verifying commit timestamps is unnecessary in this + * context. + */ + if (MySubscription->retaindeadtuples) + CheckSubDeadTupleRetention(false, true, WARNING); + proc_exit(0); } diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index e44ad576bc7..8605776ad86 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -47,6 +47,7 @@ #include "miscadmin.h" #include "pgstat.h" #include "postmaster/interrupt.h" +#include "replication/logicallauncher.h" #include "replication/slotsync.h" #include "replication/slot.h" #include "replication/walsender_private.h" @@ -172,6 +173,7 @@ static SyncStandbySlotsConfigData *synchronized_standby_slots_config; static XLogRecPtr ss_oldest_flush_lsn = InvalidXLogRecPtr; static void ReplicationSlotShmemExit(int code, Datum arg); +static bool IsSlotForConflictCheck(const char *name); static void ReplicationSlotDropPtr(ReplicationSlot *slot); /* internal persistency functions */ @@ -258,13 +260,17 @@ ReplicationSlotShmemExit(int code, Datum arg) /* * Check whether the passed slot name is valid and report errors at elevel. * + * An error will be reported for a reserved replication slot name if + * allow_reserved_name is set to false. + * * Slot names may consist out of [a-z0-9_]{1,NAMEDATALEN-1} which should allow * the name to be used as a directory name on every supported OS. * * Returns whether the directory name is valid or not if elevel < ERROR. */ bool -ReplicationSlotValidateName(const char *name, int elevel) +ReplicationSlotValidateName(const char *name, bool allow_reserved_name, + int elevel) { const char *cp; @@ -300,10 +306,32 @@ ReplicationSlotValidateName(const char *name, int elevel) return false; } } + + if (!allow_reserved_name && IsSlotForConflictCheck(name)) + { + ereport(elevel, + errcode(ERRCODE_RESERVED_NAME), + errmsg("replication slot name \"%s\" is reserved", + name), + errdetail("The name \"%s\" is reserved for the conflict detection slot.", + CONFLICT_DETECTION_SLOT)); + + return false; + } + return true; } /* + * Return true if the replication slot name is "pg_conflict_detection". + */ +static bool +IsSlotForConflictCheck(const char *name) +{ + return (strcmp(name, CONFLICT_DETECTION_SLOT) == 0); +} + +/* * Create a new replication slot and mark it as used by this backend. * * name: Name of the slot @@ -330,7 +358,12 @@ ReplicationSlotCreate(const char *name, bool db_specific, Assert(MyReplicationSlot == NULL); - ReplicationSlotValidateName(name, ERROR); + /* + * The logical launcher or pg_upgrade may create or migrate an internal + * slot, so using a reserved name is allowed in these cases. + */ + ReplicationSlotValidateName(name, IsBinaryUpgrade || IsLogicalLauncher(), + ERROR); if (failover) { @@ -582,6 +615,17 @@ retry: } /* + * Do not allow users to acquire the reserved slot. This scenario may + * occur if the launcher that owns the slot has terminated unexpectedly + * due to an error, and a backend process attempts to reuse the slot. + */ + if (!IsLogicalLauncher() && IsSlotForConflictCheck(name)) + ereport(ERROR, + errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("cannot acquire replication slot \"%s\"", name), + errdetail("The slot is reserved for conflict detection and can only be acquired by logical replication launcher.")); + + /* * This is the slot we want; check if it's active under some other * process. In single user mode, we don't need this check. */ diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 28b8591efa5..ee911394a23 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -65,6 +65,7 @@ #include "funcapi.h" #include "libpq/libpq.h" #include "libpq/pqformat.h" +#include "libpq/protocol.h" #include "miscadmin.h" #include "nodes/replnodes.h" #include "pgstat.h" @@ -84,6 +85,7 @@ #include "storage/ipc.h" #include "storage/pmsignal.h" #include "storage/proc.h" +#include "storage/procarray.h" #include "tcop/dest.h" #include "tcop/tcopprot.h" #include "utils/acl.h" @@ -258,6 +260,7 @@ static void StartLogicalReplication(StartReplicationCmd *cmd); static void ProcessStandbyMessage(void); static void ProcessStandbyReplyMessage(void); static void ProcessStandbyHSFeedbackMessage(void); +static void ProcessStandbyPSRequestMessage(void); static void ProcessRepliesIfAny(void); static void ProcessPendingWrites(void); static void WalSndKeepalive(bool requestReply, XLogRecPtr writePtr); @@ -733,13 +736,13 @@ HandleUploadManifestPacket(StringInfo buf, off_t *offset, switch (mtype) { - case 'd': /* CopyData */ + case PqMsg_CopyData: maxmsglen = PQ_LARGE_MESSAGE_LIMIT; break; - case 'c': /* CopyDone */ - case 'f': /* CopyFail */ - case 'H': /* Flush */ - case 'S': /* Sync */ + case PqMsg_CopyDone: + case PqMsg_CopyFail: + case PqMsg_Flush: + case PqMsg_Sync: maxmsglen = PQ_SMALL_MESSAGE_LIMIT; break; default: @@ -761,19 +764,19 @@ HandleUploadManifestPacket(StringInfo buf, off_t *offset, /* Process the message */ switch (mtype) { - case 'd': /* CopyData */ + case PqMsg_CopyData: AppendIncrementalManifestData(ib, buf->data, buf->len); return true; - case 'c': /* CopyDone */ + case PqMsg_CopyDone: return false; - case 'H': /* Sync */ - case 'S': /* Flush */ + case PqMsg_Sync: + case PqMsg_Flush: /* Ignore these while in CopyOut mode as we do elsewhere. */ return true; - case 'f': + case PqMsg_CopyFail: ereport(ERROR, (errcode(ERRCODE_QUERY_CANCELED), errmsg("COPY from stdin failed: %s", @@ -1567,7 +1570,7 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, tmpbuf.data, sizeof(int64)); /* output previously gathered data in a CopyData packet */ - pq_putmessage_noblock('d', ctx->out->data, ctx->out->len); + pq_putmessage_noblock(PqMsg_CopyData, ctx->out->data, ctx->out->len); CHECK_FOR_INTERRUPTS(); @@ -2303,7 +2306,7 @@ ProcessRepliesIfAny(void) case PqMsg_CopyDone: if (!streamingDoneSending) { - pq_putmessage_noblock('c', NULL, 0); + pq_putmessage_noblock(PqMsg_CopyDone, NULL, 0); streamingDoneSending = true; } @@ -2355,6 +2358,10 @@ ProcessStandbyMessage(void) ProcessStandbyHSFeedbackMessage(); break; + case 'p': + ProcessStandbyPSRequestMessage(); + break; + default: ereport(COMMERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), @@ -2702,6 +2709,60 @@ ProcessStandbyHSFeedbackMessage(void) } /* + * Process the request for a primary status update message. + */ +static void +ProcessStandbyPSRequestMessage(void) +{ + XLogRecPtr lsn = InvalidXLogRecPtr; + TransactionId oldestXidInCommit; + FullTransactionId nextFullXid; + FullTransactionId fullOldestXidInCommit; + WalSnd *walsnd = MyWalSnd; + TimestampTz replyTime; + + /* + * This shouldn't happen because we don't support getting primary status + * message from standby. + */ + if (RecoveryInProgress()) + elog(ERROR, "the primary status is unavailable during recovery"); + + replyTime = pq_getmsgint64(&reply_message); + + /* + * Update shared state for this WalSender process based on reply data from + * standby. + */ + SpinLockAcquire(&walsnd->mutex); + walsnd->replyTime = replyTime; + SpinLockRelease(&walsnd->mutex); + + /* + * Consider transactions in the current database, as only these are the + * ones replicated. + */ + oldestXidInCommit = GetOldestActiveTransactionId(true, false); + nextFullXid = ReadNextFullTransactionId(); + fullOldestXidInCommit = FullTransactionIdFromAllowableAt(nextFullXid, + oldestXidInCommit); + lsn = GetXLogWriteRecPtr(); + + elog(DEBUG2, "sending primary status"); + + /* construct the message... */ + resetStringInfo(&output_message); + pq_sendbyte(&output_message, 's'); + pq_sendint64(&output_message, lsn); + pq_sendint64(&output_message, (int64) U64FromFullTransactionId(fullOldestXidInCommit)); + pq_sendint64(&output_message, (int64) U64FromFullTransactionId(nextFullXid)); + pq_sendint64(&output_message, GetCurrentTimestamp()); + + /* ... and send it wrapped in CopyData */ + pq_putmessage_noblock(PqMsg_CopyData, output_message.data, output_message.len); +} + +/* * Compute how long send/receive loops should sleep. * * If wal_sender_timeout is enabled we want to wake up in time to send @@ -3246,7 +3307,7 @@ XLogSendPhysical(void) wal_segment_close(xlogreader); /* Send CopyDone */ - pq_putmessage_noblock('c', NULL, 0); + pq_putmessage_noblock(PqMsg_CopyDone, NULL, 0); streamingDoneSending = true; WalSndCaughtUp = true; @@ -3374,7 +3435,7 @@ retry: memcpy(&output_message.data[1 + sizeof(int64) + sizeof(int64)], tmpbuf.data, sizeof(int64)); - pq_putmessage_noblock('d', output_message.data, output_message.len); + pq_putmessage_noblock(PqMsg_CopyData, output_message.data, output_message.len); sentPtr = endptr; @@ -4080,7 +4141,7 @@ WalSndKeepalive(bool requestReply, XLogRecPtr writePtr) pq_sendbyte(&output_message, requestReply ? 1 : 0); /* ... and send it wrapped in CopyData */ - pq_putmessage_noblock('d', output_message.data, output_message.len); + pq_putmessage_noblock(PqMsg_CopyData, output_message.data, output_message.len); /* Set local flag */ if (requestReply) diff --git a/src/backend/storage/aio/README.md b/src/backend/storage/aio/README.md index f10b5c7e31e..72ae3b3737d 100644 --- a/src/backend/storage/aio/README.md +++ b/src/backend/storage/aio/README.md @@ -94,7 +94,7 @@ pgaio_io_register_callbacks(ioh, PGAIO_HCB_SHARED_BUFFER_READV, 0); * * In this example we're reading only a single buffer, hence the 1. */ -pgaio_io_set_handle_data_32(ioh, (uint32 *) buffer, 1); +pgaio_io_set_handle_data_32(ioh, (uint32 *) &buffer, 1); /* * Pass the AIO handle to lower-level function. When operating on the level of @@ -119,8 +119,9 @@ pgaio_io_set_handle_data_32(ioh, (uint32 *) buffer, 1); * e.g. due to reaching a limit on the number of unsubmitted IOs, and even * complete before smgrstartreadv() returns. */ +void *page = BufferGetBlock(buffer); smgrstartreadv(ioh, operation->smgr, forknum, blkno, - BufferGetBlock(buffer), 1); + &page, 1); /* * To benefit from AIO, it is beneficial to perform other work, including diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c index 2418967def6..bf987aed8d3 100644 --- a/src/backend/storage/ipc/procarray.c +++ b/src/backend/storage/ipc/procarray.c @@ -2814,8 +2814,10 @@ GetRunningTransactionData(void) * * Similar to GetSnapshotData but returns just oldestActiveXid. We include * all PGPROCs with an assigned TransactionId, even VACUUM processes. - * We look at all databases, though there is no need to include WALSender - * since this has no effect on hot standby conflicts. + * + * If allDbs is true, we look at all databases, though there is no need to + * include WALSender since this has no effect on hot standby conflicts. If + * allDbs is false, skip processes attached to other databases. * * This is never executed during recovery so there is no need to look at * KnownAssignedXids. @@ -2823,9 +2825,12 @@ GetRunningTransactionData(void) * We don't worry about updating other counters, we want to keep this as * simple as possible and leave GetSnapshotData() as the primary code for * that bookkeeping. + * + * inCommitOnly indicates getting the oldestActiveXid among the transactions + * in the commit critical section. */ TransactionId -GetOldestActiveTransactionId(void) +GetOldestActiveTransactionId(bool inCommitOnly, bool allDbs) { ProcArrayStruct *arrayP = procArray; TransactionId *other_xids = ProcGlobal->xids; @@ -2852,6 +2857,8 @@ GetOldestActiveTransactionId(void) for (index = 0; index < arrayP->numProcs; index++) { TransactionId xid; + int pgprocno = arrayP->pgprocnos[index]; + PGPROC *proc = &allProcs[pgprocno]; /* Fetch xid just once - see GetNewTransactionId */ xid = UINT32_ACCESS_ONCE(other_xids[index]); @@ -2859,6 +2866,13 @@ GetOldestActiveTransactionId(void) if (!TransactionIdIsNormal(xid)) continue; + if (inCommitOnly && + (proc->delayChkptFlags & DELAY_CHKPT_IN_COMMIT) == 0) + continue; + + if (!allDbs && proc->databaseId != MyDatabaseId) + continue; + if (TransactionIdPrecedes(xid, oldestRunningXid)) oldestRunningXid = xid; diff --git a/src/backend/storage/lmgr/generate-lwlocknames.pl b/src/backend/storage/lmgr/generate-lwlocknames.pl index c7a6720440d..cd3e43c448a 100644 --- a/src/backend/storage/lmgr/generate-lwlocknames.pl +++ b/src/backend/storage/lmgr/generate-lwlocknames.pl @@ -27,18 +27,24 @@ print $h "/* there is deliberately not an #ifndef LWLOCKNAMES_H here */\n\n"; # -# First, record the predefined LWLocks listed in wait_event_names.txt. We'll -# cross-check those with the ones in lwlocklist.h. +# First, record the predefined LWLocks and built-in tranches listed in +# wait_event_names.txt. We'll cross-check those with the ones in lwlocklist.h. # +my @wait_event_tranches; my @wait_event_lwlocks; my $record_lwlocks = 0; +my $in_tranches = 0; while (<$wait_event_names>) { chomp; # Check for end marker. - last if /^# END OF PREDEFINED LWLOCKS/; + if (/^# END OF PREDEFINED LWLOCKS/) + { + $in_tranches = 1; + next; + } # Skip comments and empty lines. next if /^#/; @@ -54,13 +60,29 @@ while (<$wait_event_names>) # Go to the next line if we are not yet recording LWLocks. next if not $record_lwlocks; + # Stop recording if we reach another section. + last if /^Section:/; + # Record the LWLock. (my $waiteventname, my $waitevendocsentence) = split(/\t/, $_); - push(@wait_event_lwlocks, $waiteventname); + + if ($in_tranches) + { + push(@wait_event_tranches, $waiteventname); + } + else + { + push(@wait_event_lwlocks, $waiteventname); + } } +# +# While gathering the list of predefined LWLocks, cross-check the lists in +# lwlocklist.h with the wait events we just recorded. +# my $in_comment = 0; -my $i = 0; +my $lwlock_count = 0; +my $tranche_count = 0; while (<$lwlocklist>) { chomp; @@ -81,38 +103,72 @@ while (<$lwlocklist>) next; } - die "unable to parse lwlocklist.h line \"$_\"" - unless /^PG_LWLOCK\((\d+),\s+(\w+)\)$/; + # + # Gather list of predefined LWLocks and cross-check with the wait events. + # + if (/^PG_LWLOCK\((\d+),\s+(\w+)\)$/) + { + my ($lockidx, $lockname) = ($1, $2); - (my $lockidx, my $lockname) = ($1, $2); + die "lwlocklist.h not in order" if $lockidx < $lastlockidx; + die "lwlocklist.h has duplicates" if $lockidx == $lastlockidx; - die "lwlocklist.h not in order" if $lockidx < $lastlockidx; - die "lwlocklist.h has duplicates" if $lockidx == $lastlockidx; + die "$lockname defined in lwlocklist.h but missing from " + . "wait_event_names.txt" + if $lwlock_count >= scalar @wait_event_lwlocks; + die "lists of predefined LWLocks do not match (first mismatch at " + . "$wait_event_lwlocks[$lwlock_count] in wait_event_names.txt and " + . "$lockname in lwlocklist.h)" + if $wait_event_lwlocks[$lwlock_count] ne $lockname; - die "$lockname defined in lwlocklist.h but missing from " - . "wait_event_names.txt" - if $i >= scalar @wait_event_lwlocks; - die "lists of predefined LWLocks do not match (first mismatch at " - . "$wait_event_lwlocks[$i] in wait_event_names.txt and $lockname in " - . "lwlocklist.h)" - if $wait_event_lwlocks[$i] ne $lockname; - $i++; + $lwlock_count++; - while ($lastlockidx < $lockidx - 1) + while ($lastlockidx < $lockidx - 1) + { + ++$lastlockidx; + } + $lastlockidx = $lockidx; + + # Add a "Lock" suffix to each lock name, as the C code depends on that. + printf $h "#define %-32s (&MainLWLockArray[$lockidx].lock)\n", + $lockname . "Lock"; + + next; + } + + # + # Cross-check the built-in LWLock tranches with the wait events. + # + if (/^PG_LWLOCKTRANCHE\((\w+),\s+(\w+)\)$/) { - ++$lastlockidx; + my ($tranche_id, $tranche_name) = ($1, $2); + + die "$tranche_name defined in lwlocklist.h but missing from " + . "wait_event_names.txt" + if $tranche_count >= scalar @wait_event_tranches; + die + "lists of built-in LWLock tranches do not match (first mismatch at " + . "$wait_event_tranches[$tranche_count] in wait_event_names.txt and " + . "$tranche_name in lwlocklist.h)" + if $wait_event_tranches[$tranche_count] ne $tranche_name; + + $tranche_count++; + + next; } - $lastlockidx = $lockidx; - # Add a "Lock" suffix to each lock name, as the C code depends on that - printf $h "#define %-32s (&MainLWLockArray[$lockidx].lock)\n", - $lockname . "Lock"; + die "unable to parse lwlocklist.h line \"$_\""; } die - "$wait_event_lwlocks[$i] defined in wait_event_names.txt but missing from " - . "lwlocklist.h" - if $i < scalar @wait_event_lwlocks; + "$wait_event_lwlocks[$lwlock_count] defined in wait_event_names.txt but " + . " missing from lwlocklist.h" + if $lwlock_count < scalar @wait_event_lwlocks; + +die + "$wait_event_tranches[$tranche_count] defined in wait_event_names.txt but " + . "missing from lwlocklist.h" + if $tranche_count < scalar @wait_event_tranches; print $h "\n"; printf $h "#define NUM_INDIVIDUAL_LWLOCKS %s\n", $lastlockidx + 1; diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c index 2d43bf2cc13..ec9c345ffdf 100644 --- a/src/backend/storage/lmgr/lwlock.c +++ b/src/backend/storage/lmgr/lwlock.c @@ -122,9 +122,8 @@ StaticAssertDecl((LW_VAL_EXCLUSIVE & LW_FLAG_MASK) == 0, * own tranche. We absorb the names of these tranches from there into * BuiltinTrancheNames here. * - * 2. There are some predefined tranches for built-in groups of locks. - * These are listed in enum BuiltinTrancheIds in lwlock.h, and their names - * appear in BuiltinTrancheNames[] below. + * 2. There are some predefined tranches for built-in groups of locks defined + * in lwlocklist.h. We absorb the names of these tranches, too. * * 3. Extensions can create new tranches, via either RequestNamedLWLockTranche * or LWLockRegisterTranche. The names of these that are known in the current @@ -135,49 +134,10 @@ StaticAssertDecl((LW_VAL_EXCLUSIVE & LW_FLAG_MASK) == 0, */ static const char *const BuiltinTrancheNames[] = { #define PG_LWLOCK(id, lockname) [id] = CppAsString(lockname), +#define PG_LWLOCKTRANCHE(id, lockname) [LWTRANCHE_##id] = CppAsString(lockname), #include "storage/lwlocklist.h" #undef PG_LWLOCK - [LWTRANCHE_XACT_BUFFER] = "XactBuffer", - [LWTRANCHE_COMMITTS_BUFFER] = "CommitTsBuffer", - [LWTRANCHE_SUBTRANS_BUFFER] = "SubtransBuffer", - [LWTRANCHE_MULTIXACTOFFSET_BUFFER] = "MultiXactOffsetBuffer", - [LWTRANCHE_MULTIXACTMEMBER_BUFFER] = "MultiXactMemberBuffer", - [LWTRANCHE_NOTIFY_BUFFER] = "NotifyBuffer", - [LWTRANCHE_SERIAL_BUFFER] = "SerialBuffer", - [LWTRANCHE_WAL_INSERT] = "WALInsert", - [LWTRANCHE_BUFFER_CONTENT] = "BufferContent", - [LWTRANCHE_REPLICATION_ORIGIN_STATE] = "ReplicationOriginState", - [LWTRANCHE_REPLICATION_SLOT_IO] = "ReplicationSlotIO", - [LWTRANCHE_LOCK_FASTPATH] = "LockFastPath", - [LWTRANCHE_BUFFER_MAPPING] = "BufferMapping", - [LWTRANCHE_LOCK_MANAGER] = "LockManager", - [LWTRANCHE_PREDICATE_LOCK_MANAGER] = "PredicateLockManager", - [LWTRANCHE_PARALLEL_HASH_JOIN] = "ParallelHashJoin", - [LWTRANCHE_PARALLEL_BTREE_SCAN] = "ParallelBtreeScan", - [LWTRANCHE_PARALLEL_QUERY_DSA] = "ParallelQueryDSA", - [LWTRANCHE_PER_SESSION_DSA] = "PerSessionDSA", - [LWTRANCHE_PER_SESSION_RECORD_TYPE] = "PerSessionRecordType", - [LWTRANCHE_PER_SESSION_RECORD_TYPMOD] = "PerSessionRecordTypmod", - [LWTRANCHE_SHARED_TUPLESTORE] = "SharedTupleStore", - [LWTRANCHE_SHARED_TIDBITMAP] = "SharedTidBitmap", - [LWTRANCHE_PARALLEL_APPEND] = "ParallelAppend", - [LWTRANCHE_PER_XACT_PREDICATE_LIST] = "PerXactPredicateList", - [LWTRANCHE_PGSTATS_DSA] = "PgStatsDSA", - [LWTRANCHE_PGSTATS_HASH] = "PgStatsHash", - [LWTRANCHE_PGSTATS_DATA] = "PgStatsData", - [LWTRANCHE_LAUNCHER_DSA] = "LogicalRepLauncherDSA", - [LWTRANCHE_LAUNCHER_HASH] = "LogicalRepLauncherHash", - [LWTRANCHE_DSM_REGISTRY_DSA] = "DSMRegistryDSA", - [LWTRANCHE_DSM_REGISTRY_HASH] = "DSMRegistryHash", - [LWTRANCHE_COMMITTS_SLRU] = "CommitTsSLRU", - [LWTRANCHE_MULTIXACTOFFSET_SLRU] = "MultiXactOffsetSLRU", - [LWTRANCHE_MULTIXACTMEMBER_SLRU] = "MultiXactMemberSLRU", - [LWTRANCHE_NOTIFY_SLRU] = "NotifySLRU", - [LWTRANCHE_SERIAL_SLRU] = "SerialSLRU", - [LWTRANCHE_SUBTRANS_SLRU] = "SubtransSLRU", - [LWTRANCHE_XACT_SLRU] = "XactSLRU", - [LWTRANCHE_PARALLEL_VACUUM_DSA] = "ParallelVacuumDSA", - [LWTRANCHE_AIO_URING_COMPLETION] = "AioUringCompletion", +#undef PG_LWLOCKTRANCHE }; StaticAssertDecl(lengthof(BuiltinTrancheNames) == diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt index 4da68312b5f..0be307d2ca0 100644 --- a/src/backend/utils/activity/wait_event_names.txt +++ b/src/backend/utils/activity/wait_event_names.txt @@ -356,9 +356,13 @@ AioWorkerSubmissionQueue "Waiting to access AIO worker submission queue." # # END OF PREDEFINED LWLOCKS (DO NOT CHANGE THIS LINE) # -# Predefined LWLocks (i.e., those declared in lwlocknames.h) must be listed -# in the section above and must be listed in the same order as in -# lwlocknames.h. Other LWLocks must be listed in the section below. +# Predefined LWLocks (i.e., those declared at the top of lwlocknames.h) must be +# listed in the section above and must be listed in the same order as in +# lwlocknames.h. +# +# Likewise, the built-in LWLock tranches (i.e., those declared at the bottom of +# lwlocknames.h) must be listed in the section below and must be listed in the +# same order as in lwlocknames.h. # XactBuffer "Waiting for I/O on a transaction status SLRU buffer." diff --git a/src/backend/utils/adt/pg_upgrade_support.c b/src/backend/utils/adt/pg_upgrade_support.c index d44f8c262ba..a4f8b4faa90 100644 --- a/src/backend/utils/adt/pg_upgrade_support.c +++ b/src/backend/utils/adt/pg_upgrade_support.c @@ -21,6 +21,7 @@ #include "commands/extension.h" #include "miscadmin.h" #include "replication/logical.h" +#include "replication/logicallauncher.h" #include "replication/origin.h" #include "replication/worker_internal.h" #include "storage/lmgr.h" @@ -410,3 +411,21 @@ binary_upgrade_replorigin_advance(PG_FUNCTION_ARGS) PG_RETURN_VOID(); } + +/* + * binary_upgrade_create_conflict_detection_slot + * + * Create a replication slot to retain information necessary for conflict + * detection such as dead tuples, commit timestamps, and origins. + */ +Datum +binary_upgrade_create_conflict_detection_slot(PG_FUNCTION_ARGS) +{ + CHECK_IS_BINARY_UPGRADE; + + CreateConflictDetectionSlot(); + + ReplicationSlotRelease(); + + PG_RETURN_VOID(); +} diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index ede10e5291e..6298edb26b5 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -5028,6 +5028,7 @@ getSubscriptions(Archive *fout) int i_suboriginremotelsn; int i_subenabled; int i_subfailover; + int i_subretaindeadtuples; int i, ntups; @@ -5100,10 +5101,17 @@ getSubscriptions(Archive *fout) if (fout->remoteVersion >= 170000) appendPQExpBufferStr(query, - " s.subfailover\n"); + " s.subfailover,\n"); else appendPQExpBufferStr(query, - " false AS subfailover\n"); + " false AS subfailover,\n"); + + if (fout->remoteVersion >= 190000) + appendPQExpBufferStr(query, + " s.subretaindeadtuples\n"); + else + appendPQExpBufferStr(query, + " false AS subretaindeadtuples\n"); appendPQExpBufferStr(query, "FROM pg_subscription s\n"); @@ -5137,6 +5145,7 @@ getSubscriptions(Archive *fout) i_subpasswordrequired = PQfnumber(res, "subpasswordrequired"); i_subrunasowner = PQfnumber(res, "subrunasowner"); i_subfailover = PQfnumber(res, "subfailover"); + i_subretaindeadtuples = PQfnumber(res, "subretaindeadtuples"); i_subconninfo = PQfnumber(res, "subconninfo"); i_subslotname = PQfnumber(res, "subslotname"); i_subsynccommit = PQfnumber(res, "subsynccommit"); @@ -5170,6 +5179,8 @@ getSubscriptions(Archive *fout) (strcmp(PQgetvalue(res, i, i_subrunasowner), "t") == 0); subinfo[i].subfailover = (strcmp(PQgetvalue(res, i, i_subfailover), "t") == 0); + subinfo[i].subretaindeadtuples = + (strcmp(PQgetvalue(res, i, i_subretaindeadtuples), "t") == 0); subinfo[i].subconninfo = pg_strdup(PQgetvalue(res, i, i_subconninfo)); if (PQgetisnull(res, i, i_subslotname)) @@ -5428,6 +5439,9 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo) if (subinfo->subfailover) appendPQExpBufferStr(query, ", failover = true"); + if (subinfo->subretaindeadtuples) + appendPQExpBufferStr(query, ", retain_dead_tuples = true"); + if (strcmp(subinfo->subsynccommit, "off") != 0) appendPQExpBuffer(query, ", synchronous_commit = %s", fmtId(subinfo->subsynccommit)); diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h index 2370c98d192..93a4475d51b 100644 --- a/src/bin/pg_dump/pg_dump.h +++ b/src/bin/pg_dump/pg_dump.h @@ -711,6 +711,7 @@ typedef struct _SubscriptionInfo bool subpasswordrequired; bool subrunasowner; bool subfailover; + bool subretaindeadtuples; char *subconninfo; char *subslotname; char *subsynccommit; diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c index 30579ef2051..5e6403f0773 100644 --- a/src/bin/pg_upgrade/check.c +++ b/src/bin/pg_upgrade/check.c @@ -28,7 +28,7 @@ static void check_for_pg_role_prefix(ClusterInfo *cluster); static void check_for_new_tablespace_dir(void); static void check_for_user_defined_encoding_conversions(ClusterInfo *cluster); static void check_for_unicode_update(ClusterInfo *cluster); -static void check_new_cluster_logical_replication_slots(void); +static void check_new_cluster_replication_slots(void); static void check_new_cluster_subscription_configuration(void); static void check_old_cluster_for_valid_slots(void); static void check_old_cluster_subscription_state(void); @@ -631,7 +631,7 @@ check_and_dump_old_cluster(void) * Before that the logical slots are not upgraded, so we will not be * able to upgrade the logical replication clusters completely. */ - get_subscription_count(&old_cluster); + get_subscription_info(&old_cluster); check_old_cluster_subscription_state(); } @@ -764,7 +764,7 @@ check_new_cluster(void) check_for_new_tablespace_dir(); - check_new_cluster_logical_replication_slots(); + check_new_cluster_replication_slots(); check_new_cluster_subscription_configuration(); } @@ -2040,48 +2040,80 @@ check_for_unicode_update(ClusterInfo *cluster) } /* - * check_new_cluster_logical_replication_slots() + * check_new_cluster_replication_slots() * - * Verify that there are no logical replication slots on the new cluster and - * that the parameter settings necessary for creating slots are sufficient. + * Validate the new cluster's readiness for migrating replication slots: + * - Ensures no existing logical replication slots on the new cluster when + * migrating logical slots. + * - Ensure conflict detection slot does not exist on the new cluster when + * migrating subscriptions with retain_dead_tuples enabled. + * - Ensure that the parameter settings on the new cluster necessary for + * creating slots are sufficient. */ static void -check_new_cluster_logical_replication_slots(void) +check_new_cluster_replication_slots(void) { PGresult *res; PGconn *conn; int nslots_on_old; int nslots_on_new; + int rdt_slot_on_new; int max_replication_slots; char *wal_level; + int i_nslots_on_new; + int i_rdt_slot_on_new; - /* Logical slots can be migrated since PG17. */ + /* + * Logical slots can be migrated since PG17 and a physical slot + * CONFLICT_DETECTION_SLOT can be migrated since PG19. + */ if (GET_MAJOR_VERSION(old_cluster.major_version) <= 1600) return; nslots_on_old = count_old_cluster_logical_slots(); - /* Quick return if there are no logical slots to be migrated. */ - if (nslots_on_old == 0) + /* + * Quick return if there are no slots to be migrated and no subscriptions + * have the retain_dead_tuples option enabled. + */ + if (nslots_on_old == 0 && !old_cluster.sub_retain_dead_tuples) return; conn = connectToServer(&new_cluster, "template1"); - prep_status("Checking for new cluster logical replication slots"); + prep_status("Checking for new cluster replication slots"); - res = executeQueryOrDie(conn, "SELECT count(*) " - "FROM pg_catalog.pg_replication_slots " - "WHERE slot_type = 'logical' AND " - "temporary IS FALSE;"); + res = executeQueryOrDie(conn, "SELECT %s AS nslots_on_new, %s AS rdt_slot_on_new " + "FROM pg_catalog.pg_replication_slots", + nslots_on_old > 0 + ? "COUNT(*) FILTER (WHERE slot_type = 'logical' AND temporary IS FALSE)" + : "0", + old_cluster.sub_retain_dead_tuples + ? "COUNT(*) FILTER (WHERE slot_name = 'pg_conflict_detection')" + : "0"); if (PQntuples(res) != 1) - pg_fatal("could not count the number of logical replication slots"); + pg_fatal("could not count the number of replication slots"); - nslots_on_new = atoi(PQgetvalue(res, 0, 0)); + i_nslots_on_new = PQfnumber(res, "nslots_on_new"); + i_rdt_slot_on_new = PQfnumber(res, "rdt_slot_on_new"); + + nslots_on_new = atoi(PQgetvalue(res, 0, i_nslots_on_new)); if (nslots_on_new) + { + Assert(nslots_on_old); pg_fatal("expected 0 logical replication slots but found %d", nslots_on_new); + } + + rdt_slot_on_new = atoi(PQgetvalue(res, 0, i_rdt_slot_on_new)); + + if (rdt_slot_on_new) + { + Assert(old_cluster.sub_retain_dead_tuples); + pg_fatal("The replication slot \"pg_conflict_detection\" already exists on the new cluster"); + } PQclear(res); @@ -2094,12 +2126,24 @@ check_new_cluster_logical_replication_slots(void) wal_level = PQgetvalue(res, 0, 0); - if (strcmp(wal_level, "logical") != 0) + if (nslots_on_old > 0 && strcmp(wal_level, "logical") != 0) pg_fatal("\"wal_level\" must be \"logical\" but is set to \"%s\"", wal_level); + if (old_cluster.sub_retain_dead_tuples && + strcmp(wal_level, "minimal") == 0) + pg_fatal("\"wal_level\" must be \"replica\" or \"logical\" but is set to \"%s\"", + wal_level); + max_replication_slots = atoi(PQgetvalue(res, 1, 0)); + if (old_cluster.sub_retain_dead_tuples && + nslots_on_old + 1 > max_replication_slots) + pg_fatal("\"max_replication_slots\" (%d) must be greater than or equal to the number of " + "logical replication slots on the old cluster plus one additional slot required " + "for retaining conflict detection information (%d)", + max_replication_slots, nslots_on_old + 1); + if (nslots_on_old > max_replication_slots) pg_fatal("\"max_replication_slots\" (%d) must be greater than or equal to the number of " "logical replication slots (%d) on the old cluster", @@ -2211,6 +2255,22 @@ check_old_cluster_for_valid_slots(void) "The slot \"%s\" has not consumed the WAL yet\n", slot->slotname); } + + /* + * The name "pg_conflict_detection" (defined as + * CONFLICT_DETECTION_SLOT) has been reserved for logical + * replication conflict detection slot since PG19. + */ + if (strcmp(slot->slotname, "pg_conflict_detection") == 0) + { + if (script == NULL && + (script = fopen_priv(output_path, "w")) == NULL) + pg_fatal("could not open file \"%s\": %m", output_path); + + fprintf(script, + "The slot name \"%s\" is reserved\n", + slot->slotname); + } } } diff --git a/src/bin/pg_upgrade/info.c b/src/bin/pg_upgrade/info.c index 4b7a56f5b3b..a437067cdca 100644 --- a/src/bin/pg_upgrade/info.c +++ b/src/bin/pg_upgrade/info.c @@ -752,20 +752,33 @@ count_old_cluster_logical_slots(void) } /* - * get_subscription_count() + * get_subscription_info() * - * Gets the number of subscriptions in the cluster. + * Gets the information of subscriptions in the cluster. */ void -get_subscription_count(ClusterInfo *cluster) +get_subscription_info(ClusterInfo *cluster) { PGconn *conn; PGresult *res; + int i_nsub; + int i_retain_dead_tuples; conn = connectToServer(cluster, "template1"); - res = executeQueryOrDie(conn, "SELECT count(*) " - "FROM pg_catalog.pg_subscription"); - cluster->nsubs = atoi(PQgetvalue(res, 0, 0)); + if (GET_MAJOR_VERSION(cluster->major_version) >= 1900) + res = executeQueryOrDie(conn, "SELECT count(*) AS nsub," + "COUNT(CASE WHEN subretaindeadtuples THEN 1 END) > 0 AS retain_dead_tuples " + "FROM pg_catalog.pg_subscription"); + else + res = executeQueryOrDie(conn, "SELECT count(*) AS nsub," + "'f' AS retain_dead_tuples " + "FROM pg_catalog.pg_subscription"); + + i_nsub = PQfnumber(res, "nsub"); + i_retain_dead_tuples = PQfnumber(res, "retain_dead_tuples"); + + cluster->nsubs = atoi(PQgetvalue(res, 0, i_nsub)); + cluster->sub_retain_dead_tuples = (strcmp(PQgetvalue(res, 0, i_retain_dead_tuples), "t") == 0); PQclear(res); PQfinish(conn); diff --git a/src/bin/pg_upgrade/pg_upgrade.c b/src/bin/pg_upgrade/pg_upgrade.c index 536e49d2616..d5cd5bf0b3a 100644 --- a/src/bin/pg_upgrade/pg_upgrade.c +++ b/src/bin/pg_upgrade/pg_upgrade.c @@ -67,6 +67,7 @@ static void set_frozenxids(bool minmxid_only); static void make_outputdirs(char *pgdata); static void setup(char *argv0); static void create_logical_replication_slots(void); +static void create_conflict_detection_slot(void); ClusterInfo old_cluster, new_cluster; @@ -88,6 +89,7 @@ int main(int argc, char **argv) { char *deletion_script_file_name = NULL; + bool migrate_logical_slots; /* * pg_upgrade doesn't currently use common/logging.c, but initialize it @@ -198,18 +200,39 @@ main(int argc, char **argv) new_cluster.pgdata); check_ok(); + migrate_logical_slots = count_old_cluster_logical_slots(); + /* - * Migrate the logical slots to the new cluster. Note that we need to do - * this after resetting WAL because otherwise the required WAL would be - * removed and slots would become unusable. There is a possibility that - * background processes might generate some WAL before we could create the - * slots in the new cluster but we can ignore that WAL as that won't be - * required downstream. + * Migrate replication slots to the new cluster. + * + * Note that we must migrate logical slots after resetting WAL because + * otherwise the required WAL would be removed and slots would become + * unusable. There is a possibility that background processes might + * generate some WAL before we could create the slots in the new cluster + * but we can ignore that WAL as that won't be required downstream. + * + * The conflict detection slot is not affected by concerns related to WALs + * as it only retains the dead tuples. It is created here for consistency. + * Note that the new conflict detection slot uses the latest transaction + * ID as xmin, so it cannot protect dead tuples that existed before the + * upgrade. Additionally, commit timestamps and origin data are not + * preserved during the upgrade. So, even after creating the slot, the + * upgraded subscriber may be unable to detect conflicts or log relevant + * commit timestamps and origins when applying changes from the publisher + * occurred before the upgrade especially if those changes were not + * replicated. It can only protect tuples that might be deleted after the + * new cluster starts. */ - if (count_old_cluster_logical_slots()) + if (migrate_logical_slots || old_cluster.sub_retain_dead_tuples) { start_postmaster(&new_cluster, true); - create_logical_replication_slots(); + + if (migrate_logical_slots) + create_logical_replication_slots(); + + if (old_cluster.sub_retain_dead_tuples) + create_conflict_detection_slot(); + stop_postmaster(false); } @@ -1025,3 +1048,24 @@ create_logical_replication_slots(void) return; } + +/* + * create_conflict_detection_slot() + * + * Create a replication slot to retain information necessary for conflict + * detection such as dead tuples, commit timestamps, and origins, for migrated + * subscriptions with retain_dead_tuples enabled. + */ +static void +create_conflict_detection_slot(void) +{ + PGconn *conn_new_template1; + + prep_status("Creating the replication conflict detection slot"); + + conn_new_template1 = connectToServer(&new_cluster, "template1"); + PQclear(executeQueryOrDie(conn_new_template1, "SELECT pg_catalog.binary_upgrade_create_conflict_detection_slot()")); + PQfinish(conn_new_template1); + + check_ok(); +} diff --git a/src/bin/pg_upgrade/pg_upgrade.h b/src/bin/pg_upgrade/pg_upgrade.h index 69c965bb7d0..e9401430e69 100644 --- a/src/bin/pg_upgrade/pg_upgrade.h +++ b/src/bin/pg_upgrade/pg_upgrade.h @@ -302,6 +302,8 @@ typedef struct uint32 bin_version; /* version returned from pg_ctl */ const char *tablespace_suffix; /* directory specification */ int nsubs; /* number of subscriptions */ + bool sub_retain_dead_tuples; /* whether a subscription enables + * retain_dead_tuples. */ } ClusterInfo; @@ -441,7 +443,7 @@ FileNameMap *gen_db_file_maps(DbInfo *old_db, const char *new_pgdata); void get_db_rel_and_slot_infos(ClusterInfo *cluster); int count_old_cluster_logical_slots(void); -void get_subscription_count(ClusterInfo *cluster); +void get_subscription_info(ClusterInfo *cluster); /* option.c */ diff --git a/src/bin/pg_upgrade/t/004_subscription.pl b/src/bin/pg_upgrade/t/004_subscription.pl index e46f02c6cc6..77387be0f9d 100644 --- a/src/bin/pg_upgrade/t/004_subscription.pl +++ b/src/bin/pg_upgrade/t/004_subscription.pl @@ -22,13 +22,13 @@ $publisher->start; # Initialize the old subscriber node my $old_sub = PostgreSQL::Test::Cluster->new('old_sub'); -$old_sub->init; +$old_sub->init(allows_streaming => 'physical'); $old_sub->start; my $oldbindir = $old_sub->config_data('--bindir'); # Initialize the new subscriber my $new_sub = PostgreSQL::Test::Cluster->new('new_sub'); -$new_sub->init; +$new_sub->init(allows_streaming => 'physical'); my $newbindir = $new_sub->config_data('--bindir'); # In a VPATH build, we'll be started in the source directory, but we want @@ -90,6 +90,54 @@ $old_sub->start; $old_sub->safe_psql('postgres', "DROP SUBSCRIPTION regress_sub1;"); # ------------------------------------------------------ +# Check that pg_upgrade fails when max_replication_slots configured in the new +# cluster is less than the number of logical slots in the old cluster + 1 when +# subscription's retain_dead_tuples option is enabled. +# ------------------------------------------------------ +# It is sufficient to use disabled subscription to test upgrade failure. + +$publisher->safe_psql('postgres', "CREATE PUBLICATION regress_pub1"); +$old_sub->safe_psql('postgres', + "CREATE SUBSCRIPTION regress_sub1 CONNECTION '$connstr' PUBLICATION regress_pub1 WITH (enabled = false, retain_dead_tuples = true)" +); + +$old_sub->stop; + +$new_sub->append_conf('postgresql.conf', 'max_replication_slots = 0'); + +# pg_upgrade will fail because the new cluster has insufficient +# max_replication_slots. +command_checks_all( + [ + 'pg_upgrade', + '--no-sync', + '--old-datadir' => $old_sub->data_dir, + '--new-datadir' => $new_sub->data_dir, + '--old-bindir' => $oldbindir, + '--new-bindir' => $newbindir, + '--socketdir' => $new_sub->host, + '--old-port' => $old_sub->port, + '--new-port' => $new_sub->port, + $mode, + '--check', + ], + 1, + [ + qr/"max_replication_slots" \(0\) must be greater than or equal to the number of logical replication slots on the old cluster plus one additional slot required for retaining conflict detection information \(1\)/ + ], + [qr//], + 'run of pg_upgrade where the new cluster has insufficient max_replication_slots' +); + +# Reset max_replication_slots +$new_sub->append_conf('postgresql.conf', 'max_replication_slots = 10'); + +# Cleanup +$publisher->safe_psql('postgres', "DROP PUBLICATION regress_pub1"); +$old_sub->start; +$old_sub->safe_psql('postgres', "DROP SUBSCRIPTION regress_sub1;"); + +# ------------------------------------------------------ # Check that pg_upgrade refuses to run if: # a) there's a subscription with tables in a state other than 'r' (ready) or # 'i' (init) and/or @@ -200,8 +248,9 @@ $old_sub->safe_psql( rmtree($new_sub->data_dir . "/pg_upgrade_output.d"); # Verify that the upgrade should be successful with tables in 'ready'/'init' -# state along with retaining the replication origin's remote lsn, subscription's -# running status, and failover option. +# state along with retaining the replication origin's remote lsn, +# subscription's running status, failover option, and retain_dead_tuples +# option. $publisher->safe_psql( 'postgres', qq[ CREATE TABLE tab_upgraded1(id int); @@ -211,7 +260,7 @@ $publisher->safe_psql( $old_sub->safe_psql( 'postgres', qq[ CREATE TABLE tab_upgraded1(id int); - CREATE SUBSCRIPTION regress_sub4 CONNECTION '$connstr' PUBLICATION regress_pub4 WITH (failover = true); + CREATE SUBSCRIPTION regress_sub4 CONNECTION '$connstr' PUBLICATION regress_pub4 WITH (failover = true, retain_dead_tuples = true); ]); # Wait till the table tab_upgraded1 reaches 'ready' state @@ -270,7 +319,8 @@ $new_sub->append_conf('postgresql.conf', # Check that pg_upgrade is successful when all tables are in ready or in # init state (tab_upgraded1 table is in ready state and tab_upgraded2 table is # in init state) along with retaining the replication origin's remote lsn, -# subscription's running status, and failover option. +# subscription's running status, failover option, and retain_dead_tuples +# option. # ------------------------------------------------------ command_ok( [ @@ -293,7 +343,8 @@ ok( !-d $new_sub->data_dir . "/pg_upgrade_output.d", # ------------------------------------------------------ # Check that the data inserted to the publisher when the new subscriber is down # will be replicated once it is started. Also check that the old subscription -# states and relations origins are all preserved. +# states and relations origins are all preserved, and that the conflict +# detection slot is created. # ------------------------------------------------------ $publisher->safe_psql( 'postgres', qq[ @@ -303,15 +354,16 @@ $publisher->safe_psql( $new_sub->start; -# The subscription's running status and failover option should be preserved -# in the upgraded instance. So regress_sub4 should still have subenabled and -# subfailover set to true, while regress_sub5 should have both set to false. +# The subscription's running status, failover option, and retain_dead_tuples +# option should be preserved in the upgraded instance. So regress_sub4 should +# still have subenabled, subfailover, and subretaindeadtuples set to true, +# while regress_sub5 should have both set to false. $result = $new_sub->safe_psql('postgres', - "SELECT subname, subenabled, subfailover FROM pg_subscription ORDER BY subname" + "SELECT subname, subenabled, subfailover, subretaindeadtuples FROM pg_subscription ORDER BY subname" ); -is( $result, qq(regress_sub4|t|t -regress_sub5|f|f), - "check that the subscription's running status and failover are preserved" +is( $result, qq(regress_sub4|t|t|t +regress_sub5|f|f|f), + "check that the subscription's running status, failover, and retain_dead_tuples are preserved" ); # Subscription relations should be preserved @@ -330,6 +382,11 @@ $result = $new_sub->safe_psql('postgres', ); is($result, qq($remote_lsn), "remote_lsn should have been preserved"); +# The conflict detection slot should be created +$result = $new_sub->safe_psql('postgres', + "SELECT xmin IS NOT NULL from pg_replication_slots WHERE slot_name = 'pg_conflict_detection'"); +is($result, qq(t), "conflict detection slot exists"); + # Resume the initial sync and wait until all tables of subscription # 'regress_sub5' are synchronized $new_sub->append_conf('postgresql.conf', diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c index dd25d2fe7b8..7a06af48842 100644 --- a/src/bin/psql/describe.c +++ b/src/bin/psql/describe.c @@ -6746,7 +6746,7 @@ describeSubscriptions(const char *pattern, bool verbose) printQueryOpt myopt = pset.popt; static const bool translate_columns[] = {false, false, false, false, false, false, false, false, false, false, false, false, false, false, - false}; + false, false}; if (pset.sversion < 100000) { @@ -6814,6 +6814,10 @@ describeSubscriptions(const char *pattern, bool verbose) appendPQExpBuffer(&buf, ", subfailover AS \"%s\"\n", gettext_noop("Failover")); + if (pset.sversion >= 190000) + appendPQExpBuffer(&buf, + ", subretaindeadtuples AS \"%s\"\n", + gettext_noop("Retain dead tuples")); appendPQExpBuffer(&buf, ", subsynccommit AS \"%s\"\n" diff --git a/src/bin/psql/tab-complete.in.c b/src/bin/psql/tab-complete.in.c index 37524364290..dbc586c5bc3 100644 --- a/src/bin/psql/tab-complete.in.c +++ b/src/bin/psql/tab-complete.in.c @@ -2319,8 +2319,9 @@ match_previous_words(int pattern_id, /* ALTER SUBSCRIPTION <name> SET ( */ else if (Matches("ALTER", "SUBSCRIPTION", MatchAny, MatchAnyN, "SET", "(")) COMPLETE_WITH("binary", "disable_on_error", "failover", "origin", - "password_required", "run_as_owner", "slot_name", - "streaming", "synchronous_commit", "two_phase"); + "password_required", "retain_dead_tuples", + "run_as_owner", "slot_name", "streaming", + "synchronous_commit", "two_phase"); /* ALTER SUBSCRIPTION <name> SKIP ( */ else if (Matches("ALTER", "SUBSCRIPTION", MatchAny, MatchAnyN, "SKIP", "(")) COMPLETE_WITH("lsn"); @@ -3774,8 +3775,9 @@ match_previous_words(int pattern_id, else if (Matches("CREATE", "SUBSCRIPTION", MatchAnyN, "WITH", "(")) COMPLETE_WITH("binary", "connect", "copy_data", "create_slot", "disable_on_error", "enabled", "failover", "origin", - "password_required", "run_as_owner", "slot_name", - "streaming", "synchronous_commit", "two_phase"); + "password_required", "retain_dead_tuples", + "run_as_owner", "slot_name", "streaming", + "synchronous_commit", "two_phase"); /* CREATE TRIGGER --- is allowed inside CREATE SCHEMA, so use TailMatches */ diff --git a/src/include/access/amapi.h b/src/include/access/amapi.h index 52916bab7a3..70949de56ac 100644 --- a/src/include/access/amapi.h +++ b/src/include/access/amapi.h @@ -293,7 +293,7 @@ typedef struct IndexAmRoutine ambuild_function ambuild; ambuildempty_function ambuildempty; aminsert_function aminsert; - aminsertcleanup_function aminsertcleanup; + aminsertcleanup_function aminsertcleanup; /* can be NULL */ ambulkdelete_function ambulkdelete; amvacuumcleanup_function amvacuumcleanup; amcanreturn_function amcanreturn; /* can be NULL */ diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h index 2cf8d55d706..cc06fc29ab2 100644 --- a/src/include/access/xlog_internal.h +++ b/src/include/access/xlog_internal.h @@ -316,16 +316,6 @@ typedef struct XLogRecData uint32 len; /* length of rmgr data to include */ } XLogRecData; -/* - * Recovery target action. - */ -typedef enum -{ - RECOVERY_TARGET_ACTION_PAUSE, - RECOVERY_TARGET_ACTION_PROMOTE, - RECOVERY_TARGET_ACTION_SHUTDOWN, -} RecoveryTargetAction; - struct LogicalDecodingContext; struct XLogRecordBuffer; diff --git a/src/include/access/xlogrecovery.h b/src/include/access/xlogrecovery.h index 91446303024..8e475e266d1 100644 --- a/src/include/access/xlogrecovery.h +++ b/src/include/access/xlogrecovery.h @@ -40,6 +40,16 @@ typedef enum RECOVERY_TARGET_TIMELINE_NUMERIC, } RecoveryTargetTimeLineGoal; +/* + * Recovery target action. + */ +typedef enum +{ + RECOVERY_TARGET_ACTION_PAUSE, + RECOVERY_TARGET_ACTION_PROMOTE, + RECOVERY_TARGET_ACTION_SHUTDOWN, +} RecoveryTargetAction; + /* Recovery pause states */ typedef enum RecoveryPauseState { diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h index a3f3315fed9..5173d422d46 100644 --- a/src/include/catalog/catversion.h +++ b/src/include/catalog/catversion.h @@ -57,6 +57,6 @@ */ /* yyyymmddN */ -#define CATALOG_VERSION_NO 202507091 +#define CATALOG_VERSION_NO 202507231 #endif diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 1fc19146f46..3ee8fed7e53 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -11801,6 +11801,10 @@ proname => 'binary_upgrade_replorigin_advance', proisstrict => 'f', provolatile => 'v', proparallel => 'u', prorettype => 'void', proargtypes => 'text pg_lsn', prosrc => 'binary_upgrade_replorigin_advance' }, +{ oid => '9159', descr => 'for use by pg_upgrade (conflict detection slot)', + proname => 'binary_upgrade_create_conflict_detection_slot', proisstrict => 'f', + provolatile => 'v', proparallel => 'u', prorettype => 'void', + proargtypes => '', prosrc => 'binary_upgrade_create_conflict_detection_slot' }, # conversion functions { oid => '4302', diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h index 20fc329992d..231ef84ec9a 100644 --- a/src/include/catalog/pg_subscription.h +++ b/src/include/catalog/pg_subscription.h @@ -78,6 +78,9 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW * slots) in the upstream database are enabled * to be synchronized to the standbys. */ + bool subretaindeadtuples; /* True if dead tuples useful for + * conflict detection are retained */ + #ifdef CATALOG_VARLEN /* variable-length fields start here */ /* Connection string to the publisher */ text subconninfo BKI_FORCE_NOT_NULL; @@ -131,6 +134,8 @@ typedef struct Subscription * (i.e. the main slot and the table sync * slots) in the upstream database are enabled * to be synchronized to the standbys. */ + bool retaindeadtuples; /* True if dead tuples useful for conflict + * detection are retained */ char *conninfo; /* Connection string to the publisher */ char *slotname; /* Name of the replication slot */ char *synccommit; /* Synchronous commit setting for worker */ diff --git a/src/include/commands/subscriptioncmds.h b/src/include/commands/subscriptioncmds.h index c2262e46a7f..9b288ad22a6 100644 --- a/src/include/commands/subscriptioncmds.h +++ b/src/include/commands/subscriptioncmds.h @@ -28,4 +28,9 @@ extern void AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId); extern char defGetStreamingMode(DefElem *def); +extern ObjectAddress AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, bool isTopLevel); + +extern void CheckSubDeadTupleRetention(bool check_guc, bool sub_disabled, + int elevel_for_sub_disabled); + #endif /* SUBSCRIPTIONCMDS_H */ diff --git a/src/include/libpq/libpq-be-fe-helpers.h b/src/include/libpq/libpq-be-fe-helpers.h index 16205b824fa..af13bd6bf3d 100644 --- a/src/include/libpq/libpq-be-fe-helpers.h +++ b/src/include/libpq/libpq-be-fe-helpers.h @@ -454,4 +454,34 @@ exit: ; return error; } +/* + * libpqsrv_notice_receiver + * + * Custom notice receiver for libpq connections. + * + * This function is intended to be set via PQsetNoticeReceiver() so that + * NOTICE, WARNING, and similar messages from the connection are reported via + * ereport(), instead of being printed to stderr. + */ +static inline void +libpqsrv_notice_receiver(void *arg, const PGresult *res) +{ + char *message; + int len; + char *prefix = (char *) arg; + + /* + * Trim the trailing newline from the message text returned from + * PQresultErrorMessage(), as it always includes one, to produce cleaner + * log output. + */ + message = PQresultErrorMessage(res); + len = strlen(message); + if (len > 0 && message[len - 1] == '\n') + len--; + + ereport(LOG, + errmsg_internal("%s: %.*s", prefix, len, message)); +} + #endif /* LIBPQ_BE_FE_HELPERS_H */ diff --git a/src/include/nodes/pathnodes.h b/src/include/nodes/pathnodes.h index 6567759595d..e5dd15098f6 100644 --- a/src/include/nodes/pathnodes.h +++ b/src/include/nodes/pathnodes.h @@ -179,6 +179,9 @@ typedef struct PlannerGlobal /* partition descriptors */ PartitionDirectory partition_directory pg_node_attr(read_write_ignore); + + /* hash table for NOT NULL attnums of relations */ + struct HTAB *rel_notnullatts_hash pg_node_attr(read_write_ignore); } PlannerGlobal; /* macro for fetching the Plan associated with a SubPlan node */ @@ -719,6 +722,9 @@ typedef struct PartitionSchemeData *PartitionScheme; * the attribute is needed as part of final targetlist * attr_widths - cache space for per-attribute width estimates; * zero means not computed yet + * notnullattnums - zero-based set containing attnums of NOT NULL + * columns (not populated for rels corresponding to + * non-partitioned inh==true RTEs) * nulling_relids - relids of outer joins that can null this rel * lateral_vars - lateral cross-references of rel, if any (list of * Vars and PlaceHolderVars) @@ -952,11 +958,7 @@ typedef struct RelOptInfo Relids *attr_needed pg_node_attr(read_write_ignore); /* array indexed [min_attr .. max_attr] */ int32 *attr_widths pg_node_attr(read_write_ignore); - - /* - * Zero-based set containing attnums of NOT NULL columns. Not populated - * for rels corresponding to non-partitioned inh==true RTEs. - */ + /* zero-based set containing attnums of NOT NULL columns */ Bitmapset *notnullattnums; /* relids of outer joins that can null this baserel */ Relids nulling_relids; diff --git a/src/include/optimizer/optimizer.h b/src/include/optimizer/optimizer.h index 546828b54bd..37bc13c2cbd 100644 --- a/src/include/optimizer/optimizer.h +++ b/src/include/optimizer/optimizer.h @@ -154,6 +154,8 @@ extern Node *estimate_expression_value(PlannerInfo *root, Node *node); extern Expr *evaluate_expr(Expr *expr, Oid result_type, int32 result_typmod, Oid result_collation); +extern bool var_is_nonnullable(PlannerInfo *root, Var *var, bool use_rel_info); + extern List *expand_function_arguments(List *args, bool include_out_arguments, Oid result_type, struct HeapTupleData *func_tuple); diff --git a/src/include/optimizer/plancat.h b/src/include/optimizer/plancat.h index cd74e4b1e8b..d6f6f4ad2d7 100644 --- a/src/include/optimizer/plancat.h +++ b/src/include/optimizer/plancat.h @@ -28,6 +28,10 @@ extern PGDLLIMPORT get_relation_info_hook_type get_relation_info_hook; extern void get_relation_info(PlannerInfo *root, Oid relationObjectId, bool inhparent, RelOptInfo *rel); +extern void get_relation_notnullatts(PlannerInfo *root, Relation relation); + +extern Relids find_relation_notnullatts(PlannerInfo *root, Oid relid); + extern List *infer_arbiter_indexes(PlannerInfo *root); extern void estimate_rel_size(Relation rel, int32 *attr_widths, diff --git a/src/include/port/solaris.h b/src/include/port/solaris.h index e63a3bd824d..8ff40007c7f 100644 --- a/src/include/port/solaris.h +++ b/src/include/port/solaris.h @@ -24,3 +24,12 @@ #if defined(__i386__) #include <sys/isa_defs.h> #endif + +/* + * On original Solaris, PAM conversation procs lack a "const" in their + * declaration; but recent OpenIndiana versions put it there by default. + * The least messy way to deal with this is to define _PAM_LEGACY_NONCONST, + * which causes OpenIndiana to declare pam_conv per the Solaris tradition, + * and also use that symbol to control omitting the "const" in our own code. + */ +#define _PAM_LEGACY_NONCONST 1 diff --git a/src/include/replication/logicallauncher.h b/src/include/replication/logicallauncher.h index 82b202f3305..b29453e8e4f 100644 --- a/src/include/replication/logicallauncher.h +++ b/src/include/replication/logicallauncher.h @@ -25,8 +25,11 @@ extern void ApplyLauncherShmemInit(void); extern void ApplyLauncherForgetWorkerStartTime(Oid subid); extern void ApplyLauncherWakeupAtCommit(void); +extern void ApplyLauncherWakeup(void); extern void AtEOXact_ApplyLauncher(bool isCommit); +extern void CreateConflictDetectionSlot(void); + extern bool IsLogicalLauncher(void); extern pid_t GetLeaderApplyWorkerPid(pid_t pid); diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index 19b4e8b6a03..e8fc342d1a9 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -21,6 +21,13 @@ #define PG_REPLSLOT_DIR "pg_replslot" /* + * The reserved name for a replication slot used to retain dead tuples for + * conflict detection in logical replication. See + * maybe_advance_nonremovable_xid() for detail. + */ +#define CONFLICT_DETECTION_SLOT "pg_conflict_detection" + +/* * Behaviour of replication slots, upon release or crash. * * Slots marked as PERSISTENT are crash-safe and will not be dropped when @@ -311,7 +318,9 @@ extern void ReplicationSlotMarkDirty(void); /* misc stuff */ extern void ReplicationSlotInitialize(void); -extern bool ReplicationSlotValidateName(const char *name, int elevel); +extern bool ReplicationSlotValidateName(const char *name, + bool allow_reserved_name, + int elevel); extern void ReplicationSlotReserveWal(void); extern void ReplicationSlotsComputeRequiredXmin(bool already_locked); extern void ReplicationSlotsComputeRequiredLSN(void); diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 30b2775952c..0c7b8440a61 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -86,6 +86,16 @@ typedef struct LogicalRepWorker /* Indicates whether apply can be performed in parallel. */ bool parallel_apply; + /* + * The changes made by this and later transactions must be retained to + * ensure reliable conflict detection during the apply phase. + * + * The logical replication launcher manages an internal replication slot + * named "pg_conflict_detection". It asynchronously collects this ID to + * decide when to advance the xmin value of the slot. + */ + TransactionId oldest_nonremovable_xid; + /* Stats. */ XLogRecPtr last_lsn; TimestampTz last_send_time; @@ -245,7 +255,8 @@ extern List *logicalrep_workers_find(Oid subid, bool only_running, extern bool logicalrep_worker_launch(LogicalRepWorkerType wtype, Oid dbid, Oid subid, const char *subname, Oid userid, Oid relid, - dsm_handle subworker_dsm); + dsm_handle subworker_dsm, + bool retain_dead_tuples); extern void logicalrep_worker_stop(Oid subid, Oid relid); extern void logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo); extern void logicalrep_worker_wakeup(Oid subid, Oid relid); diff --git a/src/include/storage/aio.h b/src/include/storage/aio.h index e7a0a234b6c..2933eea0649 100644 --- a/src/include/storage/aio.h +++ b/src/include/storage/aio.h @@ -201,7 +201,7 @@ typedef enum PgAioHandleCallbackID } PgAioHandleCallbackID; #define PGAIO_HCB_MAX PGAIO_HCB_LOCAL_BUFFER_READV -StaticAssertDecl(PGAIO_HCB_MAX <= (1 << PGAIO_RESULT_ID_BITS), +StaticAssertDecl(PGAIO_HCB_MAX < (1 << PGAIO_RESULT_ID_BITS), "PGAIO_HCB_MAX is too big for PGAIO_RESULT_ID_BITS"); diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h index 08a72569ae5..5e717765764 100644 --- a/src/include/storage/lwlock.h +++ b/src/include/storage/lwlock.h @@ -176,51 +176,23 @@ extern void LWLockInitialize(LWLock *lock, int tranche_id); * Every tranche ID less than NUM_INDIVIDUAL_LWLOCKS is reserved; also, * we reserve additional tranche IDs for builtin tranches not included in * the set of individual LWLocks. A call to LWLockNewTrancheId will never - * return a value less than LWTRANCHE_FIRST_USER_DEFINED. + * return a value less than LWTRANCHE_FIRST_USER_DEFINED. The actual list of + * built-in tranches is kept in lwlocklist.h. */ typedef enum BuiltinTrancheIds { - LWTRANCHE_XACT_BUFFER = NUM_INDIVIDUAL_LWLOCKS, - LWTRANCHE_COMMITTS_BUFFER, - LWTRANCHE_SUBTRANS_BUFFER, - LWTRANCHE_MULTIXACTOFFSET_BUFFER, - LWTRANCHE_MULTIXACTMEMBER_BUFFER, - LWTRANCHE_NOTIFY_BUFFER, - LWTRANCHE_SERIAL_BUFFER, - LWTRANCHE_WAL_INSERT, - LWTRANCHE_BUFFER_CONTENT, - LWTRANCHE_REPLICATION_ORIGIN_STATE, - LWTRANCHE_REPLICATION_SLOT_IO, - LWTRANCHE_LOCK_FASTPATH, - LWTRANCHE_BUFFER_MAPPING, - LWTRANCHE_LOCK_MANAGER, - LWTRANCHE_PREDICATE_LOCK_MANAGER, - LWTRANCHE_PARALLEL_HASH_JOIN, - LWTRANCHE_PARALLEL_BTREE_SCAN, - LWTRANCHE_PARALLEL_QUERY_DSA, - LWTRANCHE_PER_SESSION_DSA, - LWTRANCHE_PER_SESSION_RECORD_TYPE, - LWTRANCHE_PER_SESSION_RECORD_TYPMOD, - LWTRANCHE_SHARED_TUPLESTORE, - LWTRANCHE_SHARED_TIDBITMAP, - LWTRANCHE_PARALLEL_APPEND, - LWTRANCHE_PER_XACT_PREDICATE_LIST, - LWTRANCHE_PGSTATS_DSA, - LWTRANCHE_PGSTATS_HASH, - LWTRANCHE_PGSTATS_DATA, - LWTRANCHE_LAUNCHER_DSA, - LWTRANCHE_LAUNCHER_HASH, - LWTRANCHE_DSM_REGISTRY_DSA, - LWTRANCHE_DSM_REGISTRY_HASH, - LWTRANCHE_COMMITTS_SLRU, - LWTRANCHE_MULTIXACTMEMBER_SLRU, - LWTRANCHE_MULTIXACTOFFSET_SLRU, - LWTRANCHE_NOTIFY_SLRU, - LWTRANCHE_SERIAL_SLRU, - LWTRANCHE_SUBTRANS_SLRU, - LWTRANCHE_XACT_SLRU, - LWTRANCHE_PARALLEL_VACUUM_DSA, - LWTRANCHE_AIO_URING_COMPLETION, + /* + * LWTRANCHE_INVALID is an unused value that only exists to initialize the + * rest of the tranches to appropriate values. + */ + LWTRANCHE_INVALID = NUM_INDIVIDUAL_LWLOCKS - 1, + +#define PG_LWLOCK(id, name) +#define PG_LWLOCKTRANCHE(id, name) LWTRANCHE_##id, +#include "storage/lwlocklist.h" +#undef PG_LWLOCK +#undef PG_LWLOCKTRANCHE + LWTRANCHE_FIRST_USER_DEFINED, } BuiltinTrancheIds; diff --git a/src/include/storage/lwlocklist.h b/src/include/storage/lwlocklist.h index a9681738146..208d2e3a8ed 100644 --- a/src/include/storage/lwlocklist.h +++ b/src/include/storage/lwlocklist.h @@ -2,9 +2,10 @@ * * lwlocklist.h * - * The predefined LWLock list is kept in its own source file for use by - * automatic tools. The exact representation of a keyword is determined by - * the PG_LWLOCK macro, which is not defined in this file; it can be + * The list of predefined LWLocks and built-in LWLock tranches is kept in + * its own source file for use by automatic tools. The exact + * representation of a keyword is determined by the PG_LWLOCK and + * PG_LWLOCKTRANCHE macros, which are not defined in this file; they can be * defined by the caller for special purposes. * * Also, generate-lwlocknames.pl processes this file to create lwlocknames.h. @@ -84,3 +85,53 @@ PG_LWLOCK(50, DSMRegistry) PG_LWLOCK(51, InjectionPoint) PG_LWLOCK(52, SerialControl) PG_LWLOCK(53, AioWorkerSubmissionQueue) + +/* + * There also exist several built-in LWLock tranches. As with the predefined + * LWLocks, be sure to update the WaitEventLWLock section of + * src/backend/utils/activity/wait_event_names.txt when modifying this list. + * + * Note that the IDs here (the first value) don't include the LWTRANCHE_ + * prefix. It's added elsewhere. + */ +PG_LWLOCKTRANCHE(XACT_BUFFER, XactBuffer) +PG_LWLOCKTRANCHE(COMMITTS_BUFFER, CommitTsBuffer) +PG_LWLOCKTRANCHE(SUBTRANS_BUFFER, SubtransBuffer) +PG_LWLOCKTRANCHE(MULTIXACTOFFSET_BUFFER, MultiXactOffsetBuffer) +PG_LWLOCKTRANCHE(MULTIXACTMEMBER_BUFFER, MultiXactMemberBuffer) +PG_LWLOCKTRANCHE(NOTIFY_BUFFER, NotifyBuffer) +PG_LWLOCKTRANCHE(SERIAL_BUFFER, SerialBuffer) +PG_LWLOCKTRANCHE(WAL_INSERT, WALInsert) +PG_LWLOCKTRANCHE(BUFFER_CONTENT, BufferContent) +PG_LWLOCKTRANCHE(REPLICATION_ORIGIN_STATE, ReplicationOriginState) +PG_LWLOCKTRANCHE(REPLICATION_SLOT_IO, ReplicationSlotIO) +PG_LWLOCKTRANCHE(LOCK_FASTPATH, LockFastPath) +PG_LWLOCKTRANCHE(BUFFER_MAPPING, BufferMapping) +PG_LWLOCKTRANCHE(LOCK_MANAGER, LockManager) +PG_LWLOCKTRANCHE(PREDICATE_LOCK_MANAGER, PredicateLockManager) +PG_LWLOCKTRANCHE(PARALLEL_HASH_JOIN, ParallelHashJoin) +PG_LWLOCKTRANCHE(PARALLEL_BTREE_SCAN, ParallelBtreeScan) +PG_LWLOCKTRANCHE(PARALLEL_QUERY_DSA, ParallelQueryDSA) +PG_LWLOCKTRANCHE(PER_SESSION_DSA, PerSessionDSA) +PG_LWLOCKTRANCHE(PER_SESSION_RECORD_TYPE, PerSessionRecordType) +PG_LWLOCKTRANCHE(PER_SESSION_RECORD_TYPMOD, PerSessionRecordTypmod) +PG_LWLOCKTRANCHE(SHARED_TUPLESTORE, SharedTupleStore) +PG_LWLOCKTRANCHE(SHARED_TIDBITMAP, SharedTidBitmap) +PG_LWLOCKTRANCHE(PARALLEL_APPEND, ParallelAppend) +PG_LWLOCKTRANCHE(PER_XACT_PREDICATE_LIST, PerXactPredicateList) +PG_LWLOCKTRANCHE(PGSTATS_DSA, PgStatsDSA) +PG_LWLOCKTRANCHE(PGSTATS_HASH, PgStatsHash) +PG_LWLOCKTRANCHE(PGSTATS_DATA, PgStatsData) +PG_LWLOCKTRANCHE(LAUNCHER_DSA, LogicalRepLauncherDSA) +PG_LWLOCKTRANCHE(LAUNCHER_HASH, LogicalRepLauncherHash) +PG_LWLOCKTRANCHE(DSM_REGISTRY_DSA, DSMRegistryDSA) +PG_LWLOCKTRANCHE(DSM_REGISTRY_HASH, DSMRegistryHash) +PG_LWLOCKTRANCHE(COMMITTS_SLRU, CommitTsSLRU) +PG_LWLOCKTRANCHE(MULTIXACTOFFSET_SLRU, MultiXactOffsetSLRU) +PG_LWLOCKTRANCHE(MULTIXACTMEMBER_SLRU, MultiXactMemberSLRU) +PG_LWLOCKTRANCHE(NOTIFY_SLRU, NotifySLRU) +PG_LWLOCKTRANCHE(SERIAL_SLRU, SerialSLRU) +PG_LWLOCKTRANCHE(SUBTRANS_SLRU, SubtransSLRU) +PG_LWLOCKTRANCHE(XACT_SLRU, XactSLRU) +PG_LWLOCKTRANCHE(PARALLEL_VACUUM_DSA, ParallelVacuumDSA) +PG_LWLOCKTRANCHE(AIO_URING_COMPLETION, AioUringCompletion) diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h index 9f9b3fcfbf1..c6f5ebceefd 100644 --- a/src/include/storage/proc.h +++ b/src/include/storage/proc.h @@ -130,9 +130,17 @@ extern PGDLLIMPORT int FastPathLockGroupsPerBackend; * the checkpoint are actually destroyed on disk. Replay can cope with a file * or block that doesn't exist, but not with a block that has the wrong * contents. + * + * Setting DELAY_CHKPT_IN_COMMIT is similar to setting DELAY_CHKPT_START, but + * it explicitly indicates that the reason for delaying the checkpoint is due + * to a transaction being within a critical commit section. We need this new + * flag to ensure all the transactions that have acquired commit timestamp are + * finished before we allow the logical replication client to advance its xid + * which is used to hold back dead rows for conflict detection. */ #define DELAY_CHKPT_START (1<<0) #define DELAY_CHKPT_COMPLETE (1<<1) +#define DELAY_CHKPT_IN_COMMIT (DELAY_CHKPT_START | 1<<2) typedef enum { diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h index e4877d88e8f..2f4ae06c279 100644 --- a/src/include/storage/procarray.h +++ b/src/include/storage/procarray.h @@ -55,7 +55,8 @@ extern RunningTransactions GetRunningTransactionData(void); extern bool TransactionIdIsInProgress(TransactionId xid); extern TransactionId GetOldestNonRemovableTransactionId(Relation rel); extern TransactionId GetOldestTransactionIdConsideredRunning(void); -extern TransactionId GetOldestActiveTransactionId(void); +extern TransactionId GetOldestActiveTransactionId(bool inCommitOnly, + bool allDbs); extern TransactionId GetOldestSafeDecodingTransactionId(bool catalogOnly); extern void GetReplicationHorizons(TransactionId *xmin, TransactionId *catalog_xmin); diff --git a/src/interfaces/ecpg/ecpglib/connect.c b/src/interfaces/ecpg/ecpglib/connect.c index 2bbb70333dc..78de9f298ba 100644 --- a/src/interfaces/ecpg/ecpglib/connect.c +++ b/src/interfaces/ecpg/ecpglib/connect.c @@ -58,7 +58,12 @@ ecpg_get_connection_nr(const char *connection_name) for (con = all_connections; con != NULL; con = con->next) { - if (strcmp(connection_name, con->name) == 0) + /* + * Check for the case of a NULL connection name, stored as such in + * the connection information by ECPGconnect() when the database + * name is not specified by its caller. + */ + if (con->name != NULL && strcmp(connection_name, con->name) == 0) break; } ret = con; @@ -259,7 +264,8 @@ ECPGconnect(int lineno, int c, const char *name, const char *user, const char *p struct connection *this; int i, connect_params = 0; - char *dbname = name ? ecpg_strdup(name, lineno) : NULL, + bool alloc_failed = (sqlca == NULL); + char *dbname = name ? ecpg_strdup(name, lineno, &alloc_failed) : NULL, *host = NULL, *tmp, *port = NULL, @@ -268,11 +274,12 @@ ECPGconnect(int lineno, int c, const char *name, const char *user, const char *p const char **conn_keywords; const char **conn_values; - if (sqlca == NULL) + if (alloc_failed) { ecpg_raise(lineno, ECPG_OUT_OF_MEMORY, ECPG_SQLSTATE_ECPG_OUT_OF_MEMORY, NULL); - ecpg_free(dbname); + if (dbname) + ecpg_free(dbname); return false; } @@ -297,7 +304,7 @@ ECPGconnect(int lineno, int c, const char *name, const char *user, const char *p if (envname) { ecpg_free(dbname); - dbname = ecpg_strdup(envname, lineno); + dbname = ecpg_strdup(envname, lineno, &alloc_failed); } } @@ -349,7 +356,7 @@ ECPGconnect(int lineno, int c, const char *name, const char *user, const char *p tmp = strrchr(dbname + offset, '?'); if (tmp != NULL) /* options given */ { - options = ecpg_strdup(tmp + 1, lineno); + options = ecpg_strdup(tmp + 1, lineno, &alloc_failed); *tmp = '\0'; } @@ -358,7 +365,7 @@ ECPGconnect(int lineno, int c, const char *name, const char *user, const char *p { if (tmp[1] != '\0') /* non-empty database name */ { - realname = ecpg_strdup(tmp + 1, lineno); + realname = ecpg_strdup(tmp + 1, lineno, &alloc_failed); connect_params++; } *tmp = '\0'; @@ -368,7 +375,7 @@ ECPGconnect(int lineno, int c, const char *name, const char *user, const char *p if (tmp != NULL) /* port number given */ { *tmp = '\0'; - port = ecpg_strdup(tmp + 1, lineno); + port = ecpg_strdup(tmp + 1, lineno, &alloc_failed); connect_params++; } @@ -402,7 +409,7 @@ ECPGconnect(int lineno, int c, const char *name, const char *user, const char *p { if (*(dbname + offset) != '\0') { - host = ecpg_strdup(dbname + offset, lineno); + host = ecpg_strdup(dbname + offset, lineno, &alloc_failed); connect_params++; } } @@ -414,7 +421,7 @@ ECPGconnect(int lineno, int c, const char *name, const char *user, const char *p tmp = strrchr(dbname, ':'); if (tmp != NULL) /* port number given */ { - port = ecpg_strdup(tmp + 1, lineno); + port = ecpg_strdup(tmp + 1, lineno, &alloc_failed); connect_params++; *tmp = '\0'; } @@ -422,14 +429,14 @@ ECPGconnect(int lineno, int c, const char *name, const char *user, const char *p tmp = strrchr(dbname, '@'); if (tmp != NULL) /* host name given */ { - host = ecpg_strdup(tmp + 1, lineno); + host = ecpg_strdup(tmp + 1, lineno, &alloc_failed); connect_params++; *tmp = '\0'; } if (strlen(dbname) > 0) { - realname = ecpg_strdup(dbname, lineno); + realname = ecpg_strdup(dbname, lineno, &alloc_failed); connect_params++; } else @@ -460,7 +467,18 @@ ECPGconnect(int lineno, int c, const char *name, const char *user, const char *p */ conn_keywords = (const char **) ecpg_alloc((connect_params + 1) * sizeof(char *), lineno); conn_values = (const char **) ecpg_alloc(connect_params * sizeof(char *), lineno); - if (conn_keywords == NULL || conn_values == NULL) + + /* Decide on a connection name */ + if (connection_name != NULL || realname != NULL) + { + this->name = ecpg_strdup(connection_name ? connection_name : realname, + lineno, &alloc_failed); + } + else + this->name = NULL; + + /* Deal with any failed allocations above */ + if (conn_keywords == NULL || conn_values == NULL || alloc_failed) { if (host) ecpg_free(host); @@ -476,6 +494,8 @@ ECPGconnect(int lineno, int c, const char *name, const char *user, const char *p ecpg_free(conn_keywords); if (conn_values) ecpg_free(conn_values); + if (this->name) + ecpg_free(this->name); free(this); return false; } @@ -510,17 +530,14 @@ ECPGconnect(int lineno, int c, const char *name, const char *user, const char *p ecpg_free(conn_keywords); if (conn_values) ecpg_free(conn_values); + if (this->name) + ecpg_free(this->name); free(this); return false; } } #endif - if (connection_name != NULL) - this->name = ecpg_strdup(connection_name, lineno); - else - this->name = ecpg_strdup(realname, lineno); - this->cache_head = NULL; this->prep_stmts = NULL; diff --git a/src/interfaces/ecpg/ecpglib/descriptor.c b/src/interfaces/ecpg/ecpglib/descriptor.c index 651d5c8b2ed..466428edfeb 100644 --- a/src/interfaces/ecpg/ecpglib/descriptor.c +++ b/src/interfaces/ecpg/ecpglib/descriptor.c @@ -240,8 +240,9 @@ ECPGget_desc(int lineno, const char *desc_name, int index,...) act_tuple; struct variable data_var; struct sqlca_t *sqlca = ECPGget_sqlca(); + bool alloc_failed = (sqlca == NULL); - if (sqlca == NULL) + if (alloc_failed) { ecpg_raise(lineno, ECPG_OUT_OF_MEMORY, ECPG_SQLSTATE_ECPG_OUT_OF_MEMORY, NULL); @@ -493,7 +494,14 @@ ECPGget_desc(int lineno, const char *desc_name, int index,...) #ifdef WIN32 stmt.oldthreadlocale = _configthreadlocale(_ENABLE_PER_THREAD_LOCALE); #endif - stmt.oldlocale = ecpg_strdup(setlocale(LC_NUMERIC, NULL), lineno); + stmt.oldlocale = ecpg_strdup(setlocale(LC_NUMERIC, NULL), + lineno, &alloc_failed); + if (alloc_failed) + { + va_end(args); + return false; + } + setlocale(LC_NUMERIC, "C"); #endif diff --git a/src/interfaces/ecpg/ecpglib/ecpglib_extern.h b/src/interfaces/ecpg/ecpglib/ecpglib_extern.h index 75cc68275bd..949ff66cefc 100644 --- a/src/interfaces/ecpg/ecpglib/ecpglib_extern.h +++ b/src/interfaces/ecpg/ecpglib/ecpglib_extern.h @@ -175,7 +175,7 @@ void ecpg_free(void *ptr); bool ecpg_init(const struct connection *con, const char *connection_name, const int lineno); -char *ecpg_strdup(const char *string, int lineno); +char *ecpg_strdup(const char *string, int lineno, bool *alloc_failed); const char *ecpg_type_name(enum ECPGttype typ); int ecpg_dynamic_type(Oid type); int sqlda_dynamic_type(Oid type, enum COMPAT_MODE compat); diff --git a/src/interfaces/ecpg/ecpglib/execute.c b/src/interfaces/ecpg/ecpglib/execute.c index f52da06de9a..84a4a9fc578 100644 --- a/src/interfaces/ecpg/ecpglib/execute.c +++ b/src/interfaces/ecpg/ecpglib/execute.c @@ -860,9 +860,9 @@ ecpg_store_input(const int lineno, const bool force_indicator, const struct vari numeric *nval; if (var->arrsize > 1) - mallocedval = ecpg_strdup("{", lineno); + mallocedval = ecpg_strdup("{", lineno, NULL); else - mallocedval = ecpg_strdup("", lineno); + mallocedval = ecpg_strdup("", lineno, NULL); if (!mallocedval) return false; @@ -923,9 +923,9 @@ ecpg_store_input(const int lineno, const bool force_indicator, const struct vari int slen; if (var->arrsize > 1) - mallocedval = ecpg_strdup("{", lineno); + mallocedval = ecpg_strdup("{", lineno, NULL); else - mallocedval = ecpg_strdup("", lineno); + mallocedval = ecpg_strdup("", lineno, NULL); if (!mallocedval) return false; @@ -970,9 +970,9 @@ ecpg_store_input(const int lineno, const bool force_indicator, const struct vari int slen; if (var->arrsize > 1) - mallocedval = ecpg_strdup("{", lineno); + mallocedval = ecpg_strdup("{", lineno, NULL); else - mallocedval = ecpg_strdup("", lineno); + mallocedval = ecpg_strdup("", lineno, NULL); if (!mallocedval) return false; @@ -1017,9 +1017,9 @@ ecpg_store_input(const int lineno, const bool force_indicator, const struct vari int slen; if (var->arrsize > 1) - mallocedval = ecpg_strdup("{", lineno); + mallocedval = ecpg_strdup("{", lineno, NULL); else - mallocedval = ecpg_strdup("", lineno); + mallocedval = ecpg_strdup("", lineno, NULL); if (!mallocedval) return false; @@ -2001,7 +2001,8 @@ ecpg_do_prologue(int lineno, const int compat, const int force_indicator, return false; } #endif - stmt->oldlocale = ecpg_strdup(setlocale(LC_NUMERIC, NULL), lineno); + stmt->oldlocale = ecpg_strdup(setlocale(LC_NUMERIC, NULL), lineno, + NULL); if (stmt->oldlocale == NULL) { ecpg_do_epilogue(stmt); @@ -2030,7 +2031,14 @@ ecpg_do_prologue(int lineno, const int compat, const int force_indicator, statement_type = ECPGst_execute; } else - stmt->command = ecpg_strdup(query, lineno); + { + stmt->command = ecpg_strdup(query, lineno, NULL); + if (!stmt->command) + { + ecpg_do_epilogue(stmt); + return false; + } + } stmt->name = NULL; @@ -2042,7 +2050,12 @@ ecpg_do_prologue(int lineno, const int compat, const int force_indicator, if (command) { stmt->name = stmt->command; - stmt->command = ecpg_strdup(command, lineno); + stmt->command = ecpg_strdup(command, lineno, NULL); + if (!stmt->command) + { + ecpg_do_epilogue(stmt); + return false; + } } else { @@ -2175,7 +2188,12 @@ ecpg_do_prologue(int lineno, const int compat, const int force_indicator, if (!is_prepared_name_set && stmt->statement_type == ECPGst_prepare) { - stmt->name = ecpg_strdup(var->value, lineno); + stmt->name = ecpg_strdup(var->value, lineno, NULL); + if (!stmt->name) + { + ecpg_do_epilogue(stmt); + return false; + } is_prepared_name_set = true; } } diff --git a/src/interfaces/ecpg/ecpglib/memory.c b/src/interfaces/ecpg/ecpglib/memory.c index 6979be2c988..2112e55b6e4 100644 --- a/src/interfaces/ecpg/ecpglib/memory.c +++ b/src/interfaces/ecpg/ecpglib/memory.c @@ -43,8 +43,15 @@ ecpg_realloc(void *ptr, long size, int lineno) return new; } +/* + * Wrapper for strdup(), with NULL in input treated as a correct case. + * + * "alloc_failed" can be optionally specified by the caller to check for + * allocation failures. The caller is responsible for its initialization, + * as ecpg_strdup() may be called repeatedly across multiple allocations. + */ char * -ecpg_strdup(const char *string, int lineno) +ecpg_strdup(const char *string, int lineno, bool *alloc_failed) { char *new; @@ -54,6 +61,8 @@ ecpg_strdup(const char *string, int lineno) new = strdup(string); if (!new) { + if (alloc_failed) + *alloc_failed = true; ecpg_raise(lineno, ECPG_OUT_OF_MEMORY, ECPG_SQLSTATE_ECPG_OUT_OF_MEMORY, NULL); return NULL; } diff --git a/src/interfaces/ecpg/ecpglib/prepare.c b/src/interfaces/ecpg/ecpglib/prepare.c index ea1146f520f..dd6fd1fe7f4 100644 --- a/src/interfaces/ecpg/ecpglib/prepare.c +++ b/src/interfaces/ecpg/ecpglib/prepare.c @@ -85,9 +85,22 @@ ecpg_register_prepared_stmt(struct statement *stmt) /* create statement */ prep_stmt->lineno = lineno; prep_stmt->connection = con; - prep_stmt->command = ecpg_strdup(stmt->command, lineno); + prep_stmt->command = ecpg_strdup(stmt->command, lineno, NULL); + if (!prep_stmt->command) + { + ecpg_free(prep_stmt); + ecpg_free(this); + return false; + } prep_stmt->inlist = prep_stmt->outlist = NULL; - this->name = ecpg_strdup(stmt->name, lineno); + this->name = ecpg_strdup(stmt->name, lineno, NULL); + if (!this->name) + { + ecpg_free(prep_stmt->command); + ecpg_free(prep_stmt); + ecpg_free(this); + return false; + } this->stmt = prep_stmt; this->prepared = true; @@ -177,14 +190,27 @@ prepare_common(int lineno, struct connection *con, const char *name, const char /* create statement */ stmt->lineno = lineno; stmt->connection = con; - stmt->command = ecpg_strdup(variable, lineno); + stmt->command = ecpg_strdup(variable, lineno, NULL); + if (!stmt->command) + { + ecpg_free(stmt); + ecpg_free(this); + return false; + } stmt->inlist = stmt->outlist = NULL; /* if we have C variables in our statement replace them with '?' */ replace_variables(&(stmt->command), lineno); /* add prepared statement to our list */ - this->name = ecpg_strdup(name, lineno); + this->name = ecpg_strdup(name, lineno, NULL); + if (!this->name) + { + ecpg_free(stmt->command); + ecpg_free(stmt); + ecpg_free(this); + return false; + } this->stmt = stmt; /* and finally really prepare the statement */ @@ -540,7 +566,9 @@ AddStmtToCache(int lineno, /* line # of statement */ /* add the query to the entry */ entry = &stmtCacheEntries[entNo]; entry->lineno = lineno; - entry->ecpgQuery = ecpg_strdup(ecpgQuery, lineno); + entry->ecpgQuery = ecpg_strdup(ecpgQuery, lineno, NULL); + if (!entry->ecpgQuery) + return -1; entry->connection = connection; entry->execs = 0; memcpy(entry->stmtID, stmtID, sizeof(entry->stmtID)); @@ -567,6 +595,9 @@ ecpg_auto_prepare(int lineno, const char *connection_name, const int compat, cha ecpg_log("ecpg_auto_prepare on line %d: statement found in cache; entry %d\n", lineno, entNo); stmtID = stmtCacheEntries[entNo].stmtID; + *name = ecpg_strdup(stmtID, lineno, NULL); + if (*name == NULL) + return false; con = ecpg_get_connection(connection_name); prep = ecpg_find_prepared_statement(stmtID, con, NULL); @@ -574,7 +605,6 @@ ecpg_auto_prepare(int lineno, const char *connection_name, const int compat, cha if (!prep && !prepare_common(lineno, con, stmtID, query)) return false; - *name = ecpg_strdup(stmtID, lineno); } else { @@ -584,6 +614,9 @@ ecpg_auto_prepare(int lineno, const char *connection_name, const int compat, cha /* generate a statement ID */ sprintf(stmtID, "ecpg%d", nextStmtID++); + *name = ecpg_strdup(stmtID, lineno, NULL); + if (*name == NULL) + return false; if (!ECPGprepare(lineno, connection_name, 0, stmtID, query)) return false; @@ -591,8 +624,6 @@ ecpg_auto_prepare(int lineno, const char *connection_name, const int compat, cha entNo = AddStmtToCache(lineno, stmtID, connection_name, compat, query); if (entNo < 0) return false; - - *name = ecpg_strdup(stmtID, lineno); } /* increase usage counter */ diff --git a/src/port/pgmkdirp.c b/src/port/pgmkdirp.c index d943559760d..7d7cea4dd0e 100644 --- a/src/port/pgmkdirp.c +++ b/src/port/pgmkdirp.c @@ -73,7 +73,7 @@ pg_mkdir_p(char *path, int omode) if (p[0] == '/' && p[1] == '/') { /* network drive */ - p = strstr(p + 2, "/"); + p = strchr(p + 2, '/'); if (p == NULL) { errno = EINVAL; diff --git a/src/test/regress/expected/generated_virtual.out b/src/test/regress/expected/generated_virtual.out index a635cb1e776..aca6347babe 100644 --- a/src/test/regress/expected/generated_virtual.out +++ b/src/test/regress/expected/generated_virtual.out @@ -1550,11 +1550,11 @@ where coalesce(t2.b, 1) = 2; explain (costs off) select t1.a from gtest32 t1 left join gtest32 t2 on t1.a = t2.a where coalesce(t2.b, 1) = 2 or t1.a is null; - QUERY PLAN -------------------------------------------------------------- + QUERY PLAN +----------------------------------------- Hash Left Join Hash Cond: (t1.a = t2.a) - Filter: ((COALESCE((t2.a * 2), 1) = 2) OR (t1.a IS NULL)) + Filter: (COALESCE((t2.a * 2), 1) = 2) -> Seq Scan on gtest32 t1 -> Hash -> Seq Scan on gtest32 t2 diff --git a/src/test/regress/expected/join.out b/src/test/regress/expected/join.out index 46ddfa844c5..4d5d35d0727 100644 --- a/src/test/regress/expected/join.out +++ b/src/test/regress/expected/join.out @@ -3639,8 +3639,8 @@ from nt3 as nt3 ) as ss2 on ss2.id = nt3.nt2_id where nt3.id = 1 and ss2.b3; - QUERY PLAN ------------------------------------------------ + QUERY PLAN +---------------------------------------------- Nested Loop -> Nested Loop -> Index Scan using nt3_pkey on nt3 @@ -3649,7 +3649,7 @@ where nt3.id = 1 and ss2.b3; Index Cond: (id = nt3.nt2_id) -> Index Only Scan using nt1_pkey on nt1 Index Cond: (id = nt2.nt1_id) - Filter: (nt2.b1 AND (id IS NOT NULL)) + Filter: (nt2.b1 AND true) (9 rows) select nt3.id diff --git a/src/test/regress/expected/predicate.out b/src/test/regress/expected/predicate.out index b79037748b7..59bfe33bb1c 100644 --- a/src/test/regress/expected/predicate.out +++ b/src/test/regress/expected/predicate.out @@ -84,10 +84,10 @@ SELECT * FROM pred_tab t WHERE t.a IS NULL OR t.c IS NULL; -- are provably false EXPLAIN (COSTS OFF) SELECT * FROM pred_tab t WHERE t.b IS NULL OR t.c IS NULL; - QUERY PLAN ----------------------------------------- + QUERY PLAN +------------------------ Seq Scan on pred_tab t - Filter: ((b IS NULL) OR (c IS NULL)) + Filter: (b IS NULL) (2 rows) -- @@ -231,6 +231,54 @@ SELECT * FROM pred_tab t1 -> Seq Scan on pred_tab t3 (9 rows) +-- +-- Tests for NullTest reduction in EXISTS sublink +-- +-- Ensure the IS_NOT_NULL qual is ignored +EXPLAIN (COSTS OFF) +SELECT * FROM pred_tab t1 + LEFT JOIN pred_tab t2 ON EXISTS + (SELECT 1 FROM pred_tab t3, pred_tab t4, pred_tab t5, pred_tab t6 + WHERE t1.a = t3.a AND t6.a IS NOT NULL); + QUERY PLAN +--------------------------------------------------------- + Nested Loop Left Join + Join Filter: EXISTS(SubPlan 1) + -> Seq Scan on pred_tab t1 + -> Materialize + -> Seq Scan on pred_tab t2 + SubPlan 1 + -> Nested Loop + -> Nested Loop + -> Nested Loop + -> Seq Scan on pred_tab t4 + -> Materialize + -> Seq Scan on pred_tab t3 + Filter: (t1.a = a) + -> Materialize + -> Seq Scan on pred_tab t5 + -> Materialize + -> Seq Scan on pred_tab t6 +(17 rows) + +-- Ensure the IS_NULL qual is reduced to constant-FALSE +EXPLAIN (COSTS OFF) +SELECT * FROM pred_tab t1 + LEFT JOIN pred_tab t2 ON EXISTS + (SELECT 1 FROM pred_tab t3, pred_tab t4, pred_tab t5, pred_tab t6 + WHERE t1.a = t3.a AND t6.a IS NULL); + QUERY PLAN +------------------------------------- + Nested Loop Left Join + Join Filter: (InitPlan 1).col1 + InitPlan 1 + -> Result + One-Time Filter: false + -> Seq Scan on pred_tab t1 + -> Materialize + -> Seq Scan on pred_tab t2 +(8 rows) + DROP TABLE pred_tab; -- Validate we handle IS NULL and IS NOT NULL quals correctly with inheritance -- parents. diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out index 529b2241731..a98c97f7616 100644 --- a/src/test/regress/expected/subscription.out +++ b/src/test/regress/expected/subscription.out @@ -116,18 +116,18 @@ CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PU WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ regress_testsub4 - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN -------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+------------ - regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | none | t | f | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN +------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------ + regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | none | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000 (1 row) ALTER SUBSCRIPTION regress_testsub4 SET (origin = any); \dRs+ regress_testsub4 - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN -------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+------------ - regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN +------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------ + regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000 (1 row) DROP SUBSCRIPTION regress_testsub3; @@ -145,10 +145,10 @@ ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar'; ERROR: invalid connection string syntax: missing "=" after "foobar" in connection info string \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000 (1 row) ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false); @@ -157,10 +157,10 @@ ALTER SUBSCRIPTION regress_testsub SET (slot_name = 'newname'); ALTER SUBSCRIPTION regress_testsub SET (password_required = false); ALTER SUBSCRIPTION regress_testsub SET (run_as_owner = true); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | f | t | f | off | dbname=regress_doesnotexist2 | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+------------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | f | t | f | f | off | dbname=regress_doesnotexist2 | 0/00000000 (1 row) ALTER SUBSCRIPTION regress_testsub SET (password_required = true); @@ -176,10 +176,10 @@ ERROR: unrecognized subscription parameter: "create_slot" -- ok ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/12345'); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | off | dbname=regress_doesnotexist2 | 0/00012345 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+------------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist2 | 0/00012345 (1 row) -- ok - with lsn = NONE @@ -188,10 +188,10 @@ ALTER SUBSCRIPTION regress_testsub SKIP (lsn = NONE); ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/0'); ERROR: invalid WAL location (LSN): 0/0 \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | off | dbname=regress_doesnotexist2 | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+------------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist2 | 0/00000000 (1 row) BEGIN; @@ -223,10 +223,10 @@ ALTER SUBSCRIPTION regress_testsub_foo SET (synchronous_commit = foobar); ERROR: invalid value for parameter "synchronous_commit": "foobar" HINT: Available values: local, remote_write, remote_apply, on, off. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ----------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+------------ - regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | local | dbname=regress_doesnotexist2 | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN +---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+------------------------------+------------ + regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | local | dbname=regress_doesnotexist2 | 0/00000000 (1 row) -- rename back to keep the rest simple @@ -255,19 +255,19 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | t | parallel | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub} | t | parallel | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000 (1 row) ALTER SUBSCRIPTION regress_testsub SET (binary = false); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000 (1 row) DROP SUBSCRIPTION regress_testsub; @@ -279,27 +279,27 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | on | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub} | f | on | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000 (1 row) ALTER SUBSCRIPTION regress_testsub SET (streaming = parallel); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000 (1 row) ALTER SUBSCRIPTION regress_testsub SET (streaming = false); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000 (1 row) -- fail - publication already exists @@ -314,10 +314,10 @@ ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refr ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false); ERROR: publication "testpub1" is already in subscription "regress_testsub" \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | off | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000 (1 row) -- fail - publication used more than once @@ -332,10 +332,10 @@ ERROR: publication "testpub3" is not in subscription "regress_testsub" -- ok - delete publications ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (refresh = false); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000 (1 row) DROP SUBSCRIPTION regress_testsub; @@ -371,19 +371,19 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | p | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | p | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000 (1 row) -- we can alter streaming when two_phase enabled ALTER SUBSCRIPTION regress_testsub SET (streaming = true); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000 (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); @@ -393,10 +393,10 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000 (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); @@ -409,18 +409,34 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000 (1 row) ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+------------ - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | t | any | t | f | f | off | dbname=regress_doesnotexist | 0/00000000 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | t | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000 +(1 row) + +ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); +DROP SUBSCRIPTION regress_testsub; +-- fail - retain_dead_tuples must be boolean +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, retain_dead_tuples = foo); +ERROR: retain_dead_tuples requires a Boolean value +-- ok +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, retain_dead_tuples = false); +WARNING: subscription was created, but is not connected +HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. +\dRs+ + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------ + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | off | dbname=regress_doesnotexist | 0/00000000 (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); diff --git a/src/test/regress/sql/predicate.sql b/src/test/regress/sql/predicate.sql index 9dcb81b1bc5..d92277353a0 100644 --- a/src/test/regress/sql/predicate.sql +++ b/src/test/regress/sql/predicate.sql @@ -115,6 +115,24 @@ SELECT * FROM pred_tab t1 LEFT JOIN pred_tab t2 ON t1.a = 1 LEFT JOIN pred_tab t3 ON t2.a IS NULL OR t2.c IS NULL; +-- +-- Tests for NullTest reduction in EXISTS sublink +-- + +-- Ensure the IS_NOT_NULL qual is ignored +EXPLAIN (COSTS OFF) +SELECT * FROM pred_tab t1 + LEFT JOIN pred_tab t2 ON EXISTS + (SELECT 1 FROM pred_tab t3, pred_tab t4, pred_tab t5, pred_tab t6 + WHERE t1.a = t3.a AND t6.a IS NOT NULL); + +-- Ensure the IS_NULL qual is reduced to constant-FALSE +EXPLAIN (COSTS OFF) +SELECT * FROM pred_tab t1 + LEFT JOIN pred_tab t2 ON EXISTS + (SELECT 1 FROM pred_tab t3, pred_tab t4, pred_tab t5, pred_tab t6 + WHERE t1.a = t3.a AND t6.a IS NULL); + DROP TABLE pred_tab; -- Validate we handle IS NULL and IS NOT NULL quals correctly with inheritance diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql index 007c9e70374..f0f714fe747 100644 --- a/src/test/regress/sql/subscription.sql +++ b/src/test/regress/sql/subscription.sql @@ -287,6 +287,17 @@ ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); DROP SUBSCRIPTION regress_testsub; +-- fail - retain_dead_tuples must be boolean +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, retain_dead_tuples = foo); + +-- ok +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, retain_dead_tuples = false); + +\dRs+ + +ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); +DROP SUBSCRIPTION regress_testsub; + -- let's do some tests with pg_create_subscription rather than superuser SET SESSION AUTHORIZATION regress_subscription_user3; diff --git a/src/test/subscription/t/035_conflicts.pl b/src/test/subscription/t/035_conflicts.pl index d78a6bac16a..7458d7fba7e 100644 --- a/src/test/subscription/t/035_conflicts.pl +++ b/src/test/subscription/t/035_conflicts.pl @@ -1,6 +1,6 @@ # Copyright (c) 2025, PostgreSQL Global Development Group -# Test the conflict detection of conflict type 'multiple_unique_conflicts'. +# Test conflicts in logical replication use strict; use warnings FATAL => 'all'; use PostgreSQL::Test::Cluster; @@ -18,7 +18,7 @@ $node_publisher->start; # Create a subscriber node my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber'); -$node_subscriber->init; +$node_subscriber->init(allows_streaming => 'logical'); $node_subscriber->start; # Create a table on publisher @@ -146,4 +146,195 @@ $node_subscriber->wait_for_log( pass('multiple_unique_conflicts detected on a leaf partition during insert'); +############################################################################### +# Setup a bidirectional logical replication between node_A & node_B +############################################################################### + +# Initialize nodes. + +# node_A. Increase the log_min_messages setting to DEBUG2 to debug test +# failures. Disable autovacuum to avoid generating xid that could affect the +# replication slot's xmin value. +my $node_A = $node_publisher; +$node_A->append_conf( + 'postgresql.conf', + qq{autovacuum = off + log_min_messages = 'debug2'}); +$node_A->restart; + +# node_B +my $node_B = $node_subscriber; +$node_B->append_conf('postgresql.conf', "track_commit_timestamp = on"); +$node_B->restart; + +# Create table on node_A +$node_A->safe_psql('postgres', "CREATE TABLE tab (a int PRIMARY KEY, b int)"); + +# Create the same table on node_B +$node_B->safe_psql('postgres', "CREATE TABLE tab (a int PRIMARY KEY, b int)"); + +my $subname_AB = 'tap_sub_a_b'; +my $subname_BA = 'tap_sub_b_a'; + +# Setup logical replication +# node_A (pub) -> node_B (sub) +my $node_A_connstr = $node_A->connstr . ' dbname=postgres'; +$node_A->safe_psql('postgres', "CREATE PUBLICATION tap_pub_A FOR TABLE tab"); +$node_B->safe_psql( + 'postgres', " + CREATE SUBSCRIPTION $subname_BA + CONNECTION '$node_A_connstr application_name=$subname_BA' + PUBLICATION tap_pub_A + WITH (origin = none, retain_dead_tuples = true)"); + +# node_B (pub) -> node_A (sub) +my $node_B_connstr = $node_B->connstr . ' dbname=postgres'; +$node_B->safe_psql('postgres', "CREATE PUBLICATION tap_pub_B FOR TABLE tab"); +$node_A->safe_psql( + 'postgres', " + CREATE SUBSCRIPTION $subname_AB + CONNECTION '$node_B_connstr application_name=$subname_AB' + PUBLICATION tap_pub_B + WITH (origin = none, copy_data = off)"); + +# Wait for initial table sync to finish +$node_A->wait_for_subscription_sync($node_B, $subname_AB); +$node_B->wait_for_subscription_sync($node_A, $subname_BA); + +is(1, 1, 'Bidirectional replication setup is complete'); + +# Confirm that the conflict detection slot is created on Node B and the xmin +# value is valid. +ok( $node_B->poll_query_until( + 'postgres', + "SELECT xmin IS NOT NULL from pg_replication_slots WHERE slot_name = 'pg_conflict_detection'" + ), + "the xmin value of slot 'pg_conflict_detection' is valid on Node B"); + +################################################## +# Check that the retain_dead_tuples option can be enabled only for disabled +# subscriptions. Validate the NOTICE message during the subscription DDL, and +# ensure the conflict detection slot is created upon enabling the +# retain_dead_tuples option. +################################################## + +# Alter retain_dead_tuples for enabled subscription +my ($cmdret, $stdout, $stderr) = $node_A->psql('postgres', + "ALTER SUBSCRIPTION $subname_AB SET (retain_dead_tuples = true)"); +ok( $stderr =~ + /ERROR: cannot set option \"retain_dead_tuples\" for enabled subscription/, + "altering retain_dead_tuples is not allowed for enabled subscription"); + +# Disable the subscription +$node_A->psql('postgres', "ALTER SUBSCRIPTION $subname_AB DISABLE;"); + +# Enable retain_dead_tuples for disabled subscription +($cmdret, $stdout, $stderr) = $node_A->psql('postgres', + "ALTER SUBSCRIPTION $subname_AB SET (retain_dead_tuples = true);"); +ok( $stderr =~ + /NOTICE: deleted rows to detect conflicts would not be removed until the subscription is enabled/, + "altering retain_dead_tuples is allowed for disabled subscription"); + +# Re-enable the subscription +$node_A->safe_psql('postgres', "ALTER SUBSCRIPTION $subname_AB ENABLE;"); + +# Confirm that the conflict detection slot is created on Node A and the xmin +# value is valid. +ok( $node_A->poll_query_until( + 'postgres', + "SELECT xmin IS NOT NULL from pg_replication_slots WHERE slot_name = 'pg_conflict_detection'" + ), + "the xmin value of slot 'pg_conflict_detection' is valid on Node A"); + +################################################## +# Check the WARNING when changing the origin to ANY, if retain_dead_tuples is +# enabled. This warns of the possibility of receiving changes from origins +# other than the publisher. +################################################## + +($cmdret, $stdout, $stderr) = $node_A->psql('postgres', + "ALTER SUBSCRIPTION $subname_AB SET (origin = any);"); +ok( $stderr =~ + /WARNING: subscription "tap_sub_a_b" enabled retain_dead_tuples but might not reliably detect conflicts for changes from different origins/, + "warn of the possibility of receiving changes from origins other than the publisher"); + +# Reset the origin to none +$node_A->psql('postgres', + "ALTER SUBSCRIPTION $subname_AB SET (origin = none);"); + +############################################################################### +# Check that dead tuples on node A cannot be cleaned by VACUUM until the +# concurrent transactions on Node B have been applied and flushed on Node A. +############################################################################### + +# Insert a record +$node_A->safe_psql('postgres', "INSERT INTO tab VALUES (1, 1), (2, 2);"); +$node_A->wait_for_catchup($subname_BA); + +my $result = $node_B->safe_psql('postgres', "SELECT * FROM tab;"); +is($result, qq(1|1 +2|2), 'check replicated insert on node B'); + +# Disable the logical replication from node B to node A +$node_A->safe_psql('postgres', "ALTER SUBSCRIPTION $subname_AB DISABLE"); + +$node_B->safe_psql('postgres', "UPDATE tab SET b = 3 WHERE a = 1;"); +$node_A->safe_psql('postgres', "DELETE FROM tab WHERE a = 1;"); + +($cmdret, $stdout, $stderr) = $node_A->psql( + 'postgres', qq(VACUUM (verbose) public.tab;) +); + +ok( $stderr =~ + qr/1 are dead but not yet removable/, + 'the deleted column is non-removable'); + +$node_A->safe_psql( + 'postgres', "ALTER SUBSCRIPTION $subname_AB ENABLE;"); +$node_B->wait_for_catchup($subname_AB); + +# Remember the next transaction ID to be assigned +my $next_xid = $node_A->safe_psql('postgres', "SELECT txid_current() + 1;"); + +# Confirm that the xmin value is advanced to the latest nextXid. If no +# transactions are running, the apply worker selects nextXid as the candidate +# for the non-removable xid. See GetOldestActiveTransactionId(). +ok( $node_A->poll_query_until( + 'postgres', + "SELECT xmin = $next_xid from pg_replication_slots WHERE slot_name = 'pg_conflict_detection'" + ), + "the xmin value of slot 'pg_conflict_detection' is updated on Node A"); + +# Confirm that the dead tuple can be removed now +($cmdret, $stdout, $stderr) = $node_A->psql( + 'postgres', qq(VACUUM (verbose) public.tab;) +); + +ok( $stderr =~ + qr/1 removed, 1 remain, 0 are dead but not yet removable/, + 'the deleted column is removed'); + +############################################################################### +# Check that the replication slot pg_conflict_detection is dropped after +# removing all the subscriptions. +############################################################################### + +$node_B->safe_psql( + 'postgres', "DROP SUBSCRIPTION $subname_BA"); + +ok( $node_B->poll_query_until( + 'postgres', + "SELECT count(*) = 0 FROM pg_replication_slots WHERE slot_name = 'pg_conflict_detection'" + ), + "the slot 'pg_conflict_detection' has been dropped on Node B"); + +$node_A->safe_psql( + 'postgres', "DROP SUBSCRIPTION $subname_AB"); + +ok( $node_A->poll_query_until( + 'postgres', + "SELECT count(*) = 0 FROM pg_replication_slots WHERE slot_name = 'pg_conflict_detection'" + ), + "the slot 'pg_conflict_detection' has been dropped on Node A"); + done_testing(); diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index ff050e93a50..a8656419cb6 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -1759,6 +1759,7 @@ NonEmptyRange Notification NotificationList NotifyStmt +NotnullHashEntry Nsrt NtDllRoutine NtFlushBuffersFileEx_t @@ -2565,6 +2566,8 @@ RestrictInfo Result ResultRelInfo ResultState +RetainDeadTuplesData +RetainDeadTuplesPhase ReturnSetInfo ReturnStmt ReturningClause |