Enhance pg_stat_wal_receiver view to display host and port of sender server.
authorFujii Masao <fujii@postgresql.org>
Fri, 30 Mar 2018 22:51:22 +0000 (07:51 +0900)
committerFujii Masao <fujii@postgresql.org>
Fri, 30 Mar 2018 22:51:22 +0000 (07:51 +0900)
Previously there was no way in the standby side to find out the host and port
of the sender server that the walreceiver was currently connected to when
multiple hosts and ports were specified in primary_conninfo. For that purpose,
this patch adds sender_host and sender_port columns into pg_stat_wal_receiver
view. They report the host and port that the active replication connection
currently uses.

Bump catalog version.

Author: Haribabu Kommi
Reviewed-by: Michael Paquier and me
Discussion: https://postgr.es/m/CAJrrPGcV_aq8=cdqkFhVDJKEnDQ70yRTTdY9RODzMnXNrCz2Ow@mail.gmail.com

doc/src/sgml/monitoring.sgml
src/backend/catalog/system_views.sql
src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
src/backend/replication/walreceiver.c
src/include/catalog/catversion.h
src/include/catalog/pg_proc.h
src/include/replication/walreceiver.h
src/test/regress/expected/rules.out

index 3bc4de57d5ab86da9ff697102bffc02de36c9166..c278076e68d86c7c87d574a932c758acf866f409 100644 (file)
@@ -2031,6 +2031,25 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
      <entry><type>text</type></entry>
      <entry>Replication slot name used by this WAL receiver</entry>
     </row>
+    <row>
+     <entry><structfield>sender_host</structfield></entry>
+     <entry><type>text</type></entry>
+     <entry>
+      Host of the <productname>PostgreSQL</productname> instance
+      this WAL receiver is connected to. This can be a host name,
+      an IP address, or a directory path if the connection is via
+      Unix socket.  (The path case can be distinguished because it
+      will always be an absolute path, beginning with <literal>/</literal>.)
+     </entry>
+    </row>
+    <row>
+     <entry><structfield>sender_port</structfield></entry>
+     <entry><type>integer</type></entry>
+     <entry>
+      Port number of the <productname>PostgreSQL</productname> instance
+      this WAL receiver is connected to.
+     </entry>
+    </row>
     <row>
      <entry><structfield>conninfo</structfield></entry>
      <entry><type>text</type></entry>
index 5e6e8a64f637832b06b3468b67d8e3313a1137c2..e9e188682fb417491787ebe3d77164bce0ff33dd 100644 (file)
@@ -752,6 +752,8 @@ CREATE VIEW pg_stat_wal_receiver AS
             s.latest_end_lsn,
             s.latest_end_time,
             s.slot_name,
+            s.sender_host,
+            s.sender_port,
             s.conninfo
     FROM pg_stat_get_wal_receiver() s
     WHERE s.pid IS NOT NULL;
index f9aec0531a389db71976c57c562f1028fc1babd5..ec37377efe5b2501151fff39ae937e69fbbc9f47 100644 (file)
@@ -53,6 +53,8 @@ static WalReceiverConn *libpqrcv_connect(const char *conninfo,
                 char **err);
 static void libpqrcv_check_conninfo(const char *conninfo);
 static char *libpqrcv_get_conninfo(WalReceiverConn *conn);
+static void libpqrcv_get_senderinfo(WalReceiverConn *conn,
+                   char **sender_host, int *sender_port);
 static char *libpqrcv_identify_system(WalReceiverConn *conn,
                         TimeLineID *primary_tli,
                         int *server_version);
@@ -82,6 +84,7 @@ static WalReceiverFunctionsType PQWalReceiverFunctions = {
    libpqrcv_connect,
    libpqrcv_check_conninfo,
    libpqrcv_get_conninfo,
+   libpqrcv_get_senderinfo,
    libpqrcv_identify_system,
    libpqrcv_readtimelinehistoryfile,
    libpqrcv_startstreaming,
@@ -282,6 +285,29 @@ libpqrcv_get_conninfo(WalReceiverConn *conn)
    return retval;
 }
 
+/*
+ * Provides information of sender this WAL receiver is connected to.
+ */
+static void
+libpqrcv_get_senderinfo(WalReceiverConn *conn, char **sender_host,
+                     int *sender_port)
+{
+   char *ret = NULL;
+
+   *sender_host = NULL;
+   *sender_port = 0;
+
+   Assert(conn->streamConn != NULL);
+
+   ret = PQhost(conn->streamConn);
+   if (ret && strlen(ret) != 0)
+       *sender_host = pstrdup(ret);
+
+   ret = PQport(conn->streamConn);
+   if (ret && strlen(ret) != 0)
+       *sender_port = atoi(ret);
+}
+
 /*
  * Check that primary's system identifier matches ours, and fetch the current
  * timeline ID of the primary.
index a39a98ff1871c0576fda18170a57c81512dd61b6..b9dab322d6bca19891f78b2be66df865e9b2dc52 100644 (file)
@@ -52,6 +52,7 @@
 #include "access/xlog_internal.h"
 #include "catalog/pg_authid.h"
 #include "catalog/pg_type.h"
+#include "common/ip.h"
 #include "funcapi.h"
 #include "libpq/pqformat.h"
 #include "libpq/pqsignal.h"
@@ -199,6 +200,8 @@ WalReceiverMain(void)
    TimestampTz now;
    bool        ping_sent;
    char       *err;
+   char       *sender_host = NULL;
+   int         sender_port = 0;
 
    /*
     * WalRcv should be set up already (if we are a backend, we inherit this
@@ -308,19 +311,30 @@ WalReceiverMain(void)
 
    /*
     * Save user-visible connection string.  This clobbers the original
-    * conninfo, for security.
+    * conninfo, for security. Also save host and port of the sender server
+    * this walreceiver is connected to.
     */
    tmp_conninfo = walrcv_get_conninfo(wrconn);
+   walrcv_get_senderinfo(wrconn, &sender_host, &sender_port);
    SpinLockAcquire(&walrcv->mutex);
    memset(walrcv->conninfo, 0, MAXCONNINFO);
    if (tmp_conninfo)
        strlcpy((char *) walrcv->conninfo, tmp_conninfo, MAXCONNINFO);
+
+   memset(walrcv->sender_host, 0, NI_MAXHOST);
+   if (sender_host)
+       strlcpy((char *) walrcv->sender_host, sender_host, NI_MAXHOST);
+
+   walrcv->sender_port = sender_port;
    walrcv->ready_to_display = true;
    SpinLockRelease(&walrcv->mutex);
 
    if (tmp_conninfo)
        pfree(tmp_conninfo);
 
+   if (sender_host)
+       pfree(sender_host);
+
    first_stream = true;
    for (;;)
    {
@@ -1402,6 +1416,8 @@ pg_stat_get_wal_receiver(PG_FUNCTION_ARGS)
    TimestampTz last_receipt_time;
    XLogRecPtr  latest_end_lsn;
    TimestampTz latest_end_time;
+   char        sender_host[NI_MAXHOST];
+   int         sender_port = 0;
    char        slotname[NAMEDATALEN];
    char        conninfo[MAXCONNINFO];
 
@@ -1419,6 +1435,8 @@ pg_stat_get_wal_receiver(PG_FUNCTION_ARGS)
    latest_end_lsn = WalRcv->latestWalEnd;
    latest_end_time = WalRcv->latestWalEndTime;
    strlcpy(slotname, (char *) WalRcv->slotname, sizeof(slotname));
+   strlcpy(sender_host, (char *) WalRcv->sender_host, sizeof(sender_host));
+   sender_port = WalRcv->sender_port;
    strlcpy(conninfo, (char *) WalRcv->conninfo, sizeof(conninfo));
    SpinLockRelease(&WalRcv->mutex);
 
@@ -1482,10 +1500,18 @@ pg_stat_get_wal_receiver(PG_FUNCTION_ARGS)
            nulls[10] = true;
        else
            values[10] = CStringGetTextDatum(slotname);
-       if (*conninfo == '\0')
+       if (*sender_host == '\0')
            nulls[11] = true;
        else
-           values[11] = CStringGetTextDatum(conninfo);
+           values[11] = CStringGetTextDatum(sender_host);
+       if (sender_port == 0)
+           nulls[12] = true;
+       else
+           values[12] = Int32GetDatum(sender_port);
+       if (*conninfo == '\0')
+           nulls[13] = true;
+       else
+           values[13] = CStringGetTextDatum(conninfo);
    }
 
    /* Returns the record as Datum */
index d4a7b23f80121a93abacc5792f586d628fb06121..b2806e6595a4e3eb98698e35bdd6b61bf1f4a644 100644 (file)
@@ -53,6 +53,6 @@
  */
 
 /*                         yyyymmddN */
-#define CATALOG_VERSION_NO 201803291
+#define CATALOG_VERSION_NO 201803311
 
 #endif
index 3792229ffadcc2de3aaa8e0b99ec1d66277a1eae..90d994c71a4224fe01a4a0426494c384156e414e 100644 (file)
@@ -2919,7 +2919,7 @@ DATA(insert OID = 3318 (  pg_stat_get_progress_info             PGNSP PGUID 12 1 100 0 0
 DESCR("statistics: information about progress of backends running maintenance command");
 DATA(insert OID = 3099 (  pg_stat_get_wal_senders  PGNSP PGUID 12 1 10 0 0 f f f f t s r 0 0 2249 "" "{23,25,3220,3220,3220,3220,1186,1186,1186,23,25}" "{o,o,o,o,o,o,o,o,o,o,o}" "{pid,state,sent_lsn,write_lsn,flush_lsn,replay_lsn,write_lag,flush_lag,replay_lag,sync_priority,sync_state}" _null_ _null_ pg_stat_get_wal_senders _null_ _null_ _null_ ));
 DESCR("statistics: information about currently active replication");
-DATA(insert OID = 3317 (  pg_stat_get_wal_receiver PGNSP PGUID 12 1 0 0 0 f f f f f s r 0 0 2249 "" "{23,25,3220,23,3220,23,1184,1184,3220,1184,25,25}" "{o,o,o,o,o,o,o,o,o,o,o,o}" "{pid,status,receive_start_lsn,receive_start_tli,received_lsn,received_tli,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time,slot_name,conninfo}" _null_ _null_ pg_stat_get_wal_receiver _null_ _null_ _null_ ));
+DATA(insert OID = 3317 (  pg_stat_get_wal_receiver PGNSP PGUID 12 1 0 0 0 f f f f f s r 0 0 2249 "" "{23,25,3220,23,3220,23,1184,1184,3220,1184,25,25,23,25}" "{o,o,o,o,o,o,o,o,o,o,o,o,o,o}" "{pid,status,receive_start_lsn,receive_start_tli,received_lsn,received_tli,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time,slot_name,sender_host,sender_port,conninfo}" _null_ _null_ pg_stat_get_wal_receiver _null_ _null_ _null_ ));
 DESCR("statistics: information about WAL receiver");
 DATA(insert OID = 6118 (  pg_stat_get_subscription PGNSP PGUID 12 1 0 0 0 f f f f f s r 1 0 2249 "26" "{26,26,26,23,3220,1184,1184,3220,1184}" "{i,o,o,o,o,o,o,o,o}" "{subid,subid,relid,pid,received_lsn,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time}" _null_ _null_ pg_stat_get_subscription _null_ _null_ _null_ ));
 DESCR("statistics: information about subscription");
index ea7967f6fc54015c2387c475cade284e3e6eb214..14d401674d602cdb777249dd574e5aff4c94a8ab 100644 (file)
@@ -108,6 +108,13 @@ typedef struct
     */
    char        conninfo[MAXCONNINFO];
 
+   /*
+    * Host name (this can be a host name, an IP address, or a directory
+    * path) and port number of the active replication connection.
+    */
+   char        sender_host[NI_MAXHOST];
+   int         sender_port;
+
    /*
     * replication slot name; is also used for walreceiver to connect with the
     * primary
@@ -197,6 +204,9 @@ typedef WalReceiverConn *(*walrcv_connect_fn) (const char *conninfo, bool logica
                                               char **err);
 typedef void (*walrcv_check_conninfo_fn) (const char *conninfo);
 typedef char *(*walrcv_get_conninfo_fn) (WalReceiverConn *conn);
+typedef void (*walrcv_get_senderinfo_fn) (WalReceiverConn *conn,
+                                        char **sender_host,
+                                        int *sender_port);
 typedef char *(*walrcv_identify_system_fn) (WalReceiverConn *conn,
                                            TimeLineID *primary_tli,
                                            int *server_version);
@@ -227,6 +237,7 @@ typedef struct WalReceiverFunctionsType
    walrcv_connect_fn walrcv_connect;
    walrcv_check_conninfo_fn walrcv_check_conninfo;
    walrcv_get_conninfo_fn walrcv_get_conninfo;
+   walrcv_get_senderinfo_fn walrcv_get_senderinfo;
    walrcv_identify_system_fn walrcv_identify_system;
    walrcv_readtimelinehistoryfile_fn walrcv_readtimelinehistoryfile;
    walrcv_startstreaming_fn walrcv_startstreaming;
@@ -246,6 +257,8 @@ extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions;
    WalReceiverFunctions->walrcv_check_conninfo(conninfo)
 #define walrcv_get_conninfo(conn) \
    WalReceiverFunctions->walrcv_get_conninfo(conn)
+#define walrcv_get_senderinfo(conn, sender_host, sender_port) \
+   WalReceiverFunctions->walrcv_get_senderinfo(conn, sender_host, sender_port)
 #define walrcv_identify_system(conn, primary_tli, server_version) \
    WalReceiverFunctions->walrcv_identify_system(conn, primary_tli, server_version)
 #define walrcv_readtimelinehistoryfile(conn, tli, filename, content, size) \
index 5149b72fe9110cb5f823ad888c7e8ac2895be505..ae0cd253d5f30abc35808e653e3644ef966673d5 100644 (file)
@@ -1972,8 +1972,10 @@ pg_stat_wal_receiver| SELECT s.pid,
     s.latest_end_lsn,
     s.latest_end_time,
     s.slot_name,
+    s.sender_host,
+    s.sender_port,
     s.conninfo
-   FROM pg_stat_get_wal_receiver() s(pid, status, receive_start_lsn, receive_start_tli, received_lsn, received_tli, last_msg_send_time, last_msg_receipt_time, latest_end_lsn, latest_end_time, slot_name, conninfo)
+   FROM pg_stat_get_wal_receiver() s(pid, status, receive_start_lsn, receive_start_tli, received_lsn, received_tli, last_msg_send_time, last_msg_receipt_time, latest_end_lsn, latest_end_time, slot_name, sender_host, sender_port, conninfo)
   WHERE (s.pid IS NOT NULL);
 pg_stat_xact_all_tables| SELECT c.oid AS relid,
     n.nspname AS schemaname,