Make walsenders show their replication commands in pg_stat_activity.
authorTom Lane <tgl@sss.pgh.pa.us>
Mon, 14 Sep 2020 16:35:00 +0000 (12:35 -0400)
committerTom Lane <tgl@sss.pgh.pa.us>
Mon, 14 Sep 2020 16:35:00 +0000 (12:35 -0400)
A walsender process that has executed a SQL command left the text of
that command in pg_stat_activity.query indefinitely, which is quite
confusing if it's in RUNNING state but not doing that query.  An easy
and useful fix is to treat replication commands as if they were SQL
queries, and show them in pg_stat_activity according to the same rules
as for regular queries.  While we're at it, it seems also sensible to
set debug_query_string, allowing error logging and debugging to see
the replication command.

While here, clean up assorted silliness in exec_replication_command:

* The SQLCmd path failed to restore CurrentMemoryContext to the caller's
value, and failed to delete the temp context created in this routine.
It's only through great good fortune that these oversights did not
result in long-term memory leaks or other problems.  It seems cleaner
to code SQLCmd as a separate early-exit path, so do it like that.

* Remove useless duplicate call of SnapBuildClearExportedSnapshot().

* replication_scanner_finish() was never called.

None of those things are significant enough to merit a backpatch,
so this is for HEAD only.

Discussion: https://postgr.es/m/880181.1600026471@sss.pgh.pa.us

src/backend/replication/walsender.c

index 3f756b470af11c9d467874a7544abe30e7b77ee0..67093383e66bfb4a335bb7101194ffdd666d3ee8 100644 (file)
@@ -1545,6 +1545,9 @@ exec_replication_command(const char *cmd_string)
 
    CHECK_FOR_INTERRUPTS();
 
+   /*
+    * Parse the command.
+    */
    cmd_context = AllocSetContextCreate(CurrentMemoryContext,
                                        "Replication command context",
                                        ALLOCSET_DEFAULT_SIZES);
@@ -1557,31 +1560,47 @@ exec_replication_command(const char *cmd_string)
                (errcode(ERRCODE_SYNTAX_ERROR),
                 errmsg_internal("replication command parser returned %d",
                                 parse_rc)));
+   replication_scanner_finish();
 
    cmd_node = replication_parse_result;
 
    /*
-    * Log replication command if log_replication_commands is enabled. Even
-    * when it's disabled, log the command with DEBUG1 level for backward
-    * compatibility. Note that SQL commands are not logged here, and will be
-    * logged later if log_statement is enabled.
+    * If it's a SQL command, just clean up our mess and return false; the
+    * caller will take care of executing it.
     */
-   if (cmd_node->type != T_SQLCmd)
-       ereport(log_replication_commands ? LOG : DEBUG1,
-               (errmsg("received replication command: %s", cmd_string)));
+   if (IsA(cmd_node, SQLCmd))
+   {
+       if (MyDatabaseId == InvalidOid)
+           ereport(ERROR,
+                   (errmsg("cannot execute SQL commands in WAL sender for physical replication")));
+
+       MemoryContextSwitchTo(old_context);
+       MemoryContextDelete(cmd_context);
+
+       /* Tell the caller that this wasn't a WalSender command. */
+       return false;
+   }
 
    /*
-    * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot. If it was
-    * called outside of transaction the snapshot should be cleared here.
+    * Report query to various monitoring facilities.  For this purpose, we
+    * report replication commands just like SQL commands.
     */
-   if (!IsTransactionBlock())
-       SnapBuildClearExportedSnapshot();
+   debug_query_string = cmd_string;
+
+   pgstat_report_activity(STATE_RUNNING, cmd_string);
 
    /*
-    * For aborted transactions, don't allow anything except pure SQL, the
-    * exec_simple_query() will handle it correctly.
+    * Log replication command if log_replication_commands is enabled. Even
+    * when it's disabled, log the command with DEBUG1 level for backward
+    * compatibility.
     */
-   if (IsAbortedTransactionBlockState() && !IsA(cmd_node, SQLCmd))
+   ereport(log_replication_commands ? LOG : DEBUG1,
+           (errmsg("received replication command: %s", cmd_string)));
+
+   /*
+    * Disallow replication commands in aborted transaction blocks.
+    */
+   if (IsAbortedTransactionBlockState())
        ereport(ERROR,
                (errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
                 errmsg("current transaction is aborted, "
@@ -1597,9 +1616,6 @@ exec_replication_command(const char *cmd_string)
    initStringInfo(&reply_message);
    initStringInfo(&tmpbuf);
 
-   /* Report to pgstat that this process is running */
-   pgstat_report_activity(STATE_RUNNING, NULL);
-
    switch (cmd_node->type)
    {
        case T_IdentifySystemCmd:
@@ -1651,17 +1667,6 @@ exec_replication_command(const char *cmd_string)
            }
            break;
 
-       case T_SQLCmd:
-           if (MyDatabaseId == InvalidOid)
-               ereport(ERROR,
-                       (errmsg("cannot execute SQL commands in WAL sender for physical replication")));
-
-           /* Report to pgstat that this process is now idle */
-           pgstat_report_activity(STATE_IDLE, NULL);
-
-           /* Tell the caller that this wasn't a WalSender command. */
-           return false;
-
        default:
            elog(ERROR, "unrecognized replication command node tag: %u",
                 cmd_node->type);
@@ -1677,6 +1682,7 @@ exec_replication_command(const char *cmd_string)
 
    /* Report to pgstat that this process is now idle */
    pgstat_report_activity(STATE_IDLE, NULL);
+   debug_query_string = NULL;
 
    return true;
 }