diff options
| author | Bruce Momjian | 2010-02-26 02:01:40 +0000 |
|---|---|---|
| committer | Bruce Momjian | 2010-02-26 02:01:40 +0000 |
| commit | 65e806cba1f0f154d51caa7478e7192ce58d1056 (patch) | |
| tree | 99a656d7b4ec6d038d4c24e07fadf75db4c37e79 /src/backend/replication | |
| parent | 16040575a04486d8e0823b4e304f4933144baf90 (diff) | |
pgindent run for 9.0
Diffstat (limited to 'src/backend/replication')
| -rw-r--r-- | src/backend/replication/libpqwalreceiver/libpqwalreceiver.c | 58 | ||||
| -rw-r--r-- | src/backend/replication/walreceiver.c | 105 | ||||
| -rw-r--r-- | src/backend/replication/walreceiverfuncs.c | 34 | ||||
| -rw-r--r-- | src/backend/replication/walsender.c | 287 |
4 files changed, 244 insertions, 240 deletions
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 875dbafa110..49cf7b597f9 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -10,7 +10,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c,v 1.4 2010/02/25 07:31:40 heikki Exp $ + * $PostgreSQL: pgsql/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c,v 1.5 2010/02/26 02:00:58 momjian Exp $ * *------------------------------------------------------------------------- */ @@ -49,7 +49,7 @@ static char *recvBuf = NULL; /* Prototypes for interface functions */ static bool libpqrcv_connect(char *conninfo, XLogRecPtr startpoint); static bool libpqrcv_receive(int timeout, unsigned char *type, - char **buffer, int *len); + char **buffer, int *len); static void libpqrcv_disconnect(void); /* Prototypes for private functions */ @@ -94,22 +94,23 @@ libpqrcv_connect(char *conninfo, XLogRecPtr startpoint) PQerrorMessage(streamConn)))); /* - * Get the system identifier and timeline ID as a DataRow message - * from the primary server. + * Get the system identifier and timeline ID as a DataRow message from the + * primary server. */ res = PQexec(streamConn, "IDENTIFY_SYSTEM"); if (PQresultStatus(res) != PGRES_TUPLES_OK) - { + { PQclear(res); ereport(ERROR, (errmsg("could not receive the SYSID and timeline ID from " "the primary server: %s", PQerrorMessage(streamConn)))); - } + } if (PQnfields(res) != 2 || PQntuples(res) != 1) { - int ntuples = PQntuples(res); - int nfields = PQnfields(res); + int ntuples = PQntuples(res); + int nfields = PQnfields(res); + PQclear(res); ereport(ERROR, (errmsg("invalid response from primary server"), @@ -120,8 +121,7 @@ libpqrcv_connect(char *conninfo, XLogRecPtr startpoint) primary_tli = pg_atoi(PQgetvalue(res, 0, 1), 4, 0); /* - * Confirm that the system identifier of the primary is the same - * as ours. + * Confirm that the system identifier of the primary is the same as ours. */ snprintf(standby_sysid, sizeof(standby_sysid), UINT64_FORMAT, GetSystemIdentifier()); @@ -135,8 +135,8 @@ libpqrcv_connect(char *conninfo, XLogRecPtr startpoint) } /* - * Confirm that the current timeline of the primary is the same - * as the recovery target timeline. + * Confirm that the current timeline of the primary is the same as the + * recovery target timeline. */ standby_tli = GetRecoveryTargetTLI(); PQclear(res); @@ -172,7 +172,7 @@ libpqrcv_connect(char *conninfo, XLogRecPtr startpoint) static bool libpq_select(int timeout_ms) { - int ret; + int ret; Assert(streamConn != NULL); if (PQsocket(streamConn) < 0) @@ -197,15 +197,15 @@ libpq_select(int timeout_ms) struct timeval *ptr_timeout; FD_ZERO(&input_mask); - FD_SET(PQsocket(streamConn), &input_mask); + FD_SET (PQsocket(streamConn), &input_mask); if (timeout_ms < 0) ptr_timeout = NULL; else { - timeout.tv_sec = timeout_ms / 1000; - timeout.tv_usec = (timeout_ms % 1000) * 1000; - ptr_timeout = &timeout; + timeout.tv_sec = timeout_ms / 1000; + timeout.tv_usec = (timeout_ms % 1000) * 1000; + ptr_timeout = &timeout; } ret = select(PQsocket(streamConn) + 1, &input_mask, @@ -239,12 +239,12 @@ libpqrcv_disconnect(void) * * Returns: * - * True if data was received. *type, *buffer and *len are set to - * the type of the received data, buffer holding it, and length, - * respectively. + * True if data was received. *type, *buffer and *len are set to + * the type of the received data, buffer holding it, and length, + * respectively. * - * False if no data was available within timeout, or wait was interrupted - * by signal. + * False if no data was available within timeout, or wait was interrupted + * by signal. * * The buffer returned is only valid until the next call of this function or * libpq_connect/disconnect. @@ -261,10 +261,10 @@ libpqrcv_receive(int timeout, unsigned char *type, char **buffer, int *len) recvBuf = NULL; /* - * If the caller requested to block, wait for data to arrive. But if - * this is the first call after connecting, don't wait, because - * there might already be some data in libpq buffer that we haven't - * returned to caller. + * If the caller requested to block, wait for data to arrive. But if this + * is the first call after connecting, don't wait, because there might + * already be some data in libpq buffer that we haven't returned to + * caller. */ if (timeout > 0 && !justconnected) { @@ -280,11 +280,11 @@ libpqrcv_receive(int timeout, unsigned char *type, char **buffer, int *len) /* Receive CopyData message */ rawlen = PQgetCopyData(streamConn, &recvBuf, 1); - if (rawlen == 0) /* no data available yet, then return */ + if (rawlen == 0) /* no data available yet, then return */ return false; - if (rawlen == -1) /* end-of-streaming or error */ + if (rawlen == -1) /* end-of-streaming or error */ { - PGresult *res; + PGresult *res; res = PQgetResult(streamConn); if (PQresultStatus(res) == PGRES_COMMAND_OK) diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index 3f82693dcea..9f86b0645d0 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -29,7 +29,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/replication/walreceiver.c,v 1.5 2010/02/19 10:51:04 heikki Exp $ + * $PostgreSQL: pgsql/src/backend/replication/walreceiver.c,v 1.6 2010/02/26 02:00:57 momjian Exp $ * *------------------------------------------------------------------------- */ @@ -51,7 +51,7 @@ #include "utils/resowner.h" /* Global variable to indicate if this process is a walreceiver process */ -bool am_walreceiver; +bool am_walreceiver; /* libpqreceiver hooks to these when loaded */ walrcv_connect_type walrcv_connect = NULL; @@ -102,9 +102,9 @@ static void ProcessWalRcvInterrupts(void) { /* - * Although walreceiver interrupt handling doesn't use the same scheme - * as regular backends, call CHECK_FOR_INTERRUPTS() to make sure we - * receive any incoming signals on Win32. + * Although walreceiver interrupt handling doesn't use the same scheme as + * regular backends, call CHECK_FOR_INTERRUPTS() to make sure we receive + * any incoming signals on Win32. */ CHECK_FOR_INTERRUPTS(); @@ -148,37 +148,38 @@ static void XLogWalRcvFlush(void); */ static struct { - XLogRecPtr Write; /* last byte + 1 written out in the standby */ - XLogRecPtr Flush; /* last byte + 1 flushed in the standby */ -} LogstreamResult; + XLogRecPtr Write; /* last byte + 1 written out in the standby */ + XLogRecPtr Flush; /* last byte + 1 flushed in the standby */ +} LogstreamResult; /* Main entry point for walreceiver process */ void WalReceiverMain(void) { - char conninfo[MAXCONNINFO]; - XLogRecPtr startpoint; + char conninfo[MAXCONNINFO]; + XLogRecPtr startpoint; + /* use volatile pointer to prevent code rearrangement */ volatile WalRcvData *walrcv = WalRcv; am_walreceiver = true; /* - * WalRcv should be set up already (if we are a backend, we inherit - * this by fork() or EXEC_BACKEND mechanism from the postmaster). + * WalRcv should be set up already (if we are a backend, we inherit this + * by fork() or EXEC_BACKEND mechanism from the postmaster). */ Assert(walrcv != NULL); /* * Mark walreceiver as running in shared memory. * - * Do this as early as possible, so that if we fail later on, we'll - * set state to STOPPED. If we die before this, the startup process - * will keep waiting for us to start up, until it times out. + * Do this as early as possible, so that if we fail later on, we'll set + * state to STOPPED. If we die before this, the startup process will keep + * waiting for us to start up, until it times out. */ SpinLockAcquire(&walrcv->mutex); Assert(walrcv->pid == 0); - switch(walrcv->walRcvState) + switch (walrcv->walRcvState) { case WALRCV_STOPPING: /* If we've already been requested to stop, don't start up. */ @@ -222,7 +223,8 @@ WalReceiverMain(void) #endif /* Properly accept or ignore signals the postmaster might send us */ - pqsignal(SIGHUP, WalRcvSigHupHandler); /* set flag to read config file */ + pqsignal(SIGHUP, WalRcvSigHupHandler); /* set flag to read config + * file */ pqsignal(SIGINT, SIG_IGN); pqsignal(SIGTERM, WalRcvShutdownHandler); /* request shutdown */ pqsignal(SIGQUIT, WalRcvQuickDieHandler); /* hard crash time */ @@ -264,9 +266,9 @@ WalReceiverMain(void) /* Loop until end-of-streaming or error */ for (;;) { - unsigned char type; - char *buf; - int len; + unsigned char type; + char *buf; + int len; /* * Emergency bailout if postmaster has died. This is to avoid the @@ -299,12 +301,12 @@ WalReceiverMain(void) XLogWalRcvProcessMsg(type, buf, len); /* Receive any more data we can without sleeping */ - while(walrcv_receive(0, &type, &buf, &len)) + while (walrcv_receive(0, &type, &buf, &len)) XLogWalRcvProcessMsg(type, buf, len); /* - * If we've written some records, flush them to disk and - * let the startup process know about them. + * If we've written some records, flush them to disk and let the + * startup process know about them. */ XLogWalRcvFlush(); } @@ -375,8 +377,8 @@ WalRcvQuickDieHandler(SIGNAL_ARGS) * system reset cycle if some idiot DBA sends a manual SIGQUIT to a random * backend. This is necessary precisely because we don't clean up our * shared memory state. (The "dead man switch" mechanism in pmsignal.c - * should ensure the postmaster sees this as a crash, too, but no harm - * in being doubly sure.) + * should ensure the postmaster sees this as a crash, too, but no harm in + * being doubly sure.) */ exit(2); } @@ -389,20 +391,20 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len) { switch (type) { - case 'w': /* WAL records */ - { - XLogRecPtr recptr; + case 'w': /* WAL records */ + { + XLogRecPtr recptr; - if (len < sizeof(XLogRecPtr)) - ereport(ERROR, - (errmsg("invalid WAL message received from primary"))); + if (len < sizeof(XLogRecPtr)) + ereport(ERROR, + (errmsg("invalid WAL message received from primary"))); - recptr = *((XLogRecPtr *) buf); - buf += sizeof(XLogRecPtr); - len -= sizeof(XLogRecPtr); - XLogWalRcvWrite(buf, len, recptr); - break; - } + recptr = *((XLogRecPtr *) buf); + buf += sizeof(XLogRecPtr); + len -= sizeof(XLogRecPtr); + XLogWalRcvWrite(buf, len, recptr); + break; + } default: ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), @@ -417,20 +419,20 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len) static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr) { - int startoff; - int byteswritten; + int startoff; + int byteswritten; while (nbytes > 0) { - int segbytes; + int segbytes; if (recvFile < 0 || !XLByteInSeg(recptr, recvId, recvSeg)) { - bool use_existent; + bool use_existent; /* - * fsync() and close current file before we switch to next one. - * We would otherwise have to reopen this file to fsync it later + * fsync() and close current file before we switch to next one. We + * would otherwise have to reopen this file to fsync it later */ if (recvFile >= 0) { @@ -444,8 +446,8 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr) if (close(recvFile) != 0) ereport(PANIC, (errcode_for_file_access(), - errmsg("could not close log file %u, segment %u: %m", - recvId, recvSeg))); + errmsg("could not close log file %u, segment %u: %m", + recvId, recvSeg))); } recvFile = -1; @@ -500,14 +502,13 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr) nbytes -= byteswritten; buf += byteswritten; - LogstreamResult.Write = recptr; + LogstreamResult.Write = recptr; /* - * XXX: Should we signal bgwriter to start a restartpoint - * if we've consumed too much xlog since the last one, like - * in normal processing? But this is not worth doing unless - * a restartpoint can be created independently from a - * checkpoint record. + * XXX: Should we signal bgwriter to start a restartpoint if we've + * consumed too much xlog since the last one, like in normal + * processing? But this is not worth doing unless a restartpoint can + * be created independently from a checkpoint record. */ } } @@ -520,7 +521,7 @@ XLogWalRcvFlush(void) { /* use volatile pointer to prevent code rearrangement */ volatile WalRcvData *walrcv = WalRcv; - char activitymsg[50]; + char activitymsg[50]; issue_xlog_fsync(recvFile, recvId, recvSeg); diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c index 4fb132dcd4e..be305790fd3 100644 --- a/src/backend/replication/walreceiverfuncs.c +++ b/src/backend/replication/walreceiverfuncs.c @@ -10,7 +10,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/replication/walreceiverfuncs.c,v 1.3 2010/01/27 15:27:51 heikki Exp $ + * $PostgreSQL: pgsql/src/backend/replication/walreceiverfuncs.c,v 1.4 2010/02/26 02:00:57 momjian Exp $ * *------------------------------------------------------------------------- */ @@ -42,7 +42,7 @@ WalRcvData *WalRcv = NULL; Size WalRcvShmemSize(void) { - Size size = 0; + Size size = 0; size = add_size(size, sizeof(WalRcvData)); @@ -53,7 +53,7 @@ WalRcvShmemSize(void) void WalRcvShmemInit(void) { - bool found; + bool found; WalRcv = (WalRcvData *) ShmemInitStruct("Wal Receiver Ctl", WalRcvShmemSize(), &found); @@ -78,7 +78,7 @@ WalRcvInProgress(void) /* use volatile pointer to prevent code rearrangement */ volatile WalRcvData *walrcv = WalRcv; WalRcvState state; - pg_time_t startTime; + pg_time_t startTime; SpinLockAcquire(&walrcv->mutex); @@ -88,14 +88,14 @@ WalRcvInProgress(void) SpinLockRelease(&walrcv->mutex); /* - * If it has taken too long for walreceiver to start up, give up. - * Setting the state to STOPPED ensures that if walreceiver later - * does start up after all, it will see that it's not supposed to be - * running and die without doing anything. + * If it has taken too long for walreceiver to start up, give up. Setting + * the state to STOPPED ensures that if walreceiver later does start up + * after all, it will see that it's not supposed to be running and die + * without doing anything. */ if (state == WALRCV_STARTING) { - pg_time_t now = (pg_time_t) time(NULL); + pg_time_t now = (pg_time_t) time(NULL); if ((now - startTime) > WALRCV_STARTUP_TIMEOUT) { @@ -122,7 +122,7 @@ ShutdownWalRcv(void) { /* use volatile pointer to prevent code rearrangement */ volatile WalRcvData *walrcv = WalRcv; - pid_t walrcvpid = 0; + pid_t walrcvpid = 0; /* * Request walreceiver to stop. Walreceiver will switch to WALRCV_STOPPED @@ -130,7 +130,7 @@ ShutdownWalRcv(void) * restart itself. */ SpinLockAcquire(&walrcv->mutex); - switch(walrcv->walRcvState) + switch (walrcv->walRcvState) { case WALRCV_STOPPED: break; @@ -180,14 +180,13 @@ RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo) { /* use volatile pointer to prevent code rearrangement */ volatile WalRcvData *walrcv = WalRcv; - pg_time_t now = (pg_time_t) time(NULL); + pg_time_t now = (pg_time_t) time(NULL); /* - * We always start at the beginning of the segment. - * That prevents a broken segment (i.e., with no records in the - * first half of a segment) from being created by XLOG streaming, - * which might cause trouble later on if the segment is e.g - * archived. + * We always start at the beginning of the segment. That prevents a broken + * segment (i.e., with no records in the first half of a segment) from + * being created by XLOG streaming, which might cause trouble later on if + * the segment is e.g archived. */ if (recptr.xrecoff % XLogSegSize != 0) recptr.xrecoff -= recptr.xrecoff % XLogSegSize; @@ -225,4 +224,3 @@ GetWalRcvWriteRecPtr(void) return recptr; } - diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index a8706ab06f6..2a2765645e4 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -30,7 +30,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/replication/walsender.c,v 1.8 2010/02/25 07:31:40 heikki Exp $ + * $PostgreSQL: pgsql/src/backend/replication/walsender.c,v 1.9 2010/02/26 02:00:58 momjian Exp $ * *------------------------------------------------------------------------- */ @@ -61,11 +61,11 @@ WalSndCtlData *WalSndCtl = NULL; static WalSnd *MyWalSnd = NULL; /* Global state */ -bool am_walsender = false; /* Am I a walsender process ? */ +bool am_walsender = false; /* Am I a walsender process ? */ /* User-settable parameters for walsender */ -int MaxWalSenders = 0; /* the maximum number of concurrent walsenders */ -int WalSndDelay = 200; /* max sleep time between some actions */ +int MaxWalSenders = 0; /* the maximum number of concurrent walsenders */ +int WalSndDelay = 200; /* max sleep time between some actions */ #define NAPTIME_PER_CYCLE 100 /* max sleep time between cycles (100ms) */ @@ -96,9 +96,9 @@ static void WalSndQuickDieHandler(SIGNAL_ARGS); /* Prototypes for private functions */ static int WalSndLoop(void); -static void InitWalSnd(void); -static void WalSndHandshake(void); -static void WalSndKill(int code, Datum arg); +static void InitWalSnd(void); +static void WalSndHandshake(void); +static void WalSndKill(int code, Datum arg); static void XLogRead(char *buf, XLogRecPtr recptr, Size nbytes); static bool XLogSend(StringInfo outMsg); static void CheckClosedConnection(void); @@ -155,13 +155,13 @@ static void WalSndHandshake(void) { StringInfoData input_message; - bool replication_started = false; + bool replication_started = false; initStringInfo(&input_message); while (!replication_started) { - int firstchar; + int firstchar; /* Wait for a command to arrive */ firstchar = pq_getbyte(); @@ -183,99 +183,99 @@ WalSndHandshake(void) * blocking because we've been able to get message type code. */ if (pq_getmessage(&input_message, 0)) - firstchar = EOF; /* suitable message already logged */ + firstchar = EOF; /* suitable message already logged */ } /* Handle the very limited subset of commands expected in this phase */ switch (firstchar) { - case 'Q': /* Query message */ - { - const char *query_string; - XLogRecPtr recptr; - - query_string = pq_getmsgstring(&input_message); - pq_getmsgend(&input_message); - - if (strcmp(query_string, "IDENTIFY_SYSTEM") == 0) - { - StringInfoData buf; - char sysid[32]; - char tli[11]; - - /* - * Reply with a result set with one row, two columns. - * First col is system ID, and second if timeline ID - */ - - snprintf(sysid, sizeof(sysid), UINT64_FORMAT, - GetSystemIdentifier()); - snprintf(tli, sizeof(tli), "%u", ThisTimeLineID); - - /* Send a RowDescription message */ - pq_beginmessage(&buf, 'T'); - pq_sendint(&buf, 2, 2); /* 2 fields */ - - /* first field */ - pq_sendstring(&buf, "systemid"); /* col name */ - pq_sendint(&buf, 0, 4); /* table oid */ - pq_sendint(&buf, 0, 2); /* attnum */ - pq_sendint(&buf, TEXTOID, 4); /* type oid */ - pq_sendint(&buf, -1, 2); /* typlen */ - pq_sendint(&buf, 0, 4); /* typmod */ - pq_sendint(&buf, 0, 2); /* format code */ - - /* second field */ - pq_sendstring(&buf, "timeline"); /* col name */ - pq_sendint(&buf, 0, 4); /* table oid */ - pq_sendint(&buf, 0, 2); /* attnum */ - pq_sendint(&buf, INT4OID, 4); /* type oid */ - pq_sendint(&buf, 4, 2); /* typlen */ - pq_sendint(&buf, 0, 4); /* typmod */ - pq_sendint(&buf, 0, 2); /* format code */ - pq_endmessage(&buf); - - /* Send a DataRow message */ - pq_beginmessage(&buf, 'D'); - pq_sendint(&buf, 2, 2); /* # of columns */ - pq_sendint(&buf, strlen(sysid), 4); /* col1 len */ - pq_sendbytes(&buf, (char *) &sysid, strlen(sysid)); - pq_sendint(&buf, strlen(tli), 4); /* col2 len */ - pq_sendbytes(&buf, (char *) tli, strlen(tli)); - pq_endmessage(&buf); - - /* Send CommandComplete and ReadyForQuery messages */ - EndCommand("SELECT", DestRemote); - ReadyForQuery(DestRemote); - } - else if (sscanf(query_string, "START_REPLICATION %X/%X", - &recptr.xlogid, &recptr.xrecoff) == 2) + case 'Q': /* Query message */ { - StringInfoData buf; - - /* Send a CopyOutResponse message, and start streaming */ - pq_beginmessage(&buf, 'H'); - pq_sendbyte(&buf, 0); - pq_sendint(&buf, 0, 2); - pq_endmessage(&buf); - - /* - * Initialize position to the received one, then - * the xlog records begin to be shipped from that position - */ - sentPtr = recptr; - - /* break out of the loop */ - replication_started = true; + const char *query_string; + XLogRecPtr recptr; + + query_string = pq_getmsgstring(&input_message); + pq_getmsgend(&input_message); + + if (strcmp(query_string, "IDENTIFY_SYSTEM") == 0) + { + StringInfoData buf; + char sysid[32]; + char tli[11]; + + /* + * Reply with a result set with one row, two columns. + * First col is system ID, and second if timeline ID + */ + + snprintf(sysid, sizeof(sysid), UINT64_FORMAT, + GetSystemIdentifier()); + snprintf(tli, sizeof(tli), "%u", ThisTimeLineID); + + /* Send a RowDescription message */ + pq_beginmessage(&buf, 'T'); + pq_sendint(&buf, 2, 2); /* 2 fields */ + + /* first field */ + pq_sendstring(&buf, "systemid"); /* col name */ + pq_sendint(&buf, 0, 4); /* table oid */ + pq_sendint(&buf, 0, 2); /* attnum */ + pq_sendint(&buf, TEXTOID, 4); /* type oid */ + pq_sendint(&buf, -1, 2); /* typlen */ + pq_sendint(&buf, 0, 4); /* typmod */ + pq_sendint(&buf, 0, 2); /* format code */ + + /* second field */ + pq_sendstring(&buf, "timeline"); /* col name */ + pq_sendint(&buf, 0, 4); /* table oid */ + pq_sendint(&buf, 0, 2); /* attnum */ + pq_sendint(&buf, INT4OID, 4); /* type oid */ + pq_sendint(&buf, 4, 2); /* typlen */ + pq_sendint(&buf, 0, 4); /* typmod */ + pq_sendint(&buf, 0, 2); /* format code */ + pq_endmessage(&buf); + + /* Send a DataRow message */ + pq_beginmessage(&buf, 'D'); + pq_sendint(&buf, 2, 2); /* # of columns */ + pq_sendint(&buf, strlen(sysid), 4); /* col1 len */ + pq_sendbytes(&buf, (char *) &sysid, strlen(sysid)); + pq_sendint(&buf, strlen(tli), 4); /* col2 len */ + pq_sendbytes(&buf, (char *) tli, strlen(tli)); + pq_endmessage(&buf); + + /* Send CommandComplete and ReadyForQuery messages */ + EndCommand("SELECT", DestRemote); + ReadyForQuery(DestRemote); + } + else if (sscanf(query_string, "START_REPLICATION %X/%X", + &recptr.xlogid, &recptr.xrecoff) == 2) + { + StringInfoData buf; + + /* Send a CopyOutResponse message, and start streaming */ + pq_beginmessage(&buf, 'H'); + pq_sendbyte(&buf, 0); + pq_sendint(&buf, 0, 2); + pq_endmessage(&buf); + + /* + * Initialize position to the received one, then the + * xlog records begin to be shipped from that position + */ + sentPtr = recptr; + + /* break out of the loop */ + replication_started = true; + } + else + { + ereport(FATAL, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("invalid standby query string: %s", query_string))); + } + break; } - else - { - ereport(FATAL, - (errcode(ERRCODE_PROTOCOL_VIOLATION), - errmsg("invalid standby query string: %s", query_string))); - } - break; - } case 'X': /* standby is closing the connection */ @@ -303,7 +303,7 @@ static void CheckClosedConnection(void) { unsigned char firstchar; - int r; + int r; r = pq_getbyte_if_available(&firstchar); if (r < 0) @@ -323,9 +323,9 @@ CheckClosedConnection(void) /* Handle the very limited subset of commands expected in this phase */ switch (firstchar) { - /* - * 'X' means that the standby is closing down the socket. - */ + /* + * 'X' means that the standby is closing down the socket. + */ case 'X': proc_exit(0); @@ -348,7 +348,7 @@ WalSndLoop(void) /* Loop forever */ for (;;) { - int remain; /* remaining time (ms) */ + int remain; /* remaining time (ms) */ /* * Emergency bailout if postmaster has died. This is to avoid the @@ -416,15 +416,16 @@ WalSndLoop(void) return 1; eof: + /* - * Reset whereToSendOutput to prevent ereport from attempting - * to send any more messages to the standby. + * Reset whereToSendOutput to prevent ereport from attempting to send any + * more messages to the standby. */ if (whereToSendOutput == DestRemote) whereToSendOutput = DestNone; proc_exit(0); - return 1; /* keep the compiler quiet */ + return 1; /* keep the compiler quiet */ } /* Initialize a per-walsender data structure for this walsender process */ @@ -432,7 +433,7 @@ static void InitWalSnd(void) { /* use volatile pointer to prevent code rearrangement */ - int i; + int i; /* * WalSndCtl should be set up already (we inherit this by fork() or @@ -497,13 +498,13 @@ WalSndKill(int code, Datum arg) void XLogRead(char *buf, XLogRecPtr recptr, Size nbytes) { - char path[MAXPGPATH]; - uint32 startoff; + char path[MAXPGPATH]; + uint32 startoff; while (nbytes > 0) { - int segbytes; - int readbytes; + int segbytes; + int readbytes; startoff = recptr.xrecoff % XLogSegSize; @@ -518,7 +519,7 @@ XLogRead(char *buf, XLogRecPtr recptr, Size nbytes) sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0); if (sendFile < 0) - ereport(FATAL, /* XXX: Why FATAL? */ + ereport(FATAL, /* XXX: Why FATAL? */ (errcode_for_file_access(), errmsg("could not open file \"%s\" (log file %u, segment %u): %m", path, sendId, sendSeg))); @@ -546,9 +547,9 @@ XLogRead(char *buf, XLogRecPtr recptr, Size nbytes) if (readbytes <= 0) ereport(FATAL, (errcode_for_file_access(), - errmsg("could not read from log file %u, segment %u, offset %u, " - "length %lu: %m", - sendId, sendSeg, sendOff, (unsigned long) segbytes))); + errmsg("could not read from log file %u, segment %u, offset %u, " + "length %lu: %m", + sendId, sendSeg, sendOff, (unsigned long) segbytes))); /* Update state for read */ XLByteAdvance(recptr, readbytes); @@ -569,7 +570,8 @@ static bool XLogSend(StringInfo outMsg) { XLogRecPtr SendRqstPtr; - char activitymsg[50]; + char activitymsg[50]; + /* use volatile pointer to prevent code rearrangement */ volatile WalSnd *walsnd = MyWalSnd; @@ -581,15 +583,15 @@ XLogSend(StringInfo outMsg) return true; /* - * We gather multiple records together by issuing just one XLogRead() - * of a suitable size, and send them as one CopyData message. Repeat - * until we've sent everything we can. + * We gather multiple records together by issuing just one XLogRead() of a + * suitable size, and send them as one CopyData message. Repeat until + * we've sent everything we can. */ while (XLByteLT(sentPtr, SendRqstPtr)) { - XLogRecPtr startptr; - XLogRecPtr endptr; - Size nbytes; + XLogRecPtr startptr; + XLogRecPtr endptr; + Size nbytes; /* * Figure out how much to send in one message. If there's less than @@ -600,8 +602,8 @@ XLogSend(StringInfo outMsg) * relies on the fact that we never split a WAL record across two * messages. Since a long WAL record is split at page boundary into * continuation records, page boundary is always a safe cut-off point. - * We also assume that SendRqstPtr never points in the middle of a - * WAL record. + * We also assume that SendRqstPtr never points in the middle of a WAL + * record. */ startptr = sentPtr; if (startptr.xrecoff >= XLogFileSize) @@ -625,10 +627,10 @@ XLogSend(StringInfo outMsg) /* * OK to read and send the slice. * - * We don't need to convert the xlogid/xrecoff from host byte order - * to network byte order because the both server can be expected to - * have the same byte order. If they have different byte order, we - * don't reach here. + * We don't need to convert the xlogid/xrecoff from host byte order to + * network byte order because the both server can be expected to have + * the same byte order. If they have different byte order, we don't + * reach here. */ pq_sendbyte(outMsg, 'w'); pq_sendbytes(outMsg, (char *) &startptr, sizeof(startptr)); @@ -644,8 +646,8 @@ XLogSend(StringInfo outMsg) sentPtr = endptr; /* - * Read the log directly into the output buffer to prevent - * extra memcpy calls. + * Read the log directly into the output buffer to prevent extra + * memcpy calls. */ enlargeStringInfo(outMsg, nbytes); @@ -714,8 +716,8 @@ WalSndQuickDieHandler(SIGNAL_ARGS) * system reset cycle if some idiot DBA sends a manual SIGQUIT to a random * backend. This is necessary precisely because we don't clean up our * shared memory state. (The "dead man switch" mechanism in pmsignal.c - * should ensure the postmaster sees this as a crash, too, but no harm - * in being doubly sure.) + * should ensure the postmaster sees this as a crash, too, but no harm in + * being doubly sure.) */ exit(2); } @@ -732,14 +734,16 @@ void WalSndSignals(void) { /* Set up signal handlers */ - pqsignal(SIGHUP, WalSndSigHupHandler); /* set flag to read config file */ + pqsignal(SIGHUP, WalSndSigHupHandler); /* set flag to read config + * file */ pqsignal(SIGINT, SIG_IGN); /* not used */ pqsignal(SIGTERM, WalSndShutdownHandler); /* request shutdown */ pqsignal(SIGQUIT, WalSndQuickDieHandler); /* hard crash time */ pqsignal(SIGALRM, SIG_IGN); pqsignal(SIGPIPE, SIG_IGN); - pqsignal(SIGUSR1, SIG_IGN); /* not used */ - pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and shutdown */ + pqsignal(SIGUSR1, SIG_IGN); /* not used */ + pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and + * shutdown */ /* Reset some signals that are accepted by postmaster but not here */ pqsignal(SIGCHLD, SIG_DFL); @@ -753,7 +757,7 @@ WalSndSignals(void) Size WalSndShmemSize(void) { - Size size = 0; + Size size = 0; size = offsetof(WalSndCtlData, walsnds); size = add_size(size, mul_size(MaxWalSenders, sizeof(WalSnd))); @@ -765,8 +769,8 @@ WalSndShmemSize(void) void WalSndShmemInit(void) { - bool found; - int i; + bool found; + int i; WalSndCtl = (WalSndCtlData *) ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found); @@ -783,7 +787,8 @@ WalSndShmemInit(void) for (i = 0; i < MaxWalSenders; i++) { - WalSnd *walsnd = &WalSndCtl->walsnds[i]; + WalSnd *walsnd = &WalSndCtl->walsnds[i]; + SpinLockInit(&walsnd->mutex); } } @@ -795,15 +800,15 @@ WalSndShmemInit(void) XLogRecPtr GetOldestWALSendPointer(void) { - XLogRecPtr oldest = {0, 0}; - int i; - bool found = false; + XLogRecPtr oldest = {0, 0}; + int i; + bool found = false; for (i = 0; i < MaxWalSenders; i++) { /* use volatile pointer to prevent code rearrangement */ - volatile WalSnd *walsnd = &WalSndCtl->walsnds[i]; - XLogRecPtr recptr; + volatile WalSnd *walsnd = &WalSndCtl->walsnds[i]; + XLogRecPtr recptr; if (walsnd->pid == 0) continue; |
