XLogRecPtr lsn;
ReorderBufferChange *change;
ReorderBufferTXN *txn;
- int fd;
+ File fd;
XLogSegNo segno;
} ReorderBufferIterTXNEntry;
* subtransactions
* ---------------------------------------
*/
-static ReorderBufferIterTXNState *ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn);
+static void ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn,
+ ReorderBufferIterTXNState *volatile *iter_state);
static ReorderBufferChange *ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state);
static void ReorderBufferIterTXNFinish(ReorderBuffer *rb,
ReorderBufferIterTXNState *state);
static void ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
int fd, ReorderBufferChange *change);
static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
- int *fd, XLogSegNo *segno);
+ File *fd, XLogSegNo *segno);
static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
char *change);
static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn);
/*
* Allocate & initialize an iterator which iterates in lsn order over a
* transaction and all its subtransactions.
+ *
+ * Note: The iterator state is returned through the iter_state parameter
+ * rather than through the function's return value. This is because the state
+ * gets cleaned up in a PG_CATCH block in the caller, so we want to make sure
+ * the caller gets back the state even if this function throws an exception.
*/
-static ReorderBufferIterTXNState *
-ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
+static void
+ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn,
+ ReorderBufferIterTXNState *volatile *iter_state)
{
Size nr_txns = 0;
ReorderBufferIterTXNState *state;
dlist_iter cur_txn_i;
int32 off;
+ *iter_state = NULL;
+
/*
* Calculate the size of our heap: one element for every transaction that
* contains changes. (Besides the transactions already in the reorder
ReorderBufferIterCompare,
state);
+ /* Now that the state fields are initialized, it is safe to return it. */
+ *iter_state = state;
+
/*
* Now insert items into the binary heap, in an unordered fashion. (We
* will run a heap assembly step at the end; this is more efficient.)
/* assemble a valid binary heap */
binaryheap_build(state->heap);
-
- return state;
}
/*
for (off = 0; off < state->nr_txns; off++)
{
if (state->entries[off].fd != -1)
- CloseTransientFile(state->entries[off].fd);
+ FileClose(state->entries[off].fd);
}
/* free memory we might have "leaked" in the last *Next call */
rb->begin(rb, txn);
- iterstate = ReorderBufferIterTXNInit(rb, txn);
+ ReorderBufferIterTXNInit(rb, txn, &iterstate);
while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
{
Relation relation = NULL;
*/
static Size
ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
- int *fd, XLogSegNo *segno)
+ File *fd, XLogSegNo *segno)
{
Size restored = 0;
XLogSegNo last_segno;
ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid,
*segno);
- *fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
+ *fd = PathNameOpenFile(path, O_RDONLY | PG_BINARY);
if (*fd < 0 && errno == ENOENT)
{
*fd = -1;
* end of this file.
*/
ReorderBufferSerializeReserve(rb, sizeof(ReorderBufferDiskChange));
- pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_READ);
- readBytes = read(*fd, rb->outbuf, sizeof(ReorderBufferDiskChange));
- pgstat_report_wait_end();
+ readBytes = FileRead(*fd, rb->outbuf, sizeof(ReorderBufferDiskChange),
+ WAIT_EVENT_REORDER_BUFFER_READ);
/* eof */
if (readBytes == 0)
{
- CloseTransientFile(*fd);
+ FileClose(*fd);
*fd = -1;
(*segno)++;
continue;
sizeof(ReorderBufferDiskChange) + ondisk->size);
ondisk = (ReorderBufferDiskChange *) rb->outbuf;
- pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_READ);
- readBytes = read(*fd, rb->outbuf + sizeof(ReorderBufferDiskChange),
- ondisk->size - sizeof(ReorderBufferDiskChange));
- pgstat_report_wait_end();
+ readBytes = FileRead(*fd,
+ rb->outbuf + sizeof(ReorderBufferDiskChange),
+ ondisk->size - sizeof(ReorderBufferDiskChange),
+ WAIT_EVENT_REORDER_BUFFER_READ);
if (readBytes < 0)
ereport(ERROR,
use warnings;
use PostgresNode;
use TestLib;
-use Test::More tests => 10;
+use Test::More tests => 11;
use Config;
# Initialize master node
is($node_master->slot('otherdb_slot')->{'slot_name'},
undef, 'logical slot was actually dropped with DB');
+# Test to ensure that we don't run out of file descriptors even if there
+# are more spill files than maxAllocatedDescs.
+
+# Set max_files_per_process to a small value to make it more likely that we
+# would run out of open file descriptors.
+$node_master->safe_psql('postgres',
+ 'ALTER SYSTEM SET max_files_per_process = 26;');
+$node_master->restart;
+
+$node_master->safe_psql(
+ 'postgres', q{
+do $$
+BEGIN
+ FOR i IN 1..10 LOOP
+ BEGIN
+ INSERT INTO decoding_test(x) SELECT generate_series(1,5000);
+ EXCEPTION
+ when division_by_zero then perform 'dummy';
+ END;
+ END LOOP;
+END $$;
+});
+
+$result = $node_master->safe_psql('postgres',
+ qq[
+SELECT data from pg_logical_slot_get_changes('test_slot', NULL, NULL)
+ WHERE data LIKE '%INSERT%' ORDER BY lsn LIMIT 1;
+]);
+
+$expected = q{table public.decoding_test: INSERT: x[integer]:1 y[text]:null};
+is($result, $expected, 'got expected output from spilling subxacts session');
+
+# Reset max_files_per_process back to its default value
+$node_master->safe_psql('postgres',
+ 'ALTER SYSTEM SET max_files_per_process = DEFAULT;');
+$node_master->restart;
+
# done with the node
$node_master->stop;