Refactor sending of DataRow messages in replication protocol
authorPeter Eisentraut <peter@eisentraut.org>
Wed, 6 Jul 2022 06:28:02 +0000 (08:28 +0200)
committerPeter Eisentraut <peter@eisentraut.org>
Wed, 6 Jul 2022 06:42:56 +0000 (08:42 +0200)
Some routines open-coded the construction of DataRow messages.  Use
TupOutputState struct and associated functions instead, which was
already done in some places.

SendTimeLineHistory() is a bit more complicated and isn't converted by
this.

Reviewed-by: Nathan Bossart <nathandbossart@gmail.com>
Discussion: https://www.postgresql.org/message-id/flat/7e4fdbdc-699c-4cd0-115d-fb78a957fc22@enterprisedb.com

src/backend/access/common/printsimple.c
src/backend/replication/basebackup_copy.c

index e99aa279f6fbb640445d850b4097e32232dfb731..c99ae54cb026674fd50a8159f8a1ff9dd28101e6 100644 (file)
@@ -121,6 +121,17 @@ printsimple(TupleTableSlot *slot, DestReceiver *self)
                                }
                                break;
 
+                       case OIDOID:
+                               {
+                                       Oid                     num = ObjectIdGetDatum(value);
+                                       char            str[10];        /* 10 digits */
+                                       int                     len;
+
+                                       len = pg_ultoa_n(num, str);
+                                       pq_sendcountedtext(&buf, str, len, false);
+                               }
+                               break;
+
                        default:
                                elog(ERROR, "unsupported type OID: %u", attr->atttypid);
                }
index df0471a7a461d2b890b14decf55452f9091107c7..c384d63a34135ba37fcdc03705f09c4c44f1a530 100644 (file)
 
 #include "access/tupdesc.h"
 #include "catalog/pg_type_d.h"
+#include "executor/executor.h"
 #include "libpq/libpq.h"
 #include "libpq/pqformat.h"
 #include "replication/basebackup.h"
 #include "replication/basebackup_sink.h"
 #include "tcop/dest.h"
+#include "utils/builtins.h"
 #include "utils/timestamp.h"
 
 typedef struct bbsink_copystream
@@ -86,7 +88,6 @@ static void SendCopyOutResponse(void);
 static void SendCopyDone(void);
 static void SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli);
 static void SendTablespaceList(List *tablespaces);
-static void send_int8_string(StringInfoData *buf, int64 intval);
 
 static const bbsink_ops bbsink_copystream_ops = {
        .begin_backup = bbsink_copystream_begin_backup,
@@ -339,10 +340,10 @@ static void
 SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli)
 {
        DestReceiver *dest;
+       TupOutputState *tstate;
        TupleDesc       tupdesc;
-       StringInfoData buf;
-       char            str[MAXFNAMELEN];
-       Size            len;
+       Datum           values[2];
+       bool            nulls[2] = {0};
 
        dest = CreateDestReceiver(DestRemoteSimple);
 
@@ -355,22 +356,14 @@ SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli)
        TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "tli", INT8OID, -1, 0);
 
        /* send RowDescription */
-       dest->rStartup(dest, CMD_SELECT, tupdesc);
+       tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
 
        /* Data row */
-       pq_beginmessage(&buf, 'D');
-       pq_sendint16(&buf, 2);          /* number of columns */
-
-       len = snprintf(str, sizeof(str),
-                                  "%X/%X", LSN_FORMAT_ARGS(ptr));
-       pq_sendint32(&buf, len);
-       pq_sendbytes(&buf, str, len);
+       values[0]= CStringGetTextDatum(psprintf("%X/%X", LSN_FORMAT_ARGS(ptr)));
+       values[1] = Int64GetDatum(tli);
+       do_tup_output(tstate, values, nulls);
 
-       len = snprintf(str, sizeof(str), "%u", tli);
-       pq_sendint32(&buf, len);
-       pq_sendbytes(&buf, str, len);
-
-       pq_endmessage(&buf);
+       end_tup_output(tstate);
 
        /* Send a CommandComplete message */
        pq_puttextmessage('C', "SELECT");
@@ -383,8 +376,8 @@ static void
 SendTablespaceList(List *tablespaces)
 {
        DestReceiver *dest;
+       TupOutputState *tstate;
        TupleDesc       tupdesc;
-       StringInfoData buf;
        ListCell   *lc;
 
        dest = CreateDestReceiver(DestRemoteSimple);
@@ -395,51 +388,33 @@ SendTablespaceList(List *tablespaces)
        TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "size", INT8OID, -1, 0);
 
        /* send RowDescription */
-       dest->rStartup(dest, CMD_SELECT, tupdesc);
+       tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
 
        /* Construct and send the directory information */
        foreach(lc, tablespaces)
        {
                tablespaceinfo *ti = lfirst(lc);
+               Datum           values[3];
+               bool            nulls[3] = {0};
 
                /* Send one datarow message */
-               pq_beginmessage(&buf, 'D');
-               pq_sendint16(&buf, 3);  /* number of columns */
                if (ti->path == NULL)
                {
-                       pq_sendint32(&buf, -1); /* Length = -1 ==> NULL */
-                       pq_sendint32(&buf, -1);
+                       nulls[0] = true;
+                       nulls[1] = true;
                }
                else
                {
-                       Size            len;
-
-                       len = strlen(ti->oid);
-                       pq_sendint32(&buf, len);
-                       pq_sendbytes(&buf, ti->oid, len);
-
-                       len = strlen(ti->path);
-                       pq_sendint32(&buf, len);
-                       pq_sendbytes(&buf, ti->path, len);
+                       values[0] = ObjectIdGetDatum(strtoul(ti->oid, NULL, 10));
+                       values[1] = CStringGetTextDatum(ti->path);
                }
                if (ti->size >= 0)
-                       send_int8_string(&buf, ti->size / 1024);
+                       values[2] = Int64GetDatum(ti->size / 1024);
                else
-                       pq_sendint32(&buf, -1); /* NULL */
+                       nulls[2] = true;
 
-               pq_endmessage(&buf);
+               do_tup_output(tstate, values, nulls);
        }
-}
-
-/*
- * Send a 64-bit integer as a string via the wire protocol.
- */
-static void
-send_int8_string(StringInfoData *buf, int64 intval)
-{
-       char            is[32];
 
-       sprintf(is, INT64_FORMAT, intval);
-       pq_sendint32(buf, strlen(is));
-       pq_sendbytes(buf, is, strlen(is));
+       end_tup_output(tstate);
 }