Refactor the syslogger pipe protocol to use a bitmask for its options
authorMichael Paquier <michael@paquier.xyz>
Mon, 13 Sep 2021 00:03:45 +0000 (09:03 +0900)
committerMichael Paquier <michael@paquier.xyz>
Mon, 13 Sep 2021 00:03:45 +0000 (09:03 +0900)
The previous protocol expected a set of matching characters to check if
a message sent was the last one or not, that changed depending on the
destination wanted:
- 't' and 'f' tracked the last message of a log sent to stderr.
- 'T' and 'F' tracked the last message of a log sent to csvlog.

This could be extended with more characters when introducing new
destinations, but using a bitmask is much more elegant.  This commit
changes the protocol so as a bitmask is used in the header of a log
chunk message sent to the syslogger, with the following options
available for now:
- log_destination as stderr.
- log_destination as csvlog.
- if a message is the last chunk of a message.

Sehrope found this issue in a patch set to introduce JSON as an option
for log_destination, but his patch made the size of the protocol header
larger.  This commit keeps the same size as the original, and adapts the
protocol as wanted.

Thanks also to Andrew Dunstan and Greg Stark for the discussion.

Author: Michael Paquier, Sehrope Sarkuni
Discussion: https://postgr.es/m/CAH7T-aqswBM6JWe4pDehi1uOiufqe06DJWaU5=X7dDLyqUExHg@mail.gmail.com

src/backend/postmaster/syslogger.c
src/backend/utils/error/elog.c
src/include/postmaster/syslogger.h

index cad43bdef23fccd13b6250deafb619cb1092b386..bca38835726a508c3fd8b967dacd106e54441267 100644 (file)
@@ -38,6 +38,7 @@
 #include "nodes/pg_list.h"
 #include "pgstat.h"
 #include "pgtime.h"
+#include "port/pg_bitutils.h"
 #include "postmaster/fork_process.h"
 #include "postmaster/interrupt.h"
 #include "postmaster/postmaster.h"
@@ -885,14 +886,15 @@ process_pipe_input(char *logbuffer, int *bytes_in_logbuffer)
    {
        PipeProtoHeader p;
        int         chunklen;
+       bits8       dest_flags;
 
        /* Do we have a valid header? */
        memcpy(&p, cursor, offsetof(PipeProtoHeader, data));
+       dest_flags = p.flags & (PIPE_PROTO_DEST_STDERR | PIPE_PROTO_DEST_CSVLOG);
        if (p.nuls[0] == '\0' && p.nuls[1] == '\0' &&
            p.len > 0 && p.len <= PIPE_MAX_PAYLOAD &&
            p.pid != 0 &&
-           (p.is_last == 't' || p.is_last == 'f' ||
-            p.is_last == 'T' || p.is_last == 'F'))
+           pg_popcount((char *) &dest_flags, 1) == 1)
        {
            List       *buffer_list;
            ListCell   *cell;
@@ -906,8 +908,15 @@ process_pipe_input(char *logbuffer, int *bytes_in_logbuffer)
            if (count < chunklen)
                break;
 
-           dest = (p.is_last == 'T' || p.is_last == 'F') ?
-               LOG_DESTINATION_CSVLOG : LOG_DESTINATION_STDERR;
+           if ((p.flags & PIPE_PROTO_DEST_STDERR) != 0)
+               dest = LOG_DESTINATION_STDERR;
+           else if ((p.flags & PIPE_PROTO_DEST_CSVLOG) != 0)
+               dest = LOG_DESTINATION_CSVLOG;
+           else
+           {
+               /* this should never happen as of the header validation */
+               Assert(false);
+           }
 
            /* Locate any existing buffer for this source pid */
            buffer_list = buffer_lists[p.pid % NBUFFER_LISTS];
@@ -924,7 +933,7 @@ process_pipe_input(char *logbuffer, int *bytes_in_logbuffer)
                    free_slot = buf;
            }
 
-           if (p.is_last == 'f' || p.is_last == 'F')
+           if ((p.flags & PIPE_PROTO_IS_LAST) == 0)
            {
                /*
                 * Save a complete non-final chunk in a per-pid buffer
index 816b071afaa893d4ef81d8546fa47fbc4c6ff162..2af87ee3bd1f4ad99f51f4706218f4da2f9acb62 100644 (file)
@@ -3250,11 +3250,16 @@ write_pipe_chunks(char *data, int len, int dest)
 
    p.proto.nuls[0] = p.proto.nuls[1] = '\0';
    p.proto.pid = MyProcPid;
+   p.proto.flags = 0;
+   if (dest == LOG_DESTINATION_STDERR)
+       p.proto.flags |= PIPE_PROTO_DEST_STDERR;
+   else if (dest == LOG_DESTINATION_CSVLOG)
+       p.proto.flags |= PIPE_PROTO_DEST_CSVLOG;
 
    /* write all but the last chunk */
    while (len > PIPE_MAX_PAYLOAD)
    {
-       p.proto.is_last = (dest == LOG_DESTINATION_CSVLOG ? 'F' : 'f');
+       /* no need to set PIPE_PROTO_IS_LAST yet */
        p.proto.len = PIPE_MAX_PAYLOAD;
        memcpy(p.proto.data, data, PIPE_MAX_PAYLOAD);
        rc = write(fd, &p, PIPE_HEADER_SIZE + PIPE_MAX_PAYLOAD);
@@ -3264,7 +3269,7 @@ write_pipe_chunks(char *data, int len, int dest)
    }
 
    /* write the last chunk */
-   p.proto.is_last = (dest == LOG_DESTINATION_CSVLOG ? 'T' : 't');
+   p.proto.flags |= PIPE_PROTO_IS_LAST;
    p.proto.len = len;
    memcpy(p.proto.data, data, len);
    rc = write(fd, &p, PIPE_HEADER_SIZE + len);
index 1491eecb0f8f206051c3f43e20ef46af70607fd1..c79dfbeba2a6d9f7cf485b4cf4dd22e1104942f4 100644 (file)
@@ -46,8 +46,7 @@ typedef struct
    char        nuls[2];        /* always \0\0 */
    uint16      len;            /* size of this chunk (counts data only) */
    int32       pid;            /* writer's pid */
-   char        is_last;        /* last chunk of message? 't' or 'f' ('T' or
-                                * 'F' for CSV case) */
+   bits8       flags;          /* bitmask of PIPE_PROTO_* */
    char        data[FLEXIBLE_ARRAY_MEMBER];    /* data payload starts here */
 } PipeProtoHeader;
 
@@ -60,6 +59,11 @@ typedef union
 #define PIPE_HEADER_SIZE  offsetof(PipeProtoHeader, data)
 #define PIPE_MAX_PAYLOAD  ((int) (PIPE_CHUNK_SIZE - PIPE_HEADER_SIZE))
 
+/* flag bits for PipeProtoHeader->flags */
+#define PIPE_PROTO_IS_LAST 0x01    /* last chunk of message? */
+/* log destinations */
+#define PIPE_PROTO_DEST_STDERR 0x10
+#define PIPE_PROTO_DEST_CSVLOG 0x20
 
 /* GUC options */
 extern bool Logging_collector;