summaryrefslogtreecommitdiff
path: root/src/backend/replication
diff options
context:
space:
mode:
authorBruce Momjian2010-02-26 02:01:40 +0000
committerBruce Momjian2010-02-26 02:01:40 +0000
commit65e806cba1f0f154d51caa7478e7192ce58d1056 (patch)
tree99a656d7b4ec6d038d4c24e07fadf75db4c37e79 /src/backend/replication
parent16040575a04486d8e0823b4e304f4933144baf90 (diff)
pgindent run for 9.0
Diffstat (limited to 'src/backend/replication')
-rw-r--r--src/backend/replication/libpqwalreceiver/libpqwalreceiver.c58
-rw-r--r--src/backend/replication/walreceiver.c105
-rw-r--r--src/backend/replication/walreceiverfuncs.c34
-rw-r--r--src/backend/replication/walsender.c287
4 files changed, 244 insertions, 240 deletions
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 875dbafa110..49cf7b597f9 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -10,7 +10,7 @@
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c,v 1.4 2010/02/25 07:31:40 heikki Exp $
+ * $PostgreSQL: pgsql/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c,v 1.5 2010/02/26 02:00:58 momjian Exp $
*
*-------------------------------------------------------------------------
*/
@@ -49,7 +49,7 @@ static char *recvBuf = NULL;
/* Prototypes for interface functions */
static bool libpqrcv_connect(char *conninfo, XLogRecPtr startpoint);
static bool libpqrcv_receive(int timeout, unsigned char *type,
- char **buffer, int *len);
+ char **buffer, int *len);
static void libpqrcv_disconnect(void);
/* Prototypes for private functions */
@@ -94,22 +94,23 @@ libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
PQerrorMessage(streamConn))));
/*
- * Get the system identifier and timeline ID as a DataRow message
- * from the primary server.
+ * Get the system identifier and timeline ID as a DataRow message from the
+ * primary server.
*/
res = PQexec(streamConn, "IDENTIFY_SYSTEM");
if (PQresultStatus(res) != PGRES_TUPLES_OK)
- {
+ {
PQclear(res);
ereport(ERROR,
(errmsg("could not receive the SYSID and timeline ID from "
"the primary server: %s",
PQerrorMessage(streamConn))));
- }
+ }
if (PQnfields(res) != 2 || PQntuples(res) != 1)
{
- int ntuples = PQntuples(res);
- int nfields = PQnfields(res);
+ int ntuples = PQntuples(res);
+ int nfields = PQnfields(res);
+
PQclear(res);
ereport(ERROR,
(errmsg("invalid response from primary server"),
@@ -120,8 +121,7 @@ libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
primary_tli = pg_atoi(PQgetvalue(res, 0, 1), 4, 0);
/*
- * Confirm that the system identifier of the primary is the same
- * as ours.
+ * Confirm that the system identifier of the primary is the same as ours.
*/
snprintf(standby_sysid, sizeof(standby_sysid), UINT64_FORMAT,
GetSystemIdentifier());
@@ -135,8 +135,8 @@ libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
}
/*
- * Confirm that the current timeline of the primary is the same
- * as the recovery target timeline.
+ * Confirm that the current timeline of the primary is the same as the
+ * recovery target timeline.
*/
standby_tli = GetRecoveryTargetTLI();
PQclear(res);
@@ -172,7 +172,7 @@ libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
static bool
libpq_select(int timeout_ms)
{
- int ret;
+ int ret;
Assert(streamConn != NULL);
if (PQsocket(streamConn) < 0)
@@ -197,15 +197,15 @@ libpq_select(int timeout_ms)
struct timeval *ptr_timeout;
FD_ZERO(&input_mask);
- FD_SET(PQsocket(streamConn), &input_mask);
+ FD_SET (PQsocket(streamConn), &input_mask);
if (timeout_ms < 0)
ptr_timeout = NULL;
else
{
- timeout.tv_sec = timeout_ms / 1000;
- timeout.tv_usec = (timeout_ms % 1000) * 1000;
- ptr_timeout = &timeout;
+ timeout.tv_sec = timeout_ms / 1000;
+ timeout.tv_usec = (timeout_ms % 1000) * 1000;
+ ptr_timeout = &timeout;
}
ret = select(PQsocket(streamConn) + 1, &input_mask,
@@ -239,12 +239,12 @@ libpqrcv_disconnect(void)
*
* Returns:
*
- * True if data was received. *type, *buffer and *len are set to
- * the type of the received data, buffer holding it, and length,
- * respectively.
+ * True if data was received. *type, *buffer and *len are set to
+ * the type of the received data, buffer holding it, and length,
+ * respectively.
*
- * False if no data was available within timeout, or wait was interrupted
- * by signal.
+ * False if no data was available within timeout, or wait was interrupted
+ * by signal.
*
* The buffer returned is only valid until the next call of this function or
* libpq_connect/disconnect.
@@ -261,10 +261,10 @@ libpqrcv_receive(int timeout, unsigned char *type, char **buffer, int *len)
recvBuf = NULL;
/*
- * If the caller requested to block, wait for data to arrive. But if
- * this is the first call after connecting, don't wait, because
- * there might already be some data in libpq buffer that we haven't
- * returned to caller.
+ * If the caller requested to block, wait for data to arrive. But if this
+ * is the first call after connecting, don't wait, because there might
+ * already be some data in libpq buffer that we haven't returned to
+ * caller.
*/
if (timeout > 0 && !justconnected)
{
@@ -280,11 +280,11 @@ libpqrcv_receive(int timeout, unsigned char *type, char **buffer, int *len)
/* Receive CopyData message */
rawlen = PQgetCopyData(streamConn, &recvBuf, 1);
- if (rawlen == 0) /* no data available yet, then return */
+ if (rawlen == 0) /* no data available yet, then return */
return false;
- if (rawlen == -1) /* end-of-streaming or error */
+ if (rawlen == -1) /* end-of-streaming or error */
{
- PGresult *res;
+ PGresult *res;
res = PQgetResult(streamConn);
if (PQresultStatus(res) == PGRES_COMMAND_OK)
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 3f82693dcea..9f86b0645d0 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -29,7 +29,7 @@
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/replication/walreceiver.c,v 1.5 2010/02/19 10:51:04 heikki Exp $
+ * $PostgreSQL: pgsql/src/backend/replication/walreceiver.c,v 1.6 2010/02/26 02:00:57 momjian Exp $
*
*-------------------------------------------------------------------------
*/
@@ -51,7 +51,7 @@
#include "utils/resowner.h"
/* Global variable to indicate if this process is a walreceiver process */
-bool am_walreceiver;
+bool am_walreceiver;
/* libpqreceiver hooks to these when loaded */
walrcv_connect_type walrcv_connect = NULL;
@@ -102,9 +102,9 @@ static void
ProcessWalRcvInterrupts(void)
{
/*
- * Although walreceiver interrupt handling doesn't use the same scheme
- * as regular backends, call CHECK_FOR_INTERRUPTS() to make sure we
- * receive any incoming signals on Win32.
+ * Although walreceiver interrupt handling doesn't use the same scheme as
+ * regular backends, call CHECK_FOR_INTERRUPTS() to make sure we receive
+ * any incoming signals on Win32.
*/
CHECK_FOR_INTERRUPTS();
@@ -148,37 +148,38 @@ static void XLogWalRcvFlush(void);
*/
static struct
{
- XLogRecPtr Write; /* last byte + 1 written out in the standby */
- XLogRecPtr Flush; /* last byte + 1 flushed in the standby */
-} LogstreamResult;
+ XLogRecPtr Write; /* last byte + 1 written out in the standby */
+ XLogRecPtr Flush; /* last byte + 1 flushed in the standby */
+} LogstreamResult;
/* Main entry point for walreceiver process */
void
WalReceiverMain(void)
{
- char conninfo[MAXCONNINFO];
- XLogRecPtr startpoint;
+ char conninfo[MAXCONNINFO];
+ XLogRecPtr startpoint;
+
/* use volatile pointer to prevent code rearrangement */
volatile WalRcvData *walrcv = WalRcv;
am_walreceiver = true;
/*
- * WalRcv should be set up already (if we are a backend, we inherit
- * this by fork() or EXEC_BACKEND mechanism from the postmaster).
+ * WalRcv should be set up already (if we are a backend, we inherit this
+ * by fork() or EXEC_BACKEND mechanism from the postmaster).
*/
Assert(walrcv != NULL);
/*
* Mark walreceiver as running in shared memory.
*
- * Do this as early as possible, so that if we fail later on, we'll
- * set state to STOPPED. If we die before this, the startup process
- * will keep waiting for us to start up, until it times out.
+ * Do this as early as possible, so that if we fail later on, we'll set
+ * state to STOPPED. If we die before this, the startup process will keep
+ * waiting for us to start up, until it times out.
*/
SpinLockAcquire(&walrcv->mutex);
Assert(walrcv->pid == 0);
- switch(walrcv->walRcvState)
+ switch (walrcv->walRcvState)
{
case WALRCV_STOPPING:
/* If we've already been requested to stop, don't start up. */
@@ -222,7 +223,8 @@ WalReceiverMain(void)
#endif
/* Properly accept or ignore signals the postmaster might send us */
- pqsignal(SIGHUP, WalRcvSigHupHandler); /* set flag to read config file */
+ pqsignal(SIGHUP, WalRcvSigHupHandler); /* set flag to read config
+ * file */
pqsignal(SIGINT, SIG_IGN);
pqsignal(SIGTERM, WalRcvShutdownHandler); /* request shutdown */
pqsignal(SIGQUIT, WalRcvQuickDieHandler); /* hard crash time */
@@ -264,9 +266,9 @@ WalReceiverMain(void)
/* Loop until end-of-streaming or error */
for (;;)
{
- unsigned char type;
- char *buf;
- int len;
+ unsigned char type;
+ char *buf;
+ int len;
/*
* Emergency bailout if postmaster has died. This is to avoid the
@@ -299,12 +301,12 @@ WalReceiverMain(void)
XLogWalRcvProcessMsg(type, buf, len);
/* Receive any more data we can without sleeping */
- while(walrcv_receive(0, &type, &buf, &len))
+ while (walrcv_receive(0, &type, &buf, &len))
XLogWalRcvProcessMsg(type, buf, len);
/*
- * If we've written some records, flush them to disk and
- * let the startup process know about them.
+ * If we've written some records, flush them to disk and let the
+ * startup process know about them.
*/
XLogWalRcvFlush();
}
@@ -375,8 +377,8 @@ WalRcvQuickDieHandler(SIGNAL_ARGS)
* system reset cycle if some idiot DBA sends a manual SIGQUIT to a random
* backend. This is necessary precisely because we don't clean up our
* shared memory state. (The "dead man switch" mechanism in pmsignal.c
- * should ensure the postmaster sees this as a crash, too, but no harm
- * in being doubly sure.)
+ * should ensure the postmaster sees this as a crash, too, but no harm in
+ * being doubly sure.)
*/
exit(2);
}
@@ -389,20 +391,20 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
{
switch (type)
{
- case 'w': /* WAL records */
- {
- XLogRecPtr recptr;
+ case 'w': /* WAL records */
+ {
+ XLogRecPtr recptr;
- if (len < sizeof(XLogRecPtr))
- ereport(ERROR,
- (errmsg("invalid WAL message received from primary")));
+ if (len < sizeof(XLogRecPtr))
+ ereport(ERROR,
+ (errmsg("invalid WAL message received from primary")));
- recptr = *((XLogRecPtr *) buf);
- buf += sizeof(XLogRecPtr);
- len -= sizeof(XLogRecPtr);
- XLogWalRcvWrite(buf, len, recptr);
- break;
- }
+ recptr = *((XLogRecPtr *) buf);
+ buf += sizeof(XLogRecPtr);
+ len -= sizeof(XLogRecPtr);
+ XLogWalRcvWrite(buf, len, recptr);
+ break;
+ }
default:
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
@@ -417,20 +419,20 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
static void
XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
{
- int startoff;
- int byteswritten;
+ int startoff;
+ int byteswritten;
while (nbytes > 0)
{
- int segbytes;
+ int segbytes;
if (recvFile < 0 || !XLByteInSeg(recptr, recvId, recvSeg))
{
- bool use_existent;
+ bool use_existent;
/*
- * fsync() and close current file before we switch to next one.
- * We would otherwise have to reopen this file to fsync it later
+ * fsync() and close current file before we switch to next one. We
+ * would otherwise have to reopen this file to fsync it later
*/
if (recvFile >= 0)
{
@@ -444,8 +446,8 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
if (close(recvFile) != 0)
ereport(PANIC,
(errcode_for_file_access(),
- errmsg("could not close log file %u, segment %u: %m",
- recvId, recvSeg)));
+ errmsg("could not close log file %u, segment %u: %m",
+ recvId, recvSeg)));
}
recvFile = -1;
@@ -500,14 +502,13 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
nbytes -= byteswritten;
buf += byteswritten;
- LogstreamResult.Write = recptr;
+ LogstreamResult.Write = recptr;
/*
- * XXX: Should we signal bgwriter to start a restartpoint
- * if we've consumed too much xlog since the last one, like
- * in normal processing? But this is not worth doing unless
- * a restartpoint can be created independently from a
- * checkpoint record.
+ * XXX: Should we signal bgwriter to start a restartpoint if we've
+ * consumed too much xlog since the last one, like in normal
+ * processing? But this is not worth doing unless a restartpoint can
+ * be created independently from a checkpoint record.
*/
}
}
@@ -520,7 +521,7 @@ XLogWalRcvFlush(void)
{
/* use volatile pointer to prevent code rearrangement */
volatile WalRcvData *walrcv = WalRcv;
- char activitymsg[50];
+ char activitymsg[50];
issue_xlog_fsync(recvFile, recvId, recvSeg);
diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c
index 4fb132dcd4e..be305790fd3 100644
--- a/src/backend/replication/walreceiverfuncs.c
+++ b/src/backend/replication/walreceiverfuncs.c
@@ -10,7 +10,7 @@
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/replication/walreceiverfuncs.c,v 1.3 2010/01/27 15:27:51 heikki Exp $
+ * $PostgreSQL: pgsql/src/backend/replication/walreceiverfuncs.c,v 1.4 2010/02/26 02:00:57 momjian Exp $
*
*-------------------------------------------------------------------------
*/
@@ -42,7 +42,7 @@ WalRcvData *WalRcv = NULL;
Size
WalRcvShmemSize(void)
{
- Size size = 0;
+ Size size = 0;
size = add_size(size, sizeof(WalRcvData));
@@ -53,7 +53,7 @@ WalRcvShmemSize(void)
void
WalRcvShmemInit(void)
{
- bool found;
+ bool found;
WalRcv = (WalRcvData *)
ShmemInitStruct("Wal Receiver Ctl", WalRcvShmemSize(), &found);
@@ -78,7 +78,7 @@ WalRcvInProgress(void)
/* use volatile pointer to prevent code rearrangement */
volatile WalRcvData *walrcv = WalRcv;
WalRcvState state;
- pg_time_t startTime;
+ pg_time_t startTime;
SpinLockAcquire(&walrcv->mutex);
@@ -88,14 +88,14 @@ WalRcvInProgress(void)
SpinLockRelease(&walrcv->mutex);
/*
- * If it has taken too long for walreceiver to start up, give up.
- * Setting the state to STOPPED ensures that if walreceiver later
- * does start up after all, it will see that it's not supposed to be
- * running and die without doing anything.
+ * If it has taken too long for walreceiver to start up, give up. Setting
+ * the state to STOPPED ensures that if walreceiver later does start up
+ * after all, it will see that it's not supposed to be running and die
+ * without doing anything.
*/
if (state == WALRCV_STARTING)
{
- pg_time_t now = (pg_time_t) time(NULL);
+ pg_time_t now = (pg_time_t) time(NULL);
if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
{
@@ -122,7 +122,7 @@ ShutdownWalRcv(void)
{
/* use volatile pointer to prevent code rearrangement */
volatile WalRcvData *walrcv = WalRcv;
- pid_t walrcvpid = 0;
+ pid_t walrcvpid = 0;
/*
* Request walreceiver to stop. Walreceiver will switch to WALRCV_STOPPED
@@ -130,7 +130,7 @@ ShutdownWalRcv(void)
* restart itself.
*/
SpinLockAcquire(&walrcv->mutex);
- switch(walrcv->walRcvState)
+ switch (walrcv->walRcvState)
{
case WALRCV_STOPPED:
break;
@@ -180,14 +180,13 @@ RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo)
{
/* use volatile pointer to prevent code rearrangement */
volatile WalRcvData *walrcv = WalRcv;
- pg_time_t now = (pg_time_t) time(NULL);
+ pg_time_t now = (pg_time_t) time(NULL);
/*
- * We always start at the beginning of the segment.
- * That prevents a broken segment (i.e., with no records in the
- * first half of a segment) from being created by XLOG streaming,
- * which might cause trouble later on if the segment is e.g
- * archived.
+ * We always start at the beginning of the segment. That prevents a broken
+ * segment (i.e., with no records in the first half of a segment) from
+ * being created by XLOG streaming, which might cause trouble later on if
+ * the segment is e.g archived.
*/
if (recptr.xrecoff % XLogSegSize != 0)
recptr.xrecoff -= recptr.xrecoff % XLogSegSize;
@@ -225,4 +224,3 @@ GetWalRcvWriteRecPtr(void)
return recptr;
}
-
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index a8706ab06f6..2a2765645e4 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -30,7 +30,7 @@
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/replication/walsender.c,v 1.8 2010/02/25 07:31:40 heikki Exp $
+ * $PostgreSQL: pgsql/src/backend/replication/walsender.c,v 1.9 2010/02/26 02:00:58 momjian Exp $
*
*-------------------------------------------------------------------------
*/
@@ -61,11 +61,11 @@ WalSndCtlData *WalSndCtl = NULL;
static WalSnd *MyWalSnd = NULL;
/* Global state */
-bool am_walsender = false; /* Am I a walsender process ? */
+bool am_walsender = false; /* Am I a walsender process ? */
/* User-settable parameters for walsender */
-int MaxWalSenders = 0; /* the maximum number of concurrent walsenders */
-int WalSndDelay = 200; /* max sleep time between some actions */
+int MaxWalSenders = 0; /* the maximum number of concurrent walsenders */
+int WalSndDelay = 200; /* max sleep time between some actions */
#define NAPTIME_PER_CYCLE 100 /* max sleep time between cycles (100ms) */
@@ -96,9 +96,9 @@ static void WalSndQuickDieHandler(SIGNAL_ARGS);
/* Prototypes for private functions */
static int WalSndLoop(void);
-static void InitWalSnd(void);
-static void WalSndHandshake(void);
-static void WalSndKill(int code, Datum arg);
+static void InitWalSnd(void);
+static void WalSndHandshake(void);
+static void WalSndKill(int code, Datum arg);
static void XLogRead(char *buf, XLogRecPtr recptr, Size nbytes);
static bool XLogSend(StringInfo outMsg);
static void CheckClosedConnection(void);
@@ -155,13 +155,13 @@ static void
WalSndHandshake(void)
{
StringInfoData input_message;
- bool replication_started = false;
+ bool replication_started = false;
initStringInfo(&input_message);
while (!replication_started)
{
- int firstchar;
+ int firstchar;
/* Wait for a command to arrive */
firstchar = pq_getbyte();
@@ -183,99 +183,99 @@ WalSndHandshake(void)
* blocking because we've been able to get message type code.
*/
if (pq_getmessage(&input_message, 0))
- firstchar = EOF; /* suitable message already logged */
+ firstchar = EOF; /* suitable message already logged */
}
/* Handle the very limited subset of commands expected in this phase */
switch (firstchar)
{
- case 'Q': /* Query message */
- {
- const char *query_string;
- XLogRecPtr recptr;
-
- query_string = pq_getmsgstring(&input_message);
- pq_getmsgend(&input_message);
-
- if (strcmp(query_string, "IDENTIFY_SYSTEM") == 0)
- {
- StringInfoData buf;
- char sysid[32];
- char tli[11];
-
- /*
- * Reply with a result set with one row, two columns.
- * First col is system ID, and second if timeline ID
- */
-
- snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
- GetSystemIdentifier());
- snprintf(tli, sizeof(tli), "%u", ThisTimeLineID);
-
- /* Send a RowDescription message */
- pq_beginmessage(&buf, 'T');
- pq_sendint(&buf, 2, 2); /* 2 fields */
-
- /* first field */
- pq_sendstring(&buf, "systemid"); /* col name */
- pq_sendint(&buf, 0, 4); /* table oid */
- pq_sendint(&buf, 0, 2); /* attnum */
- pq_sendint(&buf, TEXTOID, 4); /* type oid */
- pq_sendint(&buf, -1, 2); /* typlen */
- pq_sendint(&buf, 0, 4); /* typmod */
- pq_sendint(&buf, 0, 2); /* format code */
-
- /* second field */
- pq_sendstring(&buf, "timeline"); /* col name */
- pq_sendint(&buf, 0, 4); /* table oid */
- pq_sendint(&buf, 0, 2); /* attnum */
- pq_sendint(&buf, INT4OID, 4); /* type oid */
- pq_sendint(&buf, 4, 2); /* typlen */
- pq_sendint(&buf, 0, 4); /* typmod */
- pq_sendint(&buf, 0, 2); /* format code */
- pq_endmessage(&buf);
-
- /* Send a DataRow message */
- pq_beginmessage(&buf, 'D');
- pq_sendint(&buf, 2, 2); /* # of columns */
- pq_sendint(&buf, strlen(sysid), 4); /* col1 len */
- pq_sendbytes(&buf, (char *) &sysid, strlen(sysid));
- pq_sendint(&buf, strlen(tli), 4); /* col2 len */
- pq_sendbytes(&buf, (char *) tli, strlen(tli));
- pq_endmessage(&buf);
-
- /* Send CommandComplete and ReadyForQuery messages */
- EndCommand("SELECT", DestRemote);
- ReadyForQuery(DestRemote);
- }
- else if (sscanf(query_string, "START_REPLICATION %X/%X",
- &recptr.xlogid, &recptr.xrecoff) == 2)
+ case 'Q': /* Query message */
{
- StringInfoData buf;
-
- /* Send a CopyOutResponse message, and start streaming */
- pq_beginmessage(&buf, 'H');
- pq_sendbyte(&buf, 0);
- pq_sendint(&buf, 0, 2);
- pq_endmessage(&buf);
-
- /*
- * Initialize position to the received one, then
- * the xlog records begin to be shipped from that position
- */
- sentPtr = recptr;
-
- /* break out of the loop */
- replication_started = true;
+ const char *query_string;
+ XLogRecPtr recptr;
+
+ query_string = pq_getmsgstring(&input_message);
+ pq_getmsgend(&input_message);
+
+ if (strcmp(query_string, "IDENTIFY_SYSTEM") == 0)
+ {
+ StringInfoData buf;
+ char sysid[32];
+ char tli[11];
+
+ /*
+ * Reply with a result set with one row, two columns.
+ * First col is system ID, and second if timeline ID
+ */
+
+ snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
+ GetSystemIdentifier());
+ snprintf(tli, sizeof(tli), "%u", ThisTimeLineID);
+
+ /* Send a RowDescription message */
+ pq_beginmessage(&buf, 'T');
+ pq_sendint(&buf, 2, 2); /* 2 fields */
+
+ /* first field */
+ pq_sendstring(&buf, "systemid"); /* col name */
+ pq_sendint(&buf, 0, 4); /* table oid */
+ pq_sendint(&buf, 0, 2); /* attnum */
+ pq_sendint(&buf, TEXTOID, 4); /* type oid */
+ pq_sendint(&buf, -1, 2); /* typlen */
+ pq_sendint(&buf, 0, 4); /* typmod */
+ pq_sendint(&buf, 0, 2); /* format code */
+
+ /* second field */
+ pq_sendstring(&buf, "timeline"); /* col name */
+ pq_sendint(&buf, 0, 4); /* table oid */
+ pq_sendint(&buf, 0, 2); /* attnum */
+ pq_sendint(&buf, INT4OID, 4); /* type oid */
+ pq_sendint(&buf, 4, 2); /* typlen */
+ pq_sendint(&buf, 0, 4); /* typmod */
+ pq_sendint(&buf, 0, 2); /* format code */
+ pq_endmessage(&buf);
+
+ /* Send a DataRow message */
+ pq_beginmessage(&buf, 'D');
+ pq_sendint(&buf, 2, 2); /* # of columns */
+ pq_sendint(&buf, strlen(sysid), 4); /* col1 len */
+ pq_sendbytes(&buf, (char *) &sysid, strlen(sysid));
+ pq_sendint(&buf, strlen(tli), 4); /* col2 len */
+ pq_sendbytes(&buf, (char *) tli, strlen(tli));
+ pq_endmessage(&buf);
+
+ /* Send CommandComplete and ReadyForQuery messages */
+ EndCommand("SELECT", DestRemote);
+ ReadyForQuery(DestRemote);
+ }
+ else if (sscanf(query_string, "START_REPLICATION %X/%X",
+ &recptr.xlogid, &recptr.xrecoff) == 2)
+ {
+ StringInfoData buf;
+
+ /* Send a CopyOutResponse message, and start streaming */
+ pq_beginmessage(&buf, 'H');
+ pq_sendbyte(&buf, 0);
+ pq_sendint(&buf, 0, 2);
+ pq_endmessage(&buf);
+
+ /*
+ * Initialize position to the received one, then the
+ * xlog records begin to be shipped from that position
+ */
+ sentPtr = recptr;
+
+ /* break out of the loop */
+ replication_started = true;
+ }
+ else
+ {
+ ereport(FATAL,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg("invalid standby query string: %s", query_string)));
+ }
+ break;
}
- else
- {
- ereport(FATAL,
- (errcode(ERRCODE_PROTOCOL_VIOLATION),
- errmsg("invalid standby query string: %s", query_string)));
- }
- break;
- }
case 'X':
/* standby is closing the connection */
@@ -303,7 +303,7 @@ static void
CheckClosedConnection(void)
{
unsigned char firstchar;
- int r;
+ int r;
r = pq_getbyte_if_available(&firstchar);
if (r < 0)
@@ -323,9 +323,9 @@ CheckClosedConnection(void)
/* Handle the very limited subset of commands expected in this phase */
switch (firstchar)
{
- /*
- * 'X' means that the standby is closing down the socket.
- */
+ /*
+ * 'X' means that the standby is closing down the socket.
+ */
case 'X':
proc_exit(0);
@@ -348,7 +348,7 @@ WalSndLoop(void)
/* Loop forever */
for (;;)
{
- int remain; /* remaining time (ms) */
+ int remain; /* remaining time (ms) */
/*
* Emergency bailout if postmaster has died. This is to avoid the
@@ -416,15 +416,16 @@ WalSndLoop(void)
return 1;
eof:
+
/*
- * Reset whereToSendOutput to prevent ereport from attempting
- * to send any more messages to the standby.
+ * Reset whereToSendOutput to prevent ereport from attempting to send any
+ * more messages to the standby.
*/
if (whereToSendOutput == DestRemote)
whereToSendOutput = DestNone;
proc_exit(0);
- return 1; /* keep the compiler quiet */
+ return 1; /* keep the compiler quiet */
}
/* Initialize a per-walsender data structure for this walsender process */
@@ -432,7 +433,7 @@ static void
InitWalSnd(void)
{
/* use volatile pointer to prevent code rearrangement */
- int i;
+ int i;
/*
* WalSndCtl should be set up already (we inherit this by fork() or
@@ -497,13 +498,13 @@ WalSndKill(int code, Datum arg)
void
XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
{
- char path[MAXPGPATH];
- uint32 startoff;
+ char path[MAXPGPATH];
+ uint32 startoff;
while (nbytes > 0)
{
- int segbytes;
- int readbytes;
+ int segbytes;
+ int readbytes;
startoff = recptr.xrecoff % XLogSegSize;
@@ -518,7 +519,7 @@ XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
if (sendFile < 0)
- ereport(FATAL, /* XXX: Why FATAL? */
+ ereport(FATAL, /* XXX: Why FATAL? */
(errcode_for_file_access(),
errmsg("could not open file \"%s\" (log file %u, segment %u): %m",
path, sendId, sendSeg)));
@@ -546,9 +547,9 @@ XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
if (readbytes <= 0)
ereport(FATAL,
(errcode_for_file_access(),
- errmsg("could not read from log file %u, segment %u, offset %u, "
- "length %lu: %m",
- sendId, sendSeg, sendOff, (unsigned long) segbytes)));
+ errmsg("could not read from log file %u, segment %u, offset %u, "
+ "length %lu: %m",
+ sendId, sendSeg, sendOff, (unsigned long) segbytes)));
/* Update state for read */
XLByteAdvance(recptr, readbytes);
@@ -569,7 +570,8 @@ static bool
XLogSend(StringInfo outMsg)
{
XLogRecPtr SendRqstPtr;
- char activitymsg[50];
+ char activitymsg[50];
+
/* use volatile pointer to prevent code rearrangement */
volatile WalSnd *walsnd = MyWalSnd;
@@ -581,15 +583,15 @@ XLogSend(StringInfo outMsg)
return true;
/*
- * We gather multiple records together by issuing just one XLogRead()
- * of a suitable size, and send them as one CopyData message. Repeat
- * until we've sent everything we can.
+ * We gather multiple records together by issuing just one XLogRead() of a
+ * suitable size, and send them as one CopyData message. Repeat until
+ * we've sent everything we can.
*/
while (XLByteLT(sentPtr, SendRqstPtr))
{
- XLogRecPtr startptr;
- XLogRecPtr endptr;
- Size nbytes;
+ XLogRecPtr startptr;
+ XLogRecPtr endptr;
+ Size nbytes;
/*
* Figure out how much to send in one message. If there's less than
@@ -600,8 +602,8 @@ XLogSend(StringInfo outMsg)
* relies on the fact that we never split a WAL record across two
* messages. Since a long WAL record is split at page boundary into
* continuation records, page boundary is always a safe cut-off point.
- * We also assume that SendRqstPtr never points in the middle of a
- * WAL record.
+ * We also assume that SendRqstPtr never points in the middle of a WAL
+ * record.
*/
startptr = sentPtr;
if (startptr.xrecoff >= XLogFileSize)
@@ -625,10 +627,10 @@ XLogSend(StringInfo outMsg)
/*
* OK to read and send the slice.
*
- * We don't need to convert the xlogid/xrecoff from host byte order
- * to network byte order because the both server can be expected to
- * have the same byte order. If they have different byte order, we
- * don't reach here.
+ * We don't need to convert the xlogid/xrecoff from host byte order to
+ * network byte order because the both server can be expected to have
+ * the same byte order. If they have different byte order, we don't
+ * reach here.
*/
pq_sendbyte(outMsg, 'w');
pq_sendbytes(outMsg, (char *) &startptr, sizeof(startptr));
@@ -644,8 +646,8 @@ XLogSend(StringInfo outMsg)
sentPtr = endptr;
/*
- * Read the log directly into the output buffer to prevent
- * extra memcpy calls.
+ * Read the log directly into the output buffer to prevent extra
+ * memcpy calls.
*/
enlargeStringInfo(outMsg, nbytes);
@@ -714,8 +716,8 @@ WalSndQuickDieHandler(SIGNAL_ARGS)
* system reset cycle if some idiot DBA sends a manual SIGQUIT to a random
* backend. This is necessary precisely because we don't clean up our
* shared memory state. (The "dead man switch" mechanism in pmsignal.c
- * should ensure the postmaster sees this as a crash, too, but no harm
- * in being doubly sure.)
+ * should ensure the postmaster sees this as a crash, too, but no harm in
+ * being doubly sure.)
*/
exit(2);
}
@@ -732,14 +734,16 @@ void
WalSndSignals(void)
{
/* Set up signal handlers */
- pqsignal(SIGHUP, WalSndSigHupHandler); /* set flag to read config file */
+ pqsignal(SIGHUP, WalSndSigHupHandler); /* set flag to read config
+ * file */
pqsignal(SIGINT, SIG_IGN); /* not used */
pqsignal(SIGTERM, WalSndShutdownHandler); /* request shutdown */
pqsignal(SIGQUIT, WalSndQuickDieHandler); /* hard crash time */
pqsignal(SIGALRM, SIG_IGN);
pqsignal(SIGPIPE, SIG_IGN);
- pqsignal(SIGUSR1, SIG_IGN); /* not used */
- pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and shutdown */
+ pqsignal(SIGUSR1, SIG_IGN); /* not used */
+ pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and
+ * shutdown */
/* Reset some signals that are accepted by postmaster but not here */
pqsignal(SIGCHLD, SIG_DFL);
@@ -753,7 +757,7 @@ WalSndSignals(void)
Size
WalSndShmemSize(void)
{
- Size size = 0;
+ Size size = 0;
size = offsetof(WalSndCtlData, walsnds);
size = add_size(size, mul_size(MaxWalSenders, sizeof(WalSnd)));
@@ -765,8 +769,8 @@ WalSndShmemSize(void)
void
WalSndShmemInit(void)
{
- bool found;
- int i;
+ bool found;
+ int i;
WalSndCtl = (WalSndCtlData *)
ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);
@@ -783,7 +787,8 @@ WalSndShmemInit(void)
for (i = 0; i < MaxWalSenders; i++)
{
- WalSnd *walsnd = &WalSndCtl->walsnds[i];
+ WalSnd *walsnd = &WalSndCtl->walsnds[i];
+
SpinLockInit(&walsnd->mutex);
}
}
@@ -795,15 +800,15 @@ WalSndShmemInit(void)
XLogRecPtr
GetOldestWALSendPointer(void)
{
- XLogRecPtr oldest = {0, 0};
- int i;
- bool found = false;
+ XLogRecPtr oldest = {0, 0};
+ int i;
+ bool found = false;
for (i = 0; i < MaxWalSenders; i++)
{
/* use volatile pointer to prevent code rearrangement */
- volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
- XLogRecPtr recptr;
+ volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
+ XLogRecPtr recptr;
if (walsnd->pid == 0)
continue;