Use Enum for top level logical replication message types.
authorAmit Kapila <akapila@postgresql.org>
Mon, 2 Nov 2020 02:48:18 +0000 (08:18 +0530)
committerAmit Kapila <akapila@postgresql.org>
Mon, 2 Nov 2020 02:48:18 +0000 (08:18 +0530)
Logical replication protocol uses a single byte character to identify a
message type in logical replication protocol. The code uses string
literals for the same. Use Enum so that

1. All the string literals used can be found at a single place. This
makes it easy to add more types without the risk of conflicts.

2. It's easy to locate the code handling a given message type.

3. When used with switch statements, it is easy to identify the missing
cases using -Wswitch.

Author: Ashutosh Bapat
Reviewed-by: Kyotaro Horiguchi, Andres Freund, Peter Smith and Amit Kapila
Discussion: https://postgr.es/m/CAExHW5uPzQ7L0oAd_ENyvaiYMOPgkrAoJpE+ZY5-obdcVT6NPg@mail.gmail.com

src/backend/replication/logical/proto.c
src/backend/replication/logical/worker.c
src/include/replication/logicalproto.h

index eb19142b48659567d91a984fc09d912d20974865..fdb31182d77f045e70e849807692c8209cb049ce 100644 (file)
@@ -44,7 +44,7 @@ static const char *logicalrep_read_namespace(StringInfo in);
 void
 logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn)
 {
-       pq_sendbyte(out, 'B');          /* BEGIN */
+       pq_sendbyte(out, LOGICAL_REP_MSG_BEGIN);
 
        /* fixed fields */
        pq_sendint64(out, txn->final_lsn);
@@ -76,7 +76,7 @@ logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn,
 {
        uint8           flags = 0;
 
-       pq_sendbyte(out, 'C');          /* sending COMMIT */
+       pq_sendbyte(out, LOGICAL_REP_MSG_COMMIT);
 
        /* send the flags field (unused for now) */
        pq_sendbyte(out, flags);
@@ -112,7 +112,7 @@ void
 logicalrep_write_origin(StringInfo out, const char *origin,
                                                XLogRecPtr origin_lsn)
 {
-       pq_sendbyte(out, 'O');          /* ORIGIN */
+       pq_sendbyte(out, LOGICAL_REP_MSG_ORIGIN);
 
        /* fixed fields */
        pq_sendint64(out, origin_lsn);
@@ -141,7 +141,7 @@ void
 logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel,
                                                HeapTuple newtuple, bool binary)
 {
-       pq_sendbyte(out, 'I');          /* action INSERT */
+       pq_sendbyte(out, LOGICAL_REP_MSG_INSERT);
 
        /* transaction ID (if not valid, we're not streaming) */
        if (TransactionIdIsValid(xid))
@@ -185,7 +185,7 @@ void
 logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel,
                                                HeapTuple oldtuple, HeapTuple newtuple, bool binary)
 {
-       pq_sendbyte(out, 'U');          /* action UPDATE */
+       pq_sendbyte(out, LOGICAL_REP_MSG_UPDATE);
 
        Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
                   rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
@@ -263,7 +263,7 @@ logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel,
                   rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
                   rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
 
-       pq_sendbyte(out, 'D');          /* action DELETE */
+       pq_sendbyte(out, LOGICAL_REP_MSG_DELETE);
 
        /* transaction ID (if not valid, we're not streaming) */
        if (TransactionIdIsValid(xid))
@@ -317,7 +317,7 @@ logicalrep_write_truncate(StringInfo out,
        int                     i;
        uint8           flags = 0;
 
-       pq_sendbyte(out, 'T');          /* action TRUNCATE */
+       pq_sendbyte(out, LOGICAL_REP_MSG_TRUNCATE);
 
        /* transaction ID (if not valid, we're not streaming) */
        if (TransactionIdIsValid(xid))
@@ -369,7 +369,7 @@ logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel)
 {
        char       *relname;
 
-       pq_sendbyte(out, 'R');          /* sending RELATION */
+       pq_sendbyte(out, LOGICAL_REP_MSG_RELATION);
 
        /* transaction ID (if not valid, we're not streaming) */
        if (TransactionIdIsValid(xid))
@@ -425,7 +425,7 @@ logicalrep_write_typ(StringInfo out, TransactionId xid, Oid typoid)
        HeapTuple       tup;
        Form_pg_type typtup;
 
-       pq_sendbyte(out, 'Y');          /* sending TYPE */
+       pq_sendbyte(out, LOGICAL_REP_MSG_TYPE);
 
        /* transaction ID (if not valid, we're not streaming) */
        if (TransactionIdIsValid(xid))
@@ -755,7 +755,7 @@ void
 logicalrep_write_stream_start(StringInfo out,
                                                          TransactionId xid, bool first_segment)
 {
-       pq_sendbyte(out, 'S');          /* action STREAM START */
+       pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_START);
 
        Assert(TransactionIdIsValid(xid));
 
@@ -788,7 +788,7 @@ logicalrep_read_stream_start(StringInfo in, bool *first_segment)
 void
 logicalrep_write_stream_stop(StringInfo out)
 {
-       pq_sendbyte(out, 'E');          /* action STREAM END */
+       pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_END);
 }
 
 /*
@@ -800,7 +800,7 @@ logicalrep_write_stream_commit(StringInfo out, ReorderBufferTXN *txn,
 {
        uint8           flags = 0;
 
-       pq_sendbyte(out, 'c');          /* action STREAM COMMIT */
+       pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_COMMIT);
 
        Assert(TransactionIdIsValid(txn->xid));
 
@@ -849,7 +849,7 @@ void
 logicalrep_write_stream_abort(StringInfo out, TransactionId xid,
                                                          TransactionId subxid)
 {
-       pq_sendbyte(out, 'A');          /* action STREAM ABORT */
+       pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_ABORT);
 
        Assert(TransactionIdIsValid(xid) && TransactionIdIsValid(subxid));
 
index b0f27e0af8568ae81336859aed9bdc535a38083d..04684912dea3a6a8878dddee0d5c0e5b5d5d70dd 100644 (file)
@@ -1896,67 +1896,66 @@ apply_handle_truncate(StringInfo s)
 static void
 apply_dispatch(StringInfo s)
 {
-       char            action = pq_getmsgbyte(s);
+       LogicalRepMsgType action = pq_getmsgbyte(s);
 
        switch (action)
        {
-                       /* BEGIN */
-               case 'B':
+               case LOGICAL_REP_MSG_BEGIN:
                        apply_handle_begin(s);
-                       break;
-                       /* COMMIT */
-               case 'C':
+                       return;
+
+               case LOGICAL_REP_MSG_COMMIT:
                        apply_handle_commit(s);
-                       break;
-                       /* INSERT */
-               case 'I':
+                       return;
+
+               case LOGICAL_REP_MSG_INSERT:
                        apply_handle_insert(s);
-                       break;
-                       /* UPDATE */
-               case 'U':
+                       return;
+
+               case LOGICAL_REP_MSG_UPDATE:
                        apply_handle_update(s);
-                       break;
-                       /* DELETE */
-               case 'D':
+                       return;
+
+               case LOGICAL_REP_MSG_DELETE:
                        apply_handle_delete(s);
-                       break;
-                       /* TRUNCATE */
-               case 'T':
+                       return;
+
+               case LOGICAL_REP_MSG_TRUNCATE:
                        apply_handle_truncate(s);
-                       break;
-                       /* RELATION */
-               case 'R':
+                       return;
+
+               case LOGICAL_REP_MSG_RELATION:
                        apply_handle_relation(s);
-                       break;
-                       /* TYPE */
-               case 'Y':
+                       return;
+
+               case LOGICAL_REP_MSG_TYPE:
                        apply_handle_type(s);
-                       break;
-                       /* ORIGIN */
-               case 'O':
+                       return;
+
+               case LOGICAL_REP_MSG_ORIGIN:
                        apply_handle_origin(s);
-                       break;
-                       /* STREAM START */
-               case 'S':
+                       return;
+
+               case LOGICAL_REP_MSG_STREAM_START:
                        apply_handle_stream_start(s);
-                       break;
-                       /* STREAM END */
-               case 'E':
+                       return;
+
+               case LOGICAL_REP_MSG_STREAM_END:
                        apply_handle_stream_stop(s);
-                       break;
-                       /* STREAM ABORT */
-               case 'A':
+                       return;
+
+               case LOGICAL_REP_MSG_STREAM_ABORT:
                        apply_handle_stream_abort(s);
-                       break;
-                       /* STREAM COMMIT */
-               case 'c':
+                       return;
+
+               case LOGICAL_REP_MSG_STREAM_COMMIT:
                        apply_handle_stream_commit(s);
-                       break;
-               default:
-                       ereport(ERROR,
-                                       (errcode(ERRCODE_PROTOCOL_VIOLATION),
-                                        errmsg("invalid logical replication message type \"%c\"", action)));
+                       return;
        }
+
+       ereport(ERROR,
+                       (errcode(ERRCODE_PROTOCOL_VIOLATION),
+                        errmsg("invalid logical replication message type \"%c\"", action)));
 }
 
 /*
index 0c2cda264e14f81ec3745c2cac6eb25f6133ea69..cca13dae964c36481850cc24262fda2f0e21bc88 100644 (file)
 #define LOGICALREP_PROTO_STREAM_VERSION_NUM 2
 #define LOGICALREP_PROTO_MAX_VERSION_NUM LOGICALREP_PROTO_STREAM_VERSION_NUM
 
+/*
+ * Logical message types
+ *
+ * Used by logical replication wire protocol.
+ *
+ * Note: though this is an enum, the values are used to identify message types
+ * in logical replication protocol, which uses a single byte to identify a
+ * message type. Hence the values should be single byte wide and preferrably
+ * human readable characters.
+ */
+typedef enum LogicalRepMsgType
+{
+       LOGICAL_REP_MSG_BEGIN = 'B',
+       LOGICAL_REP_MSG_COMMIT = 'C',
+       LOGICAL_REP_MSG_ORIGIN = 'O',
+       LOGICAL_REP_MSG_INSERT = 'I',
+       LOGICAL_REP_MSG_UPDATE = 'U',
+       LOGICAL_REP_MSG_DELETE = 'D',
+       LOGICAL_REP_MSG_TRUNCATE = 'T',
+       LOGICAL_REP_MSG_RELATION = 'R',
+       LOGICAL_REP_MSG_TYPE = 'Y',
+       LOGICAL_REP_MSG_STREAM_START = 'S',
+       LOGICAL_REP_MSG_STREAM_END = 'E',
+       LOGICAL_REP_MSG_STREAM_COMMIT = 'c',
+       LOGICAL_REP_MSG_STREAM_ABORT = 'A'
+} LogicalRepMsgType;
+
 /*
  * This struct stores a tuple received via logical replication.
  * Keep in mind that the columns correspond to the *remote* table.