Refactor sending of RowDescription messages in replication protocol
authorPeter Eisentraut <peter@eisentraut.org>
Mon, 4 Jul 2022 05:25:26 +0000 (07:25 +0200)
committerPeter Eisentraut <peter@eisentraut.org>
Mon, 4 Jul 2022 17:43:58 +0000 (19:43 +0200)
Some routines open-coded the construction of RowDescription messages.
Instead, we have support for doing this using tuple descriptors and
DestRemoteSimple, so use that instead.

Reviewed-by: Nathan Bossart <nathandbossart@gmail.com>
Discussion: https://www.postgresql.org/message-id/flat/7e4fdbdc-699c-4cd0-115d-fb78a957fc22@enterprisedb.com

src/backend/access/common/tupdesc.c
src/backend/replication/basebackup_copy.c
src/backend/replication/walsender.c

index 9f41b1e8543f86c7bdc52dea8a4d6af9d4c9d6d9..d6fb261e20162cc0f9fc920cbcf3daea4a46d4a6 100644 (file)
@@ -739,6 +739,15 @@ TupleDescInitBuiltinEntry(TupleDesc desc,
                        att->attcollation = InvalidOid;
                        break;
 
+               case OIDOID:
+                       att->attlen = 4;
+                       att->attbyval = true;
+                       att->attalign = TYPALIGN_INT;
+                       att->attstorage = TYPSTORAGE_PLAIN;
+                       att->attcompression = InvalidCompressionMethod;
+                       att->attcollation = InvalidOid;
+                       break;
+
                default:
                        elog(ERROR, "unsupported type %u", oidtypeid);
        }
index 1eed9d8c3f79454357c6a14a7b27e655d4659259..df0471a7a461d2b890b14decf55452f9091107c7 100644 (file)
  */
 #include "postgres.h"
 
+#include "access/tupdesc.h"
 #include "catalog/pg_type_d.h"
 #include "libpq/libpq.h"
 #include "libpq/pqformat.h"
 #include "replication/basebackup.h"
 #include "replication/basebackup_sink.h"
+#include "tcop/dest.h"
 #include "utils/timestamp.h"
 
 typedef struct bbsink_copystream
@@ -336,35 +338,24 @@ SendCopyDone(void)
 static void
 SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli)
 {
+       DestReceiver *dest;
+       TupleDesc       tupdesc;
        StringInfoData buf;
        char            str[MAXFNAMELEN];
        Size            len;
 
-       pq_beginmessage(&buf, 'T'); /* RowDescription */
-       pq_sendint16(&buf, 2);          /* 2 fields */
-
-       /* Field headers */
-       pq_sendstring(&buf, "recptr");
-       pq_sendint32(&buf, 0);          /* table oid */
-       pq_sendint16(&buf, 0);          /* attnum */
-       pq_sendint32(&buf, TEXTOID);    /* type oid */
-       pq_sendint16(&buf, -1);
-       pq_sendint32(&buf, 0);
-       pq_sendint16(&buf, 0);
-
-       pq_sendstring(&buf, "tli");
-       pq_sendint32(&buf, 0);          /* table oid */
-       pq_sendint16(&buf, 0);          /* attnum */
+       dest = CreateDestReceiver(DestRemoteSimple);
 
+       tupdesc = CreateTemplateTupleDesc(2);
+       TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "recptr", TEXTOID, -1, 0);
        /*
         * int8 may seem like a surprising data type for this, but in theory int4
         * would not be wide enough for this, as TimeLineID is unsigned.
         */
-       pq_sendint32(&buf, INT8OID);    /* type oid */
-       pq_sendint16(&buf, 8);
-       pq_sendint32(&buf, 0);
-       pq_sendint16(&buf, 0);
-       pq_endmessage(&buf);
+       TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "tli", INT8OID, -1, 0);
+
+       /* send RowDescription */
+       dest->rStartup(dest, CMD_SELECT, tupdesc);
 
        /* Data row */
        pq_beginmessage(&buf, 'D');
@@ -391,41 +382,22 @@ SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli)
 static void
 SendTablespaceList(List *tablespaces)
 {
+       DestReceiver *dest;
+       TupleDesc       tupdesc;
        StringInfoData buf;
        ListCell   *lc;
 
-       /* Construct and send the directory information */
-       pq_beginmessage(&buf, 'T'); /* RowDescription */
-       pq_sendint16(&buf, 3);          /* 3 fields */
-
-       /* First field - spcoid */
-       pq_sendstring(&buf, "spcoid");
-       pq_sendint32(&buf, 0);          /* table oid */
-       pq_sendint16(&buf, 0);          /* attnum */
-       pq_sendint32(&buf, OIDOID); /* type oid */
-       pq_sendint16(&buf, 4);          /* typlen */
-       pq_sendint32(&buf, 0);          /* typmod */
-       pq_sendint16(&buf, 0);          /* format code */
-
-       /* Second field - spclocation */
-       pq_sendstring(&buf, "spclocation");
-       pq_sendint32(&buf, 0);
-       pq_sendint16(&buf, 0);
-       pq_sendint32(&buf, TEXTOID);
-       pq_sendint16(&buf, -1);
-       pq_sendint32(&buf, 0);
-       pq_sendint16(&buf, 0);
-
-       /* Third field - size */
-       pq_sendstring(&buf, "size");
-       pq_sendint32(&buf, 0);
-       pq_sendint16(&buf, 0);
-       pq_sendint32(&buf, INT8OID);
-       pq_sendint16(&buf, 8);
-       pq_sendint32(&buf, 0);
-       pq_sendint16(&buf, 0);
-       pq_endmessage(&buf);
+       dest = CreateDestReceiver(DestRemoteSimple);
+
+       tupdesc = CreateTemplateTupleDesc(3);
+       TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "spcoid", OIDOID, -1, 0);
+       TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "spclocation", TEXTOID, -1, 0);
+       TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "size", INT8OID, -1, 0);
 
+       /* send RowDescription */
+       dest->rStartup(dest, CMD_SELECT, tupdesc);
+
+       /* Construct and send the directory information */
        foreach(lc, tablespaces)
        {
                tablespaceinfo *ti = lfirst(lc);
index cb4a858687361364fb8d3d6c998079116027b3a8..3c407ab964752f453cf81c929ebb082732ca6563 100644 (file)
@@ -579,6 +579,8 @@ ReadReplicationSlot(ReadReplicationSlotCmd *cmd)
 static void
 SendTimeLineHistory(TimeLineHistoryCmd *cmd)
 {
+       DestReceiver *dest;
+       TupleDesc       tupdesc;
        StringInfoData buf;
        char            histfname[MAXFNAMELEN];
        char            path[MAXPGPATH];
@@ -587,36 +589,21 @@ SendTimeLineHistory(TimeLineHistoryCmd *cmd)
        off_t           bytesleft;
        Size            len;
 
+       dest = CreateDestReceiver(DestRemoteSimple);
+
        /*
         * Reply with a result set with one row, and two columns. The first col is
         * the name of the history file, 2nd is the contents.
         */
+       tupdesc = CreateTemplateTupleDesc(2);
+       TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "filename", TEXTOID, -1, 0);
+       TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "content", TEXTOID, -1, 0);
 
        TLHistoryFileName(histfname, cmd->timeline);
        TLHistoryFilePath(path, cmd->timeline);
 
        /* Send a RowDescription message */
-       pq_beginmessage(&buf, 'T');
-       pq_sendint16(&buf, 2);          /* 2 fields */
-
-       /* first field */
-       pq_sendstring(&buf, "filename");        /* col name */
-       pq_sendint32(&buf, 0);          /* table oid */
-       pq_sendint16(&buf, 0);          /* attnum */
-       pq_sendint32(&buf, TEXTOID);    /* type oid */
-       pq_sendint16(&buf, -1);         /* typlen */
-       pq_sendint32(&buf, 0);          /* typmod */
-       pq_sendint16(&buf, 0);          /* format code */
-
-       /* second field */
-       pq_sendstring(&buf, "content"); /* col name */
-       pq_sendint32(&buf, 0);          /* table oid */
-       pq_sendint16(&buf, 0);          /* attnum */
-       pq_sendint32(&buf, TEXTOID);    /* type oid */
-       pq_sendint16(&buf, -1);         /* typlen */
-       pq_sendint32(&buf, 0);          /* typmod */
-       pq_sendint16(&buf, 0);          /* format code */
-       pq_endmessage(&buf);
+       dest->rStartup(dest, CMD_SELECT, tupdesc);
 
        /* Send a DataRow message */
        pq_beginmessage(&buf, 'D');