diff options
| author | Bruce Momjian | 2014-05-06 16:12:18 +0000 |
|---|---|---|
| committer | Bruce Momjian | 2014-05-06 16:12:18 +0000 |
| commit | 0a7832005792fa6dad171f9cadb8d587fe0dd800 (patch) | |
| tree | 365cfc42c521a52607e41394b08ef44d338d8fc1 /src/backend/replication | |
| parent | fb85cd4320414c3f6e9c8bc69ec944200ae1e493 (diff) | |
pgindent run for 9.4
This includes removing tabs after periods in C comments, which was
applied to back branches, so this change should not effect backpatching.
Diffstat (limited to 'src/backend/replication')
| -rw-r--r-- | src/backend/replication/basebackup.c | 20 | ||||
| -rw-r--r-- | src/backend/replication/libpqwalreceiver/libpqwalreceiver.c | 2 | ||||
| -rw-r--r-- | src/backend/replication/logical/decode.c | 77 | ||||
| -rw-r--r-- | src/backend/replication/logical/logical.c | 135 | ||||
| -rw-r--r-- | src/backend/replication/logical/logicalfuncs.c | 25 | ||||
| -rw-r--r-- | src/backend/replication/logical/reorderbuffer.c | 82 | ||||
| -rw-r--r-- | src/backend/replication/logical/snapbuild.c | 138 | ||||
| -rw-r--r-- | src/backend/replication/slot.c | 91 | ||||
| -rw-r--r-- | src/backend/replication/slotfuncs.c | 9 | ||||
| -rw-r--r-- | src/backend/replication/syncrep.c | 6 | ||||
| -rw-r--r-- | src/backend/replication/walreceiver.c | 10 | ||||
| -rw-r--r-- | src/backend/replication/walreceiverfuncs.c | 2 | ||||
| -rw-r--r-- | src/backend/replication/walsender.c | 164 |
13 files changed, 399 insertions, 362 deletions
diff --git a/src/backend/replication/basebackup.c b/src/backend/replication/basebackup.c index 42e66f2fed7..a3bf5001ec8 100644 --- a/src/backend/replication/basebackup.c +++ b/src/backend/replication/basebackup.c @@ -137,8 +137,8 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir) SendXlogRecPtrResult(startptr, starttli); /* - * Calculate the relative path of temporary statistics directory - * in order to skip the files which are located in that directory later. + * Calculate the relative path of temporary statistics directory in order + * to skip the files which are located in that directory later. */ if (is_absolute_path(pgstat_stat_directory) && strncmp(pgstat_stat_directory, DataDir, datadirpathlen) == 0) @@ -231,8 +231,8 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir) (int64) opt->maxrate * (int64) 1024 / THROTTLING_FREQUENCY; /* - * The minimum amount of time for throttling_sample - * bytes to be transfered. + * The minimum amount of time for throttling_sample bytes to be + * transfered. */ elapsed_min_unit = USECS_PER_SEC / THROTTLING_FREQUENCY; @@ -613,7 +613,7 @@ parse_basebackup_options(List *options, basebackup_options *opt) ereport(ERROR, (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE), errmsg("%d is outside the valid range for parameter \"%s\" (%d .. %d)", - (int) maxrate, "MAX_RATE", MAX_RATE_LOWER, MAX_RATE_UPPER))); + (int) maxrate, "MAX_RATE", MAX_RATE_LOWER, MAX_RATE_UPPER))); opt->maxrate = (uint32) maxrate; o_maxrate = true; @@ -841,7 +841,7 @@ sendFileWithContent(const char *filename, const char *content) /* * Include the tablespace directory pointed to by 'path' in the output tar - * stream. If 'sizeonly' is true, we just calculate a total length and return + * stream. If 'sizeonly' is true, we just calculate a total length and return * it, without actually sending anything. * * Only used to send auxiliary tablespaces, not PGDATA. @@ -975,7 +975,7 @@ sendDir(char *path, int basepathlen, bool sizeonly, List *tablespaces) * always created there. */ if ((statrelpath != NULL && strcmp(pathbuf, statrelpath) == 0) || - strncmp(de->d_name, PG_STAT_TMP_DIR, strlen(PG_STAT_TMP_DIR)) == 0) + strncmp(de->d_name, PG_STAT_TMP_DIR, strlen(PG_STAT_TMP_DIR)) == 0) { if (!sizeonly) _tarWriteHeader(pathbuf + basepathlen + 1, NULL, &statbuf); @@ -1270,14 +1270,14 @@ throttle(size_t increment) * the maximum time to sleep. Thus the cast to long is safe. */ wait_result = WaitLatch(&MyWalSnd->latch, - WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, + WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, (long) (sleep / 1000)); } else { /* - * The actual transfer rate is below the limit. A negative value would - * distort the adjustment of throttled_last. + * The actual transfer rate is below the limit. A negative value + * would distort the adjustment of throttled_last. */ wait_result = 0; sleep = 0; diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 88d27c7690e..7bc761db8f4 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -50,7 +50,7 @@ static void libpqrcv_connect(char *conninfo); static void libpqrcv_identify_system(TimeLineID *primary_tli); static void libpqrcv_readtimelinehistoryfile(TimeLineID tli, char **filename, char **content, int *len); static bool libpqrcv_startstreaming(TimeLineID tli, XLogRecPtr startpoint, - char *slotname); + char *slotname); static void libpqrcv_endstreaming(TimeLineID *next_tli); static int libpqrcv_receive(int timeout, char **buffer); static void libpqrcv_send(const char *buffer, int nbytes); diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 414cfa95586..7b6114a2097 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -9,12 +9,12 @@ * * NOTE: * This basically tries to handle all low level xlog stuff for - * reorderbuffer.c and snapbuild.c. There's some minor leakage where a - * specific record's struct is used to pass data along, but those just - * happen to contain the right amount of data in a convenient - * format. There isn't and shouldn't be much intelligence about the - * contents of records in here except turning them into a more usable - * format. + * reorderbuffer.c and snapbuild.c. There's some minor leakage where a + * specific record's struct is used to pass data along, but those just + * happen to contain the right amount of data in a convenient + * format. There isn't and shouldn't be much intelligence about the + * contents of records in here except turning them into a more usable + * format. * * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California @@ -44,10 +44,10 @@ typedef struct XLogRecordBuffer { - XLogRecPtr origptr; - XLogRecPtr endptr; - XLogRecord record; - char *record_data; + XLogRecPtr origptr; + XLogRecPtr endptr; + XLogRecord record; + char *record_data; } XLogRecordBuffer; /* RMGR Handlers */ @@ -63,10 +63,10 @@ static void DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); static void DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); static void DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, - TransactionId xid, Oid dboid, - TimestampTz commit_time, - int nsubxacts, TransactionId *sub_xids, - int ninval_msgs, SharedInvalidationMessage *msg); + TransactionId xid, Oid dboid, + TimestampTz commit_time, + int nsubxacts, TransactionId *sub_xids, + int ninval_msgs, SharedInvalidationMessage *msg); static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, TransactionId *sub_xids, int nsubxacts); @@ -91,10 +91,10 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogRecord *record) /* cast so we get a warning when new rmgrs are added */ switch ((RmgrIds) buf.record.xl_rmid) { - /* - * Rmgrs we care about for logical decoding. Add new rmgrs in - * rmgrlist.h's order. - */ + /* + * Rmgrs we care about for logical decoding. Add new rmgrs in + * rmgrlist.h's order. + */ case RM_XLOG_ID: DecodeXLogOp(ctx, &buf); break; @@ -115,11 +115,11 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogRecord *record) DecodeHeapOp(ctx, &buf); break; - /* - * Rmgrs irrelevant for logical decoding; they describe stuff not - * represented in logical decoding. Add new rmgrs in rmgrlist.h's - * order. - */ + /* + * Rmgrs irrelevant for logical decoding; they describe stuff not + * represented in logical decoding. Add new rmgrs in rmgrlist.h's + * order. + */ case RM_SMGR_ID: case RM_CLOG_ID: case RM_DBASE_ID: @@ -149,13 +149,14 @@ DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) switch (info) { - /* this is also used in END_OF_RECOVERY checkpoints */ + /* this is also used in END_OF_RECOVERY checkpoints */ case XLOG_CHECKPOINT_SHUTDOWN: case XLOG_END_OF_RECOVERY: SnapBuildSerializationPoint(builder, buf->origptr); break; case XLOG_CHECKPOINT_ONLINE: + /* * a RUNNING_XACTS record will have been logged near to this, we * can restart from there. @@ -181,9 +182,9 @@ DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) static void DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) { - SnapBuild *builder = ctx->snapshot_builder; - ReorderBuffer *reorder = ctx->reorder; - XLogRecord *r = &buf->record; + SnapBuild *builder = ctx->snapshot_builder; + ReorderBuffer *reorder = ctx->reorder; + XLogRecord *r = &buf->record; uint8 info = r->xl_info & ~XLR_INFO_MASK; /* no point in doing anything yet, data could not be decoded anyway */ @@ -280,7 +281,7 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) int i; TransactionId *sub_xid; - xlrec = (xl_xact_assignment *) buf->record_data; + xlrec = (xl_xact_assignment *) buf->record_data; sub_xid = &xlrec->xsub[0]; @@ -292,6 +293,7 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) break; } case XLOG_XACT_PREPARE: + /* * Currently decoding ignores PREPARE TRANSACTION and will just * decode the transaction when the COMMIT PREPARED is sent or @@ -321,7 +323,9 @@ DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) case XLOG_RUNNING_XACTS: { xl_running_xacts *running = (xl_running_xacts *) buf->record_data; + SnapBuildProcessRunningXacts(builder, buf->origptr, running); + /* * Abort all transactions that we keep track of, that are * older than the record's oldestRunningXid. This is the most @@ -364,22 +368,25 @@ DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) case XLOG_HEAP2_NEW_CID: { xl_heap_new_cid *xlrec; + xlrec = (xl_heap_new_cid *) buf->record_data; SnapBuildProcessNewCid(builder, xid, buf->origptr, xlrec); break; } case XLOG_HEAP2_REWRITE: + /* * Although these records only exist to serve the needs of logical * decoding, all the work happens as part of crash or archive * recovery, so we don't need to do anything here. */ break; - /* - * Everything else here is just low level physical stuff we're - * not interested in. - */ + + /* + * Everything else here is just low level physical stuff we're not + * interested in. + */ case XLOG_HEAP2_FREEZE_PAGE: case XLOG_HEAP2_CLEAN: case XLOG_HEAP2_CLEANUP_INFO: @@ -429,6 +436,7 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) break; case XLOG_HEAP_NEWPAGE: + /* * This is only used in places like indexams and CLUSTER which * don't contain changes relevant for logical replication. @@ -436,6 +444,7 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) break; case XLOG_HEAP_INPLACE: + /* * Inplace updates are only ever performed on catalog tuples and * can, per definition, not change tuple visibility. Since we @@ -503,8 +512,8 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, * There basically two reasons we might not be interested in this * transaction: * 1) We might not be interested in decoding transactions up to this - * LSN. This can happen because we previously decoded it and now just - * are restarting or if we haven't assembled a consistent snapshot yet. + * LSN. This can happen because we previously decoded it and now just + * are restarting or if we haven't assembled a consistent snapshot yet. * 2) The transaction happened in another database. * * We can't just use ReorderBufferAbort() here, because we need to execute diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 1d08b50da39..438a3fb152d 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -8,21 +8,21 @@ * src/backend/replication/logical/logical.c * * NOTES - * This file coordinates interaction between the various modules that - * together provide logical decoding, primarily by providing so - * called LogicalDecodingContexts. The goal is to encapsulate most of the - * internal complexity for consumers of logical decoding, so they can - * create and consume a changestream with a low amount of code. Builtin - * consumers are the walsender and SQL SRF interface, but it's possible to - * add further ones without changing core code, e.g. to consume changes in - * a bgworker. + * This file coordinates interaction between the various modules that + * together provide logical decoding, primarily by providing so + * called LogicalDecodingContexts. The goal is to encapsulate most of the + * internal complexity for consumers of logical decoding, so they can + * create and consume a changestream with a low amount of code. Builtin + * consumers are the walsender and SQL SRF interface, but it's possible to + * add further ones without changing core code, e.g. to consume changes in + * a bgworker. * - * The idea is that a consumer provides three callbacks, one to read WAL, - * one to prepare a data write, and a final one for actually writing since - * their implementation depends on the type of consumer. Check - * logicalfuncs.c for an example implementation of a fairly simple consumer - * and a implementation of a WAL reading callback that's suitable for - * simple consumers. + * The idea is that a consumer provides three callbacks, one to read WAL, + * one to prepare a data write, and a final one for actually writing since + * their implementation depends on the type of consumer. Check + * logicalfuncs.c for an example implementation of a fairly simple consumer + * and a implementation of a WAL reading callback that's suitable for + * simple consumers. *------------------------------------------------------------------------- */ @@ -56,13 +56,13 @@ typedef struct LogicalErrorCallbackState /* wrappers around output plugin callbacks */ static void output_plugin_error_callback(void *arg); static void startup_cb_wrapper(LogicalDecodingContext *ctx, OutputPluginOptions *opt, - bool is_init); + bool is_init); static void shutdown_cb_wrapper(LogicalDecodingContext *ctx); static void begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn); static void commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, - XLogRecPtr commit_lsn); + XLogRecPtr commit_lsn); static void change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, - Relation relation, ReorderBufferChange *change); + Relation relation, ReorderBufferChange *change); static void LoadOutputPlugin(OutputPluginCallbacks *callbacks, char *plugin); @@ -90,18 +90,18 @@ CheckLogicalDecodingRequirements(void) * * There's basically three things missing to allow this: * 1) We need to be able to correctly and quickly identify the timeline a - * LSN belongs to + * LSN belongs to * 2) We need to force hot_standby_feedback to be enabled at all times so - * the primary cannot remove rows we need. + * the primary cannot remove rows we need. * 3) support dropping replication slots referring to a database, in - * dbase_redo. There can't be any active ones due to HS recovery - * conflicts, so that should be relatively easy. + * dbase_redo. There can't be any active ones due to HS recovery + * conflicts, so that should be relatively easy. * ---- */ if (RecoveryInProgress()) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("logical decoding cannot be used while in recovery"))); + errmsg("logical decoding cannot be used while in recovery"))); } /* @@ -117,7 +117,8 @@ StartupDecodingContext(List *output_plugin_options, LogicalOutputPluginWriterWrite do_write) { ReplicationSlot *slot; - MemoryContext context, old_context; + MemoryContext context, + old_context; LogicalDecodingContext *ctx; /* shorter lines... */ @@ -133,7 +134,10 @@ StartupDecodingContext(List *output_plugin_options, ctx->context = context; - /* (re-)load output plugins, so we detect a bad (removed) output plugin now. */ + /* + * (re-)load output plugins, so we detect a bad (removed) output plugin + * now. + */ LoadOutputPlugin(&ctx->callbacks, NameStr(slot->data.plugin)); /* @@ -195,10 +199,10 @@ CreateInitDecodingContext(char *plugin, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write) { - TransactionId xmin_horizon = InvalidTransactionId; + TransactionId xmin_horizon = InvalidTransactionId; ReplicationSlot *slot; LogicalDecodingContext *ctx; - MemoryContext old_context; + MemoryContext old_context; /* shorter lines... */ slot = MyReplicationSlot; @@ -219,8 +223,8 @@ CreateInitDecodingContext(char *plugin, if (slot->data.database != MyDatabaseId) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("replication slot \"%s\" was not created in this database", - NameStr(slot->data.name)))); + errmsg("replication slot \"%s\" was not created in this database", + NameStr(slot->data.name)))); if (IsTransactionState() && GetTopTransactionIdIfAny() != InvalidTransactionId) @@ -252,9 +256,9 @@ CreateInitDecodingContext(char *plugin, */ if (!RecoveryInProgress()) { - XLogRecPtr flushptr; + XLogRecPtr flushptr; - /* start at current insert position*/ + /* start at current insert position */ slot->data.restart_lsn = GetXLogInsertRecPtr(); /* make sure we have enough information to start */ @@ -307,8 +311,8 @@ CreateInitDecodingContext(char *plugin, LWLockRelease(ProcArrayLock); /* - * tell the snapshot builder to only assemble snapshot once reaching - * the a running_xact's record with the respective xmin. + * tell the snapshot builder to only assemble snapshot once reaching the a + * running_xact's record with the respective xmin. */ xmin_horizon = slot->data.catalog_xmin; @@ -316,7 +320,7 @@ CreateInitDecodingContext(char *plugin, ReplicationSlotSave(); ctx = StartupDecodingContext(NIL, InvalidXLogRecPtr, xmin_horizon, - read_page, prepare_write, do_write); + read_page, prepare_write, do_write); /* call output plugin initialization callback */ old_context = MemoryContextSwitchTo(ctx->context); @@ -352,7 +356,7 @@ CreateDecodingContext(XLogRecPtr start_lsn, { LogicalDecodingContext *ctx; ReplicationSlot *slot; - MemoryContext old_context; + MemoryContext old_context; /* shorter lines... */ slot = MyReplicationSlot; @@ -370,8 +374,8 @@ CreateDecodingContext(XLogRecPtr start_lsn, if (slot->data.database != MyDatabaseId) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - (errmsg("replication slot \"%s\" was not created in this database", - NameStr(slot->data.name))))); + (errmsg("replication slot \"%s\" was not created in this database", + NameStr(slot->data.name))))); if (start_lsn == InvalidXLogRecPtr) { @@ -385,14 +389,14 @@ CreateDecodingContext(XLogRecPtr start_lsn, * pretty common for a client to acknowledge a LSN it doesn't have to * do anything for, and thus didn't store persistently, because the * xlog records didn't result in anything relevant for logical - * decoding. Clients have to be able to do that to support - * synchronous replication. + * decoding. Clients have to be able to do that to support synchronous + * replication. */ start_lsn = slot->data.confirmed_flush; elog(DEBUG1, "cannot stream from %X/%X, minimum is %X/%X, forwarding", - (uint32)(start_lsn >> 32), (uint32)start_lsn, - (uint32)(slot->data.confirmed_flush >> 32), - (uint32)slot->data.confirmed_flush); + (uint32) (start_lsn >> 32), (uint32) start_lsn, + (uint32) (slot->data.confirmed_flush >> 32), + (uint32) slot->data.confirmed_flush); } ctx = StartupDecodingContext(output_plugin_options, @@ -409,10 +413,10 @@ CreateDecodingContext(XLogRecPtr start_lsn, (errmsg("starting logical decoding for slot %s", NameStr(slot->data.name)), errdetail("streaming transactions committing after %X/%X, reading WAL from %X/%X", - (uint32)(slot->data.confirmed_flush >> 32), - (uint32)slot->data.confirmed_flush, - (uint32)(slot->data.restart_lsn >> 32), - (uint32)slot->data.restart_lsn))); + (uint32) (slot->data.confirmed_flush >> 32), + (uint32) slot->data.confirmed_flush, + (uint32) (slot->data.restart_lsn >> 32), + (uint32) slot->data.restart_lsn))); return ctx; } @@ -438,8 +442,8 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx) startptr = ctx->slot->data.restart_lsn; elog(DEBUG1, "searching for logical decoding starting point, starting at %X/%X", - (uint32)(ctx->slot->data.restart_lsn >> 32), - (uint32)ctx->slot->data.restart_lsn); + (uint32) (ctx->slot->data.restart_lsn >> 32), + (uint32) ctx->slot->data.restart_lsn); /* Wait for a consistent starting point */ for (;;) @@ -543,14 +547,15 @@ static void output_plugin_error_callback(void *arg) { LogicalErrorCallbackState *state = (LogicalErrorCallbackState *) arg; + /* not all callbacks have an associated LSN */ if (state->report_location != InvalidXLogRecPtr) errcontext("slot \"%s\", output plugin \"%s\", in the %s callback, associated LSN %X/%X", NameStr(state->ctx->slot->data.name), NameStr(state->ctx->slot->data.plugin), state->callback_name, - (uint32)(state->report_location >> 32), - (uint32)state->report_location); + (uint32) (state->report_location >> 32), + (uint32) state->report_location); else errcontext("slot \"%s\", output plugin \"%s\", in the %s callback", NameStr(state->ctx->slot->data.name), @@ -643,7 +648,7 @@ begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn) static void commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, - XLogRecPtr commit_lsn) + XLogRecPtr commit_lsn) { LogicalDecodingContext *ctx = cache->private_data; LogicalErrorCallbackState state; @@ -652,7 +657,7 @@ commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, /* Push callback + info on the error context stack */ state.ctx = ctx; state.callback_name = "commit"; - state.report_location = txn->final_lsn; /* beginning of commit record */ + state.report_location = txn->final_lsn; /* beginning of commit record */ errcallback.callback = output_plugin_error_callback; errcallback.arg = (void *) &state; errcallback.previous = error_context_stack; @@ -672,7 +677,7 @@ commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, static void change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, - Relation relation, ReorderBufferChange *change) + Relation relation, ReorderBufferChange *change) { LogicalDecodingContext *ctx = cache->private_data; LogicalErrorCallbackState state; @@ -690,6 +695,7 @@ change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, /* set output state */ ctx->accept_writes = true; ctx->write_xid = txn->xid; + /* * report this change's lsn so replies from clients can give an up2date * answer. This won't ever be enough (and shouldn't be!) to confirm @@ -715,7 +721,7 @@ change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, void LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin) { - bool updated_xmin = false; + bool updated_xmin = false; ReplicationSlot *slot; slot = MyReplicationSlot; @@ -725,16 +731,17 @@ LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin) SpinLockAcquire(&slot->mutex); /* - * don't overwrite if we already have a newer xmin. This can - * happen if we restart decoding in a slot. + * don't overwrite if we already have a newer xmin. This can happen if we + * restart decoding in a slot. */ if (TransactionIdPrecedesOrEquals(xmin, slot->data.catalog_xmin)) { } + /* - * If the client has already confirmed up to this lsn, we directly - * can mark this as accepted. This can happen if we restart - * decoding in a slot. + * If the client has already confirmed up to this lsn, we directly can + * mark this as accepted. This can happen if we restart decoding in a + * slot. */ else if (current_lsn <= slot->data.confirmed_flush) { @@ -744,6 +751,7 @@ LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin) /* our candidate can directly be used */ updated_xmin = true; } + /* * Only increase if the previous values have been applied, otherwise we * might never end up updating if the receiver acks too slowly. @@ -770,7 +778,7 @@ LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin) void LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart_lsn) { - bool updated_lsn = false; + bool updated_lsn = false; ReplicationSlot *slot; slot = MyReplicationSlot; @@ -781,13 +789,14 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart SpinLockAcquire(&slot->mutex); - /* don't overwrite if have a newer restart lsn*/ + /* don't overwrite if have a newer restart lsn */ if (restart_lsn <= slot->data.restart_lsn) { } + /* - * We might have already flushed far enough to directly accept this lsn, in - * this case there is no need to check for existing candidate LSNs + * We might have already flushed far enough to directly accept this lsn, + * in this case there is no need to check for existing candidate LSNs */ else if (current_lsn <= slot->data.confirmed_flush) { @@ -797,6 +806,7 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart /* our candidate can directly be used */ updated_lsn = true; } + /* * Only increase if the previous values have been applied, otherwise we * might never end up updating if the receiver acks too slowly. A missed @@ -896,6 +906,7 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn) ReplicationSlotSave(); elog(DEBUG1, "updated xmin: %u restart: %u", updated_xmin, updated_restart); } + /* * Now the new xmin is safely on disk, we can let the global value * advance. We do not take ProcArrayLock or similar since we only diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c index 5fa1848001d..2da6bb10b22 100644 --- a/src/backend/replication/logical/logicalfuncs.c +++ b/src/backend/replication/logical/logicalfuncs.c @@ -42,11 +42,12 @@ #include "storage/fd.h" /* private date for writing out data */ -typedef struct DecodingOutputState { +typedef struct DecodingOutputState +{ Tuplestorestate *tupstore; - TupleDesc tupdesc; - bool binary_output; - int64 returned_rows; + TupleDesc tupdesc; + bool binary_output; + int64 returned_rows; } DecodingOutputState; /* @@ -91,7 +92,7 @@ LogicalOutputWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xi /* ick, but cstring_to_text_with_len works for bytea perfectly fine */ values[2] = PointerGetDatum( - cstring_to_text_with_len(ctx->out->data, ctx->out->len)); + cstring_to_text_with_len(ctx->out->data, ctx->out->len)); tuplestore_putvalues(p->tupstore, p->tupdesc, values, nulls); p->returned_rows++; @@ -412,7 +413,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin InvalidateSystemCaches(); while ((startptr != InvalidXLogRecPtr && startptr < end_of_wal) || - (ctx->reader->EndRecPtr && ctx->reader->EndRecPtr < end_of_wal)) + (ctx->reader->EndRecPtr && ctx->reader->EndRecPtr < end_of_wal)) { XLogRecord *record; char *errm = NULL; @@ -474,7 +475,8 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin Datum pg_logical_slot_get_changes(PG_FUNCTION_ARGS) { - Datum ret = pg_logical_slot_get_changes_guts(fcinfo, true, false); + Datum ret = pg_logical_slot_get_changes_guts(fcinfo, true, false); + return ret; } @@ -484,7 +486,8 @@ pg_logical_slot_get_changes(PG_FUNCTION_ARGS) Datum pg_logical_slot_peek_changes(PG_FUNCTION_ARGS) { - Datum ret = pg_logical_slot_get_changes_guts(fcinfo, false, false); + Datum ret = pg_logical_slot_get_changes_guts(fcinfo, false, false); + return ret; } @@ -494,7 +497,8 @@ pg_logical_slot_peek_changes(PG_FUNCTION_ARGS) Datum pg_logical_slot_get_binary_changes(PG_FUNCTION_ARGS) { - Datum ret = pg_logical_slot_get_changes_guts(fcinfo, true, true); + Datum ret = pg_logical_slot_get_changes_guts(fcinfo, true, true); + return ret; } @@ -504,6 +508,7 @@ pg_logical_slot_get_binary_changes(PG_FUNCTION_ARGS) Datum pg_logical_slot_peek_binary_changes(PG_FUNCTION_ARGS) { - Datum ret = pg_logical_slot_get_changes_guts(fcinfo, false, true); + Datum ret = pg_logical_slot_get_changes_guts(fcinfo, false, true); + return ret; } diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index a2b2adb1732..7f2bbca302e 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -60,7 +60,7 @@ #include "replication/logical.h" #include "replication/reorderbuffer.h" #include "replication/slot.h" -#include "replication/snapbuild.h" /* just for SnapBuildSnapDecRefcount */ +#include "replication/snapbuild.h" /* just for SnapBuildSnapDecRefcount */ #include "storage/bufmgr.h" #include "storage/fd.h" #include "storage/sinval.h" @@ -582,7 +582,7 @@ ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, */ void ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, - ReorderBufferChange *change) + ReorderBufferChange *change) { ReorderBufferTXN *txn; @@ -1047,8 +1047,8 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) } /* - * Cleanup the tuplecids we stored for decoding catalog snapshot - * access. They are always stored in the toplevel transaction. + * Cleanup the tuplecids we stored for decoding catalog snapshot access. + * They are always stored in the toplevel transaction. */ dlist_foreach_modify(iter, &txn->tuplecids) { @@ -1204,9 +1204,9 @@ ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap, snap->subxip[i++] = txn->xid; /* - * nsubxcnt isn't decreased when subtransactions abort, so count - * manually. Since it's an upper boundary it is safe to use it for the - * allocation above. + * nsubxcnt isn't decreased when subtransactions abort, so count manually. + * Since it's an upper boundary it is safe to use it for the allocation + * above. */ snap->subxcnt = 1; @@ -1262,10 +1262,10 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, ReorderBufferIterTXNState *iterstate = NULL; ReorderBufferChange *change; - volatile CommandId command_id = FirstCommandId; - volatile Snapshot snapshot_now = NULL; - volatile bool txn_started = false; - volatile bool subtxn_started = false; + volatile CommandId command_id = FirstCommandId; + volatile Snapshot snapshot_now = NULL; + volatile bool txn_started = false; + volatile bool subtxn_started = false; txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false); @@ -1309,8 +1309,8 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, /* * Decoding needs access to syscaches et al., which in turn use - * heavyweight locks and such. Thus we need to have enough state around - * to keep track of those. The easiest way is to simply use a + * heavyweight locks and such. Thus we need to have enough state + * around to keep track of those. The easiest way is to simply use a * transaction internally. That also allows us to easily enforce that * nothing writes to the database by checking for xid assignments. * @@ -1344,7 +1344,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, Assert(snapshot_now); reloid = RelidByRelfilenode(change->data.tp.relnode.spcNode, - change->data.tp.relnode.relNode); + change->data.tp.relnode.relNode); /* * Catalog tuple without data, emitted while catalog was @@ -1415,6 +1415,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, ReorderBufferCopySnap(rb, change->data.snapshot, txn, command_id); } + /* * Restored from disk, need to be careful not to double * free. We could introduce refcounting for that, but for @@ -1447,7 +1448,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, { /* we don't use the global one anymore */ snapshot_now = ReorderBufferCopySnap(rb, snapshot_now, - txn, command_id); + txn, command_id); } snapshot_now->curcid = command_id; @@ -1586,7 +1587,7 @@ ReorderBufferAbortOld(ReorderBuffer *rb, TransactionId oldestRunningXid) */ dlist_foreach_modify(it, &rb->toplevel_by_lsn) { - ReorderBufferTXN * txn; + ReorderBufferTXN *txn; txn = dlist_container(ReorderBufferTXN, node, it.cur); @@ -1998,7 +1999,8 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, case REORDER_BUFFER_CHANGE_DELETE: { char *data; - ReorderBufferTupleBuf *oldtup, *newtup; + ReorderBufferTupleBuf *oldtup, + *newtup; Size oldlen = 0; Size newlen = 0; @@ -2007,12 +2009,12 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, if (oldtup) oldlen = offsetof(ReorderBufferTupleBuf, data) - + oldtup->tuple.t_len + +oldtup->tuple.t_len - offsetof(HeapTupleHeaderData, t_bits); if (newtup) newlen = offsetof(ReorderBufferTupleBuf, data) - + newtup->tuple.t_len + +newtup->tuple.t_len - offsetof(HeapTupleHeaderData, t_bits); sz += oldlen; @@ -2188,7 +2190,7 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, else if (readBytes < 0) ereport(ERROR, (errcode_for_file_access(), - errmsg("could not read from reorderbuffer spill file: %m"))); + errmsg("could not read from reorderbuffer spill file: %m"))); else if (readBytes != sizeof(ReorderBufferDiskChange)) ereport(ERROR, (errcode_for_file_access(), @@ -2199,7 +2201,7 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, ondisk = (ReorderBufferDiskChange *) rb->outbuf; ReorderBufferSerializeReserve(rb, - sizeof(ReorderBufferDiskChange) + ondisk->size); + sizeof(ReorderBufferDiskChange) + ondisk->size); ondisk = (ReorderBufferDiskChange *) rb->outbuf; readBytes = read(*fd, rb->outbuf + sizeof(ReorderBufferDiskChange), @@ -2208,13 +2210,13 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, if (readBytes < 0) ereport(ERROR, (errcode_for_file_access(), - errmsg("could not read from reorderbuffer spill file: %m"))); + errmsg("could not read from reorderbuffer spill file: %m"))); else if (readBytes != ondisk->size - sizeof(ReorderBufferDiskChange)) ereport(ERROR, (errcode_for_file_access(), errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes", readBytes, - (uint32) (ondisk->size - sizeof(ReorderBufferDiskChange))))); + (uint32) (ondisk->size - sizeof(ReorderBufferDiskChange))))); /* * ok, read a full change from disk, now restore it into proper @@ -2364,7 +2366,7 @@ StartupReorderBuffer(void) logical_dir = AllocateDir("pg_replslot"); while ((logical_de = ReadDir(logical_dir, "pg_replslot")) != NULL) { - struct stat statbuf; + struct stat statbuf; char path[MAXPGPATH]; if (strcmp(logical_de->d_name, ".") == 0 || @@ -2620,7 +2622,7 @@ ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn, cchange = dlist_container(ReorderBufferChange, node, it.cur); ctup = cchange->data.tp.newtuple; chunk = DatumGetPointer( - fastgetattr(&ctup->tuple, 3, toast_desc, &isnull)); + fastgetattr(&ctup->tuple, 3, toast_desc, &isnull)); Assert(!isnull); Assert(!VARATT_IS_EXTERNAL(chunk)); @@ -2800,7 +2802,7 @@ ApplyLogicalMappingFile(HTAB *tuplecid_data, Oid relid, const char *fname) ReorderBufferTupleCidKey key; ReorderBufferTupleCidEnt *ent; ReorderBufferTupleCidEnt *new_ent; - bool found; + bool found; /* be careful about padding */ memset(&key, 0, sizeof(ReorderBufferTupleCidKey)); @@ -2813,7 +2815,7 @@ ApplyLogicalMappingFile(HTAB *tuplecid_data, Oid relid, const char *fname) (errcode_for_file_access(), errmsg("could not read file \"%s\": %m", path))); - else if (readBytes == 0) /* EOF */ + else if (readBytes == 0) /* EOF */ break; else if (readBytes != sizeof(LogicalRewriteMappingData)) ereport(ERROR, @@ -2884,8 +2886,8 @@ TransactionIdInArray(TransactionId xid, TransactionId *xip, Size num) static int file_sort_by_lsn(const void *a_p, const void *b_p) { - RewriteMappingFile *a = *(RewriteMappingFile **)a_p; - RewriteMappingFile *b = *(RewriteMappingFile **)b_p; + RewriteMappingFile *a = *(RewriteMappingFile **) a_p; + RewriteMappingFile *b = *(RewriteMappingFile **) b_p; if (a->lsn < b->lsn) return -1; @@ -2912,19 +2914,20 @@ UpdateLogicalMappings(HTAB *tuplecid_data, Oid relid, Snapshot snapshot) mapping_dir = AllocateDir("pg_llog/mappings"); while ((mapping_de = ReadDir(mapping_dir, "pg_llog/mappings")) != NULL) { - Oid f_dboid; - Oid f_relid; - TransactionId f_mapped_xid; - TransactionId f_create_xid; - XLogRecPtr f_lsn; - uint32 f_hi, f_lo; + Oid f_dboid; + Oid f_relid; + TransactionId f_mapped_xid; + TransactionId f_create_xid; + XLogRecPtr f_lsn; + uint32 f_hi, + f_lo; RewriteMappingFile *f; if (strcmp(mapping_de->d_name, ".") == 0 || strcmp(mapping_de->d_name, "..") == 0) continue; - /* Ignore files that aren't ours*/ + /* Ignore files that aren't ours */ if (strncmp(mapping_de->d_name, "map-", 4) != 0) continue; @@ -2971,11 +2974,12 @@ UpdateLogicalMappings(HTAB *tuplecid_data, Oid relid, Snapshot snapshot) qsort(files_a, list_length(files), sizeof(RewriteMappingFile *), file_sort_by_lsn); - for(off = 0; off < list_length(files); off++) + for (off = 0; off < list_length(files); off++) { RewriteMappingFile *f = files_a[off]; + elog(DEBUG1, "applying mapping: \"%s\" in %u", f->fname, - snapshot->subxip[0]); + snapshot->subxip[0]); ApplyLogicalMappingFile(tuplecid_data, relid, f->fname); pfree(f); } @@ -2995,7 +2999,7 @@ ResolveCminCmaxDuringDecoding(HTAB *tuplecid_data, ReorderBufferTupleCidEnt *ent; ForkNumber forkno; BlockNumber blockno; - bool updated_mapping = false; + bool updated_mapping = false; /* be careful about padding */ memset(&key, 0, sizeof(key)); diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index 36034dbec9d..cb45f906fc1 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -57,27 +57,27 @@ * * The snapbuild machinery is starting up in several stages, as illustrated * by the following graph: - * +-------------------------+ - * +----|SNAPBUILD_START |-------------+ - * | +-------------------------+ | - * | | | - * | | | - * | running_xacts with running xacts | - * | | | - * | | | - * | v | - * | +-------------------------+ v - * | |SNAPBUILD_FULL_SNAPSHOT |------------>| - * | +-------------------------+ | - * running_xacts | saved snapshot - * with zero xacts | at running_xacts's lsn - * | | | - * | all running toplevel TXNs finished | - * | | | - * | v | - * | +-------------------------+ | - * +--->|SNAPBUILD_CONSISTENT |<------------+ - * +-------------------------+ + * +-------------------------+ + * +----|SNAPBUILD_START |-------------+ + * | +-------------------------+ | + * | | | + * | | | + * | running_xacts with running xacts | + * | | | + * | | | + * | v | + * | +-------------------------+ v + * | |SNAPBUILD_FULL_SNAPSHOT |------------>| + * | +-------------------------+ | + * running_xacts | saved snapshot + * with zero xacts | at running_xacts's lsn + * | | | + * | all running toplevel TXNs finished | + * | | | + * | v | + * | +-------------------------+ | + * +--->|SNAPBUILD_CONSISTENT |<------------+ + * +-------------------------+ * * Initially the machinery is in the START stage. When a xl_running_xacts * record is read that is sufficiently new (above the safe xmin horizon), @@ -184,7 +184,7 @@ struct SnapBuild * Information about initially running transactions * * When we start building a snapshot there already may be transactions in - * progress. Those are stored in running.xip. We don't have enough + * progress. Those are stored in running.xip. We don't have enough * information about those to decode their contents, so until they are * finished (xcnt=0) we cannot switch to a CONSISTENT state. */ @@ -244,7 +244,7 @@ struct SnapBuild * removes knowledge about the previously used resowner, so we save it here. */ ResourceOwner SavedResourceOwnerDuringExport = NULL; -bool ExportInProgress = false; +bool ExportInProgress = false; /* transaction state manipulation functions */ static void SnapBuildEndTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid); @@ -496,7 +496,7 @@ SnapBuildBuildSnapshot(SnapBuild *builder, TransactionId xid) snapshot->copied = false; snapshot->curcid = FirstCommandId; snapshot->active_count = 0; - snapshot->regd_count = 1; /* mark as registered so nobody frees it */ + snapshot->regd_count = 1; /* mark as registered so nobody frees it */ return snapshot; } @@ -635,7 +635,7 @@ SnapBuildClearExportedSnapshot() bool SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn) { - bool is_old_tx; + bool is_old_tx; /* * We can't handle data in transactions if we haven't built a snapshot @@ -692,10 +692,10 @@ SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid, CommandId cid; /* - * we only log new_cid's if a catalog tuple was modified, so mark - * the transaction as containing catalog modifications + * we only log new_cid's if a catalog tuple was modified, so mark the + * transaction as containing catalog modifications */ - ReorderBufferXidSetCatalogChanges(builder->reorder, xid,lsn); + ReorderBufferXidSetCatalogChanges(builder->reorder, xid, lsn); ReorderBufferAddNewTupleCids(builder->reorder, xlrec->top_xid, lsn, xlrec->target.node, xlrec->target.tid, @@ -712,7 +712,7 @@ SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid, cid = xlrec->cmin; else { - cid = InvalidCommandId; /* silence compiler */ + cid = InvalidCommandId; /* silence compiler */ elog(ERROR, "xl_heap_new_cid record without a valid CommandId"); } @@ -818,7 +818,7 @@ SnapBuildAddCommittedTxn(SnapBuild *builder, TransactionId xid) (uint32) builder->committed.xcnt_space); builder->committed.xip = repalloc(builder->committed.xip, - builder->committed.xcnt_space * sizeof(TransactionId)); + builder->committed.xcnt_space * sizeof(TransactionId)); } /* @@ -900,10 +900,10 @@ SnapBuildEndTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid) * so our incrementaly built snapshot now is consistent. */ ereport(LOG, - (errmsg("logical decoding found consistent point at %X/%X", - (uint32)(lsn >> 32), (uint32)lsn), - errdetail("xid %u finished, no running transactions anymore", - xid))); + (errmsg("logical decoding found consistent point at %X/%X", + (uint32) (lsn >> 32), (uint32) lsn), + errdetail("xid %u finished, no running transactions anymore", + xid))); builder->state = SNAPBUILD_CONSISTENT; } } @@ -1170,15 +1170,16 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xact */ if (txn != NULL && txn->restart_decoding_lsn != InvalidXLogRecPtr) LogicalIncreaseRestartDecodingForSlot(lsn, txn->restart_decoding_lsn); + /* * No in-progress transaction, can reuse the last serialized snapshot if * we have one. */ else if (txn == NULL && - builder->reorder->current_restart_decoding_lsn != InvalidXLogRecPtr && + builder->reorder->current_restart_decoding_lsn != InvalidXLogRecPtr && builder->last_serialized_snapshot != InvalidXLogRecPtr) LogicalIncreaseRestartDecodingForSlot(lsn, - builder->last_serialized_snapshot); + builder->last_serialized_snapshot); } @@ -1199,23 +1200,23 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn * the currently running transactions. There are several ways to do that: * * a) There were no running transactions when the xl_running_xacts record - * was inserted, jump to CONSISTENT immediately. We might find such a - * state we were waiting for b) and c). + * was inserted, jump to CONSISTENT immediately. We might find such a + * state we were waiting for b) and c). * * b) Wait for all toplevel transactions that were running to end. We - * simply track the number of in-progress toplevel transactions and - * lower it whenever one commits or aborts. When that number - * (builder->running.xcnt) reaches zero, we can go from FULL_SNAPSHOT - * to CONSISTENT. + * simply track the number of in-progress toplevel transactions and + * lower it whenever one commits or aborts. When that number + * (builder->running.xcnt) reaches zero, we can go from FULL_SNAPSHOT + * to CONSISTENT. * NB: We need to search running.xip when seeing a transaction's end to - * make sure it's a toplevel transaction and it's been one of the - * intially running ones. + * make sure it's a toplevel transaction and it's been one of the + * intially running ones. * Interestingly, in contrast to HS, this allows us not to care about * subtransactions - and by extension suboverflowed xl_running_xacts - * at all. * * c) This (in a previous run) or another decoding slot serialized a - * snapshot to disk that we can use. + * snapshot to disk that we can use. * --- */ @@ -1231,7 +1232,7 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn (errmsg("skipping snapshot at %X/%X while building logical decoding snapshot, xmin horizon too low", (uint32) (lsn >> 32), (uint32) lsn), errdetail("initial xmin horizon of %u vs the snapshot's %u", - builder->initial_xmin_horizon, running->oldestRunningXid))); + builder->initial_xmin_horizon, running->oldestRunningXid))); return true; } @@ -1263,7 +1264,7 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn ereport(LOG, (errmsg("logical decoding found consistent point at %X/%X", - (uint32)(lsn >> 32), (uint32)lsn), + (uint32) (lsn >> 32), (uint32) lsn), errdetail("running xacts with xcnt == 0"))); return false; @@ -1274,15 +1275,16 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn /* there won't be any state to cleanup */ return false; } + /* * b) first encounter of a useable xl_running_xacts record. If we had - * found one earlier we would either track running transactions - * (i.e. builder->running.xcnt != 0) or be consistent (this function - * wouldn't get called). + * found one earlier we would either track running transactions (i.e. + * builder->running.xcnt != 0) or be consistent (this function wouldn't + * get called). */ else if (!builder->running.xcnt) { - int off; + int off; /* * We only care about toplevel xids as those are the ones we @@ -1302,7 +1304,7 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn builder->running.xcnt_space = running->xcnt; builder->running.xip = MemoryContextAlloc(builder->context, - builder->running.xcnt * sizeof(TransactionId)); + builder->running.xcnt * sizeof(TransactionId)); memcpy(builder->running.xip, running->xids, builder->running.xcnt * sizeof(TransactionId)); @@ -1320,9 +1322,9 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn builder->state = SNAPBUILD_FULL_SNAPSHOT; ereport(LOG, - (errmsg("logical decoding found initial starting point at %X/%X", - (uint32)(lsn >> 32), (uint32)lsn), - errdetail("%u xacts need to finish", (uint32) builder->running.xcnt))); + (errmsg("logical decoding found initial starting point at %X/%X", + (uint32) (lsn >> 32), (uint32) lsn), + errdetail("%u xacts need to finish", (uint32) builder->running.xcnt))); /* * Iterate through all xids, wait for them to finish. @@ -1331,7 +1333,7 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn * isolationtester to notice that we're currently waiting for * something. */ - for(off = 0; off < builder->running.xcnt; off++) + for (off = 0; off < builder->running.xcnt; off++) { TransactionId xid = builder->running.xip[off]; @@ -1471,9 +1473,9 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn) * but remember location, so we don't need to read old data again. * * To be sure it has been synced to disk after the rename() from the - * tempfile filename to the real filename, we just repeat the - * fsync. That ought to be cheap because in most scenarios it should - * already be safely on disk. + * tempfile filename to the real filename, we just repeat the fsync. + * That ought to be cheap because in most scenarios it should already + * be safely on disk. */ fsync_fname(path, false); fsync_fname("pg_llog/snapshots", true); @@ -1504,7 +1506,7 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn) if (unlink(tmppath) != 0 && errno != ENOENT) ereport(ERROR, (errcode_for_file_access(), - errmsg("could not unlink file \"%s\": %m", path))); + errmsg("could not unlink file \"%s\": %m", path))); needed_length = sizeof(SnapBuildOnDisk) + sizeof(TransactionId) * builder->running.xcnt_space + @@ -1518,7 +1520,7 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn) INIT_CRC32(ondisk->checksum); COMP_CRC32(ondisk->checksum, ((char *) ondisk) + SnapBuildOnDiskNotChecksummedSize, - SnapBuildOnDiskConstantSize - SnapBuildOnDiskNotChecksummedSize); + SnapBuildOnDiskConstantSize - SnapBuildOnDiskNotChecksummedSize); ondisk_c += sizeof(SnapBuildOnDisk); memcpy(&ondisk->builder, builder, sizeof(SnapBuild)); @@ -1597,8 +1599,8 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn) fsync_fname("pg_llog/snapshots", true); /* - * Now there's no way we can loose the dumped state anymore, remember - * this as a serialization point. + * Now there's no way we can loose the dumped state anymore, remember this + * as a serialization point. */ builder->last_serialized_snapshot = lsn; @@ -1673,7 +1675,7 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn) INIT_CRC32(checksum); COMP_CRC32(checksum, ((char *) &ondisk) + SnapBuildOnDiskNotChecksummedSize, - SnapBuildOnDiskConstantSize - SnapBuildOnDiskNotChecksummedSize); + SnapBuildOnDiskConstantSize - SnapBuildOnDiskNotChecksummedSize); /* read SnapBuild */ readBytes = read(fd, &ondisk.builder, sizeof(SnapBuild)); @@ -1781,7 +1783,7 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn) ereport(LOG, (errmsg("logical decoding found consistent point at %X/%X", - (uint32)(lsn >> 32), (uint32)lsn), + (uint32) (lsn >> 32), (uint32) lsn), errdetail("found initial snapshot in snapbuild file"))); return true; @@ -1829,7 +1831,7 @@ CheckPointSnapBuild(void) uint32 hi; uint32 lo; XLogRecPtr lsn; - struct stat statbuf; + struct stat statbuf; if (strcmp(snap_de->d_name, ".") == 0 || strcmp(snap_de->d_name, "..") == 0) @@ -1846,8 +1848,8 @@ CheckPointSnapBuild(void) /* * temporary filenames from SnapBuildSerialize() include the LSN and * everything but are postfixed by .$pid.tmp. We can just remove them - * the same as other files because there can be none that are currently - * being written that are older than cutoff. + * the same as other files because there can be none that are + * currently being written that are older than cutoff. * * We just log a message if a file doesn't fit the pattern, it's * probably some editors lock/state file or similar... diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 76e55736605..ee0c7c07a97 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -72,7 +72,7 @@ typedef struct ReplicationSlotOnDisk sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize #define SLOT_MAGIC 0x1051CA1 /* format identifier */ -#define SLOT_VERSION 1 /* version for new files */ +#define SLOT_VERSION 1 /* version for new files */ /* Control array for replication slot management */ ReplicationSlotCtlData *ReplicationSlotCtl = NULL; @@ -81,7 +81,8 @@ ReplicationSlotCtlData *ReplicationSlotCtl = NULL; ReplicationSlot *MyReplicationSlot = NULL; /* GUCs */ -int max_replication_slots = 0; /* the maximum number of replication slots */ +int max_replication_slots = 0; /* the maximum number of replication + * slots */ static void ReplicationSlotDropAcquired(void); @@ -180,8 +181,8 @@ ReplicationSlotValidateName(const char *name, int elevel) { ereport(elevel, (errcode(ERRCODE_INVALID_NAME), - errmsg("replication slot name \"%s\" contains invalid character", - name), + errmsg("replication slot name \"%s\" contains invalid character", + name), errhint("Replication slot names may only contain letters, numbers and the underscore character."))); return false; } @@ -194,7 +195,7 @@ ReplicationSlotValidateName(const char *name, int elevel) * * name: Name of the slot * db_specific: logical decoding is db specific; if the slot is going to - * be used for that pass true, otherwise false. + * be used for that pass true, otherwise false. */ void ReplicationSlotCreate(const char *name, bool db_specific, @@ -208,18 +209,18 @@ ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotValidateName(name, ERROR); /* - * If some other backend ran this code currently with us, we'd likely - * both allocate the same slot, and that would be bad. We'd also be - * at risk of missing a name collision. Also, we don't want to try to - * create a new slot while somebody's busy cleaning up an old one, because - * we might both be monkeying with the same directory. + * If some other backend ran this code currently with us, we'd likely both + * allocate the same slot, and that would be bad. We'd also be at risk of + * missing a name collision. Also, we don't want to try to create a new + * slot while somebody's busy cleaning up an old one, because we might + * both be monkeying with the same directory. */ LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE); /* - * Check for name collision, and identify an allocatable slot. We need - * to hold ReplicationSlotControlLock in shared mode for this, so that - * nobody else can change the in_use flags while we're looking at them. + * Check for name collision, and identify an allocatable slot. We need to + * hold ReplicationSlotControlLock in shared mode for this, so that nobody + * else can change the in_use flags while we're looking at them. */ LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); for (i = 0; i < max_replication_slots; i++) @@ -243,10 +244,10 @@ ReplicationSlotCreate(const char *name, bool db_specific, errhint("Free one or increase max_replication_slots."))); /* - * Since this slot is not in use, nobody should be looking at any - * part of it other than the in_use field unless they're trying to allocate - * it. And since we hold ReplicationSlotAllocationLock, nobody except us - * can be doing that. So it's safe to initialize the slot. + * Since this slot is not in use, nobody should be looking at any part of + * it other than the in_use field unless they're trying to allocate it. + * And since we hold ReplicationSlotAllocationLock, nobody except us can + * be doing that. So it's safe to initialize the slot. */ Assert(!slot->in_use); Assert(!slot->active); @@ -366,6 +367,7 @@ ReplicationSlotRelease(void) { /* Mark slot inactive. We're not freeing it, just disconnecting. */ volatile ReplicationSlot *vslot = slot; + SpinLockAcquire(&slot->mutex); vslot->active = false; SpinLockRelease(&slot->mutex); @@ -444,7 +446,7 @@ ReplicationSlotDropAcquired(void) else { volatile ReplicationSlot *vslot = slot; - bool fail_softly = slot->data.persistency == RS_EPHEMERAL; + bool fail_softly = slot->data.persistency == RS_EPHEMERAL; SpinLockAcquire(&slot->mutex); vslot->active = false; @@ -571,8 +573,8 @@ ReplicationSlotsComputeRequiredXmin(bool already_locked) for (i = 0; i < max_replication_slots; i++) { ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i]; - TransactionId effective_xmin; - TransactionId effective_catalog_xmin; + TransactionId effective_xmin; + TransactionId effective_catalog_xmin; if (!s->in_use) continue; @@ -612,7 +614,7 @@ void ReplicationSlotsComputeRequiredLSN(void) { int i; - XLogRecPtr min_required = InvalidXLogRecPtr; + XLogRecPtr min_required = InvalidXLogRecPtr; Assert(ReplicationSlotCtl != NULL); @@ -620,7 +622,7 @@ ReplicationSlotsComputeRequiredLSN(void) for (i = 0; i < max_replication_slots; i++) { ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i]; - XLogRecPtr restart_lsn; + XLogRecPtr restart_lsn; if (!s->in_use) continue; @@ -669,7 +671,7 @@ ReplicationSlotsComputeLogicalRestartLSN(void) for (i = 0; i < max_replication_slots; i++) { volatile ReplicationSlot *s; - XLogRecPtr restart_lsn; + XLogRecPtr restart_lsn; s = &ReplicationSlotCtl->replication_slots[i]; @@ -772,8 +774,8 @@ CheckSlotRequirements(void) static bool string_endswith(const char *str, const char *end) { - size_t slen = strlen(str); - size_t elen = strlen(end); + size_t slen = strlen(str); + size_t elen = strlen(end); /* can't be a postfix if longer */ if (elen > slen) @@ -802,8 +804,8 @@ CheckPointReplicationSlots(void) * Prevent any slot from being created/dropped while we're active. As we * explicitly do *not* want to block iterating over replication_slots or * acquiring a slot we cannot take the control lock - but that's OK, - * because holding ReplicationSlotAllocationLock is strictly stronger, - * and enough to guarantee that nobody can change the in_use bits on us. + * because holding ReplicationSlotAllocationLock is strictly stronger, and + * enough to guarantee that nobody can change the in_use bits on us. */ LWLockAcquire(ReplicationSlotAllocationLock, LW_SHARED); @@ -839,7 +841,7 @@ StartupReplicationSlots(XLogRecPtr checkPointRedo) replication_dir = AllocateDir("pg_replslot"); while ((replication_de = ReadDir(replication_dir, "pg_replslot")) != NULL) { - struct stat statbuf; + struct stat statbuf; char path[MAXPGPATH]; if (strcmp(replication_de->d_name, ".") == 0 || @@ -892,7 +894,7 @@ CreateSlotOnDisk(ReplicationSlot *slot) { char tmppath[MAXPGPATH]; char path[MAXPGPATH]; - struct stat st; + struct stat st; /* * No need to take out the io_in_progress_lock, nobody else can see this @@ -904,11 +906,10 @@ CreateSlotOnDisk(ReplicationSlot *slot) sprintf(tmppath, "pg_replslot/%s.tmp", NameStr(slot->data.name)); /* - * It's just barely possible that some previous effort to create or - * drop a slot with this name left a temp directory lying around. - * If that seems to be the case, try to remove it. If the rmtree() - * fails, we'll error out at the mkdir() below, so we don't bother - * checking success. + * It's just barely possible that some previous effort to create or drop a + * slot with this name left a temp directory lying around. If that seems + * to be the case, try to remove it. If the rmtree() fails, we'll error + * out at the mkdir() below, so we don't bother checking success. */ if (stat(tmppath, &st) == 0 && S_ISDIR(st.st_mode)) rmtree(tmppath, true); @@ -922,7 +923,7 @@ CreateSlotOnDisk(ReplicationSlot *slot) fsync_fname(tmppath, true); /* Write the actual state file. */ - slot->dirty = true; /* signal that we really need to write */ + slot->dirty = true; /* signal that we really need to write */ SaveSlotToPath(slot, tmppath, ERROR); /* Rename the directory into place. */ @@ -1003,12 +1004,13 @@ SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel) SpinLockRelease(&slot->mutex); COMP_CRC32(cp.checksum, - (char *)(&cp) + ReplicationSlotOnDiskConstantSize, + (char *) (&cp) + ReplicationSlotOnDiskConstantSize, ReplicationSlotOnDiskDynamicSize); if ((write(fd, &cp, sizeof(cp))) != sizeof(cp)) { - int save_errno = errno; + int save_errno = errno; + CloseTransientFile(fd); errno = save_errno; ereport(elevel, @@ -1021,7 +1023,8 @@ SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel) /* fsync the temporary file */ if (pg_fsync(fd) != 0) { - int save_errno = errno; + int save_errno = errno; + CloseTransientFile(fd); errno = save_errno; ereport(elevel, @@ -1150,19 +1153,19 @@ RestoreSlotFromDisk(const char *name) if (cp.version != SLOT_VERSION) ereport(PANIC, (errcode_for_file_access(), - errmsg("replication slot file \"%s\" has unsupported version %u", - path, cp.version))); + errmsg("replication slot file \"%s\" has unsupported version %u", + path, cp.version))); /* boundary check on length */ if (cp.length != ReplicationSlotOnDiskDynamicSize) ereport(PANIC, (errcode_for_file_access(), - errmsg("replication slot file \"%s\" has corrupted length %u", - path, cp.length))); + errmsg("replication slot file \"%s\" has corrupted length %u", + path, cp.length))); /* Now that we know the size, read the entire file */ readBytes = read(fd, - (char *)&cp + ReplicationSlotOnDiskConstantSize, + (char *) &cp + ReplicationSlotOnDiskConstantSize, cp.length); if (readBytes != cp.length) { @@ -1181,7 +1184,7 @@ RestoreSlotFromDisk(const char *name) /* now verify the CRC32 */ INIT_CRC32(checksum); COMP_CRC32(checksum, - (char *)&cp + ReplicationSlotOnDiskConstantSize, + (char *) &cp + ReplicationSlotOnDiskConstantSize, ReplicationSlotOnDiskDynamicSize); if (!EQ_CRC32(checksum, cp.checksum)) diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index c9416b03eee..dc94f504ee2 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -53,7 +53,7 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS) if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) elog(ERROR, "return type must be a row type"); - /* acquire replication slot, this will check for conflicting names*/ + /* acquire replication slot, this will check for conflicting names */ ReplicationSlotCreate(NameStr(*name), false, RS_PERSISTENT); values[0] = NameGetDatum(&MyReplicationSlot->data.name); @@ -97,8 +97,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS) Assert(!MyReplicationSlot); /* - * Acquire a logical decoding slot, this will check for conflicting - * names. + * Acquire a logical decoding slot, this will check for conflicting names. */ ReplicationSlotCreate(NameStr(*name), true, RS_EPHEMERAL); @@ -106,8 +105,8 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS) * Create logical decoding context, to build the initial snapshot. */ ctx = CreateInitDecodingContext( - NameStr(*plugin), NIL, - logical_read_local_xlog_page, NULL, NULL); + NameStr(*plugin), NIL, + logical_read_local_xlog_page, NULL, NULL); /* build initial snapshot, might take a while */ DecodingContextFindStartpoint(ctx); diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c index f65021caa68..aa54bfba6cf 100644 --- a/src/backend/replication/syncrep.c +++ b/src/backend/replication/syncrep.c @@ -117,8 +117,8 @@ SyncRepWaitForLSN(XLogRecPtr XactCommitLSN) * set. See SyncRepUpdateSyncStandbysDefined. * * Also check that the standby hasn't already replied. Unlikely race - * condition but we'll be fetching that cache line anyway so it's likely to - * be a low cost check. + * condition but we'll be fetching that cache line anyway so it's likely + * to be a low cost check. */ if (!WalSndCtl->sync_standbys_defined || XactCommitLSN <= WalSndCtl->lsn[mode]) @@ -517,7 +517,7 @@ SyncRepGetStandbyPriority(void) } /* - * Walk the specified queue from head. Set the state of any backends that + * Walk the specified queue from head. Set the state of any backends that * need to be woken, remove them from the queue, and then wake them. * Pass all = true to wake whole queue; otherwise, just wake up to * the walsender's LSN. diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index b0de0ea253e..c2d4ed3a968 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -258,7 +258,7 @@ WalReceiverMain(void) /* * If possible, make this process a group leader, so that the postmaster - * can signal any child processes too. (walreceiver probably never has + * can signal any child processes too. (walreceiver probably never has * any child processes, but for consistency we make all postmaster child * processes do this.) */ @@ -786,7 +786,7 @@ WalRcvQuickDieHandler(SIGNAL_ARGS) on_exit_reset(); /* - * Note we do exit(2) not exit(0). This is to force the postmaster into a + * Note we do exit(2) not exit(0). This is to force the postmaster into a * system reset cycle if some idiot DBA sends a manual SIGQUIT to a random * backend. This is necessary precisely because we don't clean up our * shared memory state. (The "dead man switch" mechanism in pmsignal.c @@ -934,9 +934,9 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr) if (lseek(recvFile, (off_t) startoff, SEEK_SET) < 0) ereport(PANIC, (errcode_for_file_access(), - errmsg("could not seek in log segment %s to offset %u: %m", - XLogFileNameP(recvFileTLI, recvSegNo), - startoff))); + errmsg("could not seek in log segment %s to offset %u: %m", + XLogFileNameP(recvFileTLI, recvSegNo), + startoff))); recvOff = startoff; } diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c index acadec57f5a..579216af34d 100644 --- a/src/backend/replication/walreceiverfuncs.c +++ b/src/backend/replication/walreceiverfuncs.c @@ -291,7 +291,7 @@ RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo, * Returns the last+1 byte position that walreceiver has written. * * Optionally, returns the previous chunk start, that is the first byte - * written in the most recent walreceiver flush cycle. Callers not + * written in the most recent walreceiver flush cycle. Callers not * interested in that value may pass NULL for latestChunkStart. Same for * receiveTLI. */ diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 6e22c03bcfa..5c11d681c33 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -82,7 +82,7 @@ #include "utils/timestamp.h" /* - * Maximum data payload in a WAL data message. Must be >= XLOG_BLCKSZ. + * Maximum data payload in a WAL data message. Must be >= XLOG_BLCKSZ. * * We don't have a good idea of what a good value would be; there's some * overhead per message in both walsender and walreceiver, but on the other @@ -165,7 +165,7 @@ static bool streamingDoneSending; static bool streamingDoneReceiving; /* Are we there yet? */ -static bool WalSndCaughtUp = false; +static bool WalSndCaughtUp = false; /* Flags set by signal handlers for later service in main loop */ static volatile sig_atomic_t got_SIGHUP = false; @@ -180,7 +180,7 @@ static volatile sig_atomic_t walsender_ready_to_stop = false; static volatile sig_atomic_t replication_active = false; static LogicalDecodingContext *logical_decoding_ctx = NULL; -static XLogRecPtr logical_startptr = InvalidXLogRecPtr; +static XLogRecPtr logical_startptr = InvalidXLogRecPtr; /* Signal handlers */ static void WalSndSigHupHandler(SIGNAL_ARGS); @@ -188,7 +188,7 @@ static void WalSndXLogSendHandler(SIGNAL_ARGS); static void WalSndLastCycleHandler(SIGNAL_ARGS); /* Prototypes for private functions */ -typedef void (*WalSndSendDataCallback)(void); +typedef void (*WalSndSendDataCallback) (void); static void WalSndLoop(WalSndSendDataCallback send_data); static void InitWalSenderSlot(void); static void WalSndKill(int code, Datum arg); @@ -301,8 +301,8 @@ IdentifySystem(void) /* * Reply with a result set with one row, four columns. First col is system - * ID, second is timeline ID, third is current xlog location and the fourth - * contains the database name if we are connected to one. + * ID, second is timeline ID, third is current xlog location and the + * fourth contains the database name if we are connected to one. */ snprintf(sysid, sizeof(sysid), UINT64_FORMAT, @@ -358,22 +358,22 @@ IdentifySystem(void) pq_sendint(&buf, 0, 2); /* format code */ /* third field */ - pq_sendstring(&buf, "xlogpos"); /* col name */ - pq_sendint(&buf, 0, 4); /* table oid */ - pq_sendint(&buf, 0, 2); /* attnum */ - pq_sendint(&buf, TEXTOID, 4); /* type oid */ - pq_sendint(&buf, -1, 2); /* typlen */ - pq_sendint(&buf, 0, 4); /* typmod */ - pq_sendint(&buf, 0, 2); /* format code */ + pq_sendstring(&buf, "xlogpos"); /* col name */ + pq_sendint(&buf, 0, 4); /* table oid */ + pq_sendint(&buf, 0, 2); /* attnum */ + pq_sendint(&buf, TEXTOID, 4); /* type oid */ + pq_sendint(&buf, -1, 2); /* typlen */ + pq_sendint(&buf, 0, 4); /* typmod */ + pq_sendint(&buf, 0, 2); /* format code */ /* fourth field */ - pq_sendstring(&buf, "dbname"); /* col name */ - pq_sendint(&buf, 0, 4); /* table oid */ - pq_sendint(&buf, 0, 2); /* attnum */ - pq_sendint(&buf, TEXTOID, 4); /* type oid */ - pq_sendint(&buf, -1, 2); /* typlen */ - pq_sendint(&buf, 0, 4); /* typmod */ - pq_sendint(&buf, 0, 2); /* format code */ + pq_sendstring(&buf, "dbname"); /* col name */ + pq_sendint(&buf, 0, 4); /* table oid */ + pq_sendint(&buf, 0, 2); /* attnum */ + pq_sendint(&buf, TEXTOID, 4); /* type oid */ + pq_sendint(&buf, -1, 2); /* typlen */ + pq_sendint(&buf, 0, 4); /* typmod */ + pq_sendint(&buf, 0, 2); /* format code */ pq_endmessage(&buf); /* Send a DataRow message */ @@ -388,12 +388,12 @@ IdentifySystem(void) /* send NULL if not connected to a database */ if (dbname) { - pq_sendint(&buf, strlen(dbname), 4); /* col4 len */ + pq_sendint(&buf, strlen(dbname), 4); /* col4 len */ pq_sendbytes(&buf, (char *) dbname, strlen(dbname)); } else { - pq_sendint(&buf, -1, 4); /* col4 len, NULL */ + pq_sendint(&buf, -1, 4); /* col4 len, NULL */ } pq_endmessage(&buf); @@ -731,11 +731,11 @@ StartReplication(StartReplicationCmd *cmd) * set everytime WAL is flushed. */ static int -logical_read_xlog_page(XLogReaderState* state, XLogRecPtr targetPagePtr, int reqLen, - XLogRecPtr targetRecPtr, char* cur_page, TimeLineID *pageTLI) +logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, + XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI) { - XLogRecPtr flushptr; - int count; + XLogRecPtr flushptr; + int count; /* make sure we have enough WAL available */ flushptr = WalSndWaitForWal(targetPagePtr + reqLen); @@ -764,7 +764,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) { const char *slot_name; const char *snapshot_name = NULL; - char xpos[MAXFNAMELEN]; + char xpos[MAXFNAMELEN]; StringInfoData buf; Assert(!MyReplicationSlot); @@ -792,9 +792,9 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) LogicalDecodingContext *ctx; ctx = CreateInitDecodingContext( - cmd->plugin, NIL, - logical_read_xlog_page, - WalSndPrepareWrite, WalSndWriteData); + cmd->plugin, NIL, + logical_read_xlog_page, + WalSndPrepareWrite, WalSndWriteData); /* build initial snapshot, might take a while */ DecodingContextFindStartpoint(ctx); @@ -838,7 +838,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) pq_sendint(&buf, 0, 2); /* format code */ /* third field: exported snapshot's name */ - pq_sendstring(&buf, "snapshot_name"); /* col name */ + pq_sendstring(&buf, "snapshot_name"); /* col name */ pq_sendint(&buf, 0, 4); /* table oid */ pq_sendint(&buf, 0, 2); /* attnum */ pq_sendint(&buf, TEXTOID, 4); /* type oid */ @@ -847,7 +847,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) pq_sendint(&buf, 0, 2); /* format code */ /* fourth field: output plugin */ - pq_sendstring(&buf, "output_plugin"); /* col name */ + pq_sendstring(&buf, "output_plugin"); /* col name */ pq_sendint(&buf, 0, 4); /* table oid */ pq_sendint(&buf, 0, 2); /* attnum */ pq_sendint(&buf, TEXTOID, 4); /* type oid */ @@ -862,26 +862,26 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) pq_sendint(&buf, 4, 2); /* # of columns */ /* slot_name */ - pq_sendint(&buf, strlen(slot_name), 4); /* col1 len */ + pq_sendint(&buf, strlen(slot_name), 4); /* col1 len */ pq_sendbytes(&buf, slot_name, strlen(slot_name)); /* consistent wal location */ - pq_sendint(&buf, strlen(xpos), 4); /* col2 len */ + pq_sendint(&buf, strlen(xpos), 4); /* col2 len */ pq_sendbytes(&buf, xpos, strlen(xpos)); /* snapshot name */ if (snapshot_name != NULL) { - pq_sendint(&buf, strlen(snapshot_name), 4); /* col3 len */ + pq_sendint(&buf, strlen(snapshot_name), 4); /* col3 len */ pq_sendbytes(&buf, snapshot_name, strlen(snapshot_name)); } else - pq_sendint(&buf, -1, 4); /* col3 len, NULL */ + pq_sendint(&buf, -1, 4); /* col3 len, NULL */ /* plugin */ if (cmd->plugin != NULL) { - pq_sendint(&buf, strlen(cmd->plugin), 4); /* col4 len */ + pq_sendint(&buf, strlen(cmd->plugin), 4); /* col4 len */ pq_sendbytes(&buf, cmd->plugin, strlen(cmd->plugin)); } else @@ -951,9 +951,9 @@ StartLogicalReplication(StartReplicationCmd *cmd) * to be shipped from that position. */ logical_decoding_ctx = CreateDecodingContext( - cmd->startpoint, cmd->options, - logical_read_xlog_page, - WalSndPrepareWrite, WalSndWriteData); + cmd->startpoint, cmd->options, + logical_read_xlog_page, + WalSndPrepareWrite, WalSndWriteData); /* Start reading WAL from the oldest required WAL. */ logical_startptr = MyReplicationSlot->data.restart_lsn; @@ -1013,11 +1013,12 @@ WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xi pq_sendbyte(ctx->out, 'w'); pq_sendint64(ctx->out, lsn); /* dataStart */ pq_sendint64(ctx->out, lsn); /* walEnd */ + /* * Fill out the sendtime later, just as it's done in XLogSendPhysical, but * reserve space here. */ - pq_sendint64(ctx->out, 0); /* sendtime */ + pq_sendint64(ctx->out, 0); /* sendtime */ } /* @@ -1035,9 +1036,9 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, pq_putmessage_noblock('d', ctx->out->data, ctx->out->len); /* - * Fill the send timestamp last, so that it is taken as late as - * possible. This is somewhat ugly, but the protocol's set as it's already - * used for several releases by streaming physical replication. + * Fill the send timestamp last, so that it is taken as late as possible. + * This is somewhat ugly, but the protocol's set as it's already used for + * several releases by streaming physical replication. */ resetStringInfo(&tmpbuf); pq_sendint64(&tmpbuf, GetCurrentIntegerTimestamp()); @@ -1056,7 +1057,7 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, { int wakeEvents; long sleeptime; - TimestampTz now; + TimestampTz now; /* * Emergency bailout if postmaster has died. This is to avoid the @@ -1140,7 +1141,7 @@ WalSndWaitForWal(XLogRecPtr loc) for (;;) { long sleeptime; - TimestampTz now; + TimestampTz now; /* * Emergency bailout if postmaster has died. This is to avoid the @@ -1297,6 +1298,7 @@ exec_replication_command(const char *cmd_string) case T_StartReplicationCmd: { StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node; + if (cmd->kind == REPLICATION_KIND_PHYSICAL) StartReplication(cmd); else @@ -1472,7 +1474,8 @@ ProcessStandbyMessage(void) static void PhysicalConfirmReceivedLocation(XLogRecPtr lsn) { - bool changed = false; + bool changed = false; + /* use volatile pointer to prevent code rearrangement */ volatile ReplicationSlot *slot = MyReplicationSlot; @@ -1492,9 +1495,9 @@ PhysicalConfirmReceivedLocation(XLogRecPtr lsn) } /* - * One could argue that the slot should be saved to disk now, but that'd be - * energy wasted - the worst lost information can do here is give us wrong - * information in a statistics view - we'll just potentially be more + * One could argue that the slot should be saved to disk now, but that'd + * be energy wasted - the worst lost information can do here is give us + * wrong information in a statistics view - we'll just potentially be more * conservative in removing files. */ } @@ -1561,15 +1564,16 @@ ProcessStandbyReplyMessage(void) static void PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin) { - bool changed = false; + bool changed = false; volatile ReplicationSlot *slot = MyReplicationSlot; SpinLockAcquire(&slot->mutex); MyPgXact->xmin = InvalidTransactionId; + /* - * For physical replication we don't need the interlock provided - * by xmin and effective_xmin since the consequences of a missed increase - * are limited to query cancellations, so set both at once. + * For physical replication we don't need the interlock provided by xmin + * and effective_xmin since the consequences of a missed increase are + * limited to query cancellations, so set both at once. */ if (!TransactionIdIsNormal(slot->data.xmin) || !TransactionIdIsNormal(feedbackXmin) || @@ -1655,7 +1659,7 @@ ProcessStandbyHSFeedbackMessage(void) * perhaps far enough to make feedbackXmin wrap around. In that case the * xmin we set here would be "in the future" and have no effect. No point * in worrying about this since it's too late to save the desired data - * anyway. Assuming that the standby sends us an increasing sequence of + * anyway. Assuming that the standby sends us an increasing sequence of * xmins, this could only happen during the first reply cycle, else our * own xmin would prevent nextXid from advancing so far. * @@ -1667,11 +1671,11 @@ ProcessStandbyHSFeedbackMessage(void) * * If we're using a replication slot we reserve the xmin via that, * otherwise via the walsender's PGXACT entry. - + * * XXX: It might make sense to introduce ephemeral slots and always use * the slot mechanism. */ - if (MyReplicationSlot != NULL) /* XXX: persistency configurable? */ + if (MyReplicationSlot != NULL) /* XXX: persistency configurable? */ PhysicalReplicationSlotNewXmin(feedbackXmin); else MyPgXact->xmin = feedbackXmin; @@ -1692,8 +1696,8 @@ WalSndComputeSleeptime(TimestampTz now) if (wal_sender_timeout > 0) { TimestampTz wakeup_time; - long sec_to_timeout; - int microsec_to_timeout; + long sec_to_timeout; + int microsec_to_timeout; /* * At the latest stop sleeping once wal_sender_timeout has been @@ -1703,13 +1707,13 @@ WalSndComputeSleeptime(TimestampTz now) wal_sender_timeout); /* - * If no ping has been sent yet, wakeup when it's time to do - * so. WalSndKeepaliveIfNecessary() wants to send a keepalive once - * half of the timeout passed without a response. + * If no ping has been sent yet, wakeup when it's time to do so. + * WalSndKeepaliveIfNecessary() wants to send a keepalive once half of + * the timeout passed without a response. */ if (!waiting_for_ping_response) wakeup_time = TimestampTzPlusMilliseconds(last_reply_timestamp, - wal_sender_timeout / 2); + wal_sender_timeout / 2); /* Compute relative time until wakeup. */ TimestampDifference(now, wakeup_time, @@ -1738,11 +1742,11 @@ WalSndCheckTimeOut(TimestampTz now) { /* * Since typically expiration of replication timeout means - * communication problem, we don't send the error message to - * the standby. + * communication problem, we don't send the error message to the + * standby. */ ereport(COMMERROR, - (errmsg("terminating walsender process due to replication timeout"))); + (errmsg("terminating walsender process due to replication timeout"))); WalSndShutdown(); } @@ -1770,7 +1774,7 @@ WalSndLoop(WalSndSendDataCallback send_data) */ for (;;) { - TimestampTz now; + TimestampTz now; /* * Emergency bailout if postmaster has died. This is to avoid the @@ -1839,10 +1843,10 @@ WalSndLoop(WalSndSendDataCallback send_data) /* * When SIGUSR2 arrives, we send any outstanding logs up to the - * shutdown checkpoint record (i.e., the latest record), wait - * for them to be replicated to the standby, and exit. - * This may be a normal termination at shutdown, or a promotion, - * the walsender is not sure which. + * shutdown checkpoint record (i.e., the latest record), wait for + * them to be replicated to the standby, and exit. This may be a + * normal termination at shutdown, or a promotion, the walsender + * is not sure which. */ if (walsender_ready_to_stop) WalSndDone(send_data); @@ -2246,7 +2250,7 @@ XLogSendPhysical(void) * * Attempt to send all data that's already been written out and * fsync'd to disk. We cannot go further than what's been written out - * given the current implementation of XLogRead(). And in any case + * given the current implementation of XLogRead(). And in any case * it's unsafe to send WAL that is not securely down to disk on the * master: if the master subsequently crashes and restarts, slaves * must not have applied any WAL that gets lost on the master. @@ -2416,8 +2420,8 @@ XLogSendLogical(void) else { /* - * If the record we just wanted read is at or beyond the flushed point, - * then we're caught up. + * If the record we just wanted read is at or beyond the flushed + * point, then we're caught up. */ if (logical_decoding_ctx->reader->EndRecPtr >= GetFlushRecPtr()) WalSndCaughtUp = true; @@ -2452,10 +2456,10 @@ WalSndDone(WalSndSendDataCallback send_data) send_data(); /* - * Check a write location to see whether all the WAL have - * successfully been replicated if this walsender is connecting - * to a standby such as pg_receivexlog which always returns - * an invalid flush location. Otherwise, check a flush location. + * Check a write location to see whether all the WAL have successfully + * been replicated if this walsender is connecting to a standby such as + * pg_receivexlog which always returns an invalid flush location. + * Otherwise, check a flush location. */ replicatedPtr = XLogRecPtrIsInvalid(MyWalSnd->flush) ? MyWalSnd->write : MyWalSnd->flush; @@ -2562,8 +2566,8 @@ WalSndLastCycleHandler(SIGNAL_ARGS) /* * If replication has not yet started, die like with SIGTERM. If * replication is active, only set a flag and wake up the main loop. It - * will send any outstanding WAL, wait for it to be replicated to - * the standby, and then exit gracefully. + * will send any outstanding WAL, wait for it to be replicated to the + * standby, and then exit gracefully. */ if (!replication_active) kill(MyProcPid, SIGTERM); |
