diff options
Diffstat (limited to 'src/backend')
111 files changed, 1808 insertions, 1347 deletions
diff --git a/src/backend/access/brin/brin.c b/src/backend/access/brin/brin.c index 442a46140d..b0e89ace5e 100644 --- a/src/backend/access/brin/brin.c +++ b/src/backend/access/brin/brin.c @@ -190,7 +190,8 @@ brininsert(Relation idxRel, Datum *values, bool *nulls, AutoVacuumRequestWork(AVW_BRINSummarizeRange, RelationGetRelid(idxRel), lastPageRange); - brin_free_tuple(lastPageTuple); + else + LockBuffer(buf, BUFFER_LOCK_UNLOCK); } brtup = brinGetTupleForHeapBlock(revmap, heapBlk, &buf, &off, diff --git a/src/backend/access/brin/brin_pageops.c b/src/backend/access/brin/brin_pageops.c index 1725591b05..3609c8ae7c 100644 --- a/src/backend/access/brin/brin_pageops.c +++ b/src/backend/access/brin/brin_pageops.c @@ -73,10 +73,8 @@ brin_doupdate(Relation idxrel, BlockNumber pagesPerRange, { ereport(ERROR, (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), - errmsg("index row size %lu exceeds maximum %lu for index \"%s\"", - (unsigned long) newsz, - (unsigned long) BrinMaxItemSize, - RelationGetRelationName(idxrel)))); + errmsg("index row size %zu exceeds maximum %zu for index \"%s\"", + newsz, BrinMaxItemSize, RelationGetRelationName(idxrel)))); return false; /* keep compiler quiet */ } @@ -357,10 +355,8 @@ brin_doinsert(Relation idxrel, BlockNumber pagesPerRange, { ereport(ERROR, (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), - errmsg("index row size %lu exceeds maximum %lu for index \"%s\"", - (unsigned long) itemsz, - (unsigned long) BrinMaxItemSize, - RelationGetRelationName(idxrel)))); + errmsg("index row size %zu exceeds maximum %zu for index \"%s\"", + itemsz, BrinMaxItemSize, RelationGetRelationName(idxrel)))); return InvalidOffsetNumber; /* keep compiler quiet */ } @@ -669,7 +665,7 @@ brin_getinsertbuffer(Relation irel, Buffer oldbuf, Size itemsz, BlockNumber oldblk; BlockNumber newblk; Page page; - int freespace; + Size freespace; /* callers must have checked */ Assert(itemsz <= BrinMaxItemSize); @@ -825,10 +821,8 @@ brin_getinsertbuffer(Relation irel, Buffer oldbuf, Size 
itemsz, ereport(ERROR, (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), - errmsg("index row size %lu exceeds maximum %lu for index \"%s\"", - (unsigned long) itemsz, - (unsigned long) freespace, - RelationGetRelationName(irel)))); + errmsg("index row size %zu exceeds maximum %zu for index \"%s\"", + itemsz, freespace, RelationGetRelationName(irel)))); return InvalidBuffer; /* keep compiler quiet */ } diff --git a/src/backend/access/brin/brin_revmap.c b/src/backend/access/brin/brin_revmap.c index fc8b10ab39..e778cbcacd 100644 --- a/src/backend/access/brin/brin_revmap.c +++ b/src/backend/access/brin/brin_revmap.c @@ -179,13 +179,16 @@ brinSetHeapBlockItemptr(Buffer buf, BlockNumber pagesPerRange, /* * Fetch the BrinTuple for a given heap block. * - * The buffer containing the tuple is locked, and returned in *buf. As an - * optimization, the caller can pass a pinned buffer *buf on entry, which will - * avoid a pin-unpin cycle when the next tuple is on the same page as a - * previous one. + * The buffer containing the tuple is locked, and returned in *buf. The + * returned tuple points to the shared buffer and must not be freed; if caller + * wants to use it after releasing the buffer lock, it must create its own + * palloc'ed copy. As an optimization, the caller can pass a pinned buffer + * *buf on entry, which will avoid a pin-unpin cycle when the next tuple is on + * the same page as a previous one. * * If no tuple is found for the given heap range, returns NULL. In that case, - * *buf might still be updated, but it's not locked. + * *buf might still be updated (and pin must be released by caller), but it's + * not locked. * * The output tuple offset within the buffer is returned in *off, and its size * is returned in *size. 
diff --git a/src/backend/access/brin/brin_validate.c b/src/backend/access/brin/brin_validate.c index dc23e00e89..b4acf2b6f3 100644 --- a/src/backend/access/brin/brin_validate.c +++ b/src/backend/access/brin/brin_validate.c @@ -113,8 +113,8 @@ brinvalidate(Oid opclassoid) { ereport(INFO, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), - errmsg("brin operator family \"%s\" contains function %s with invalid support number %d", - opfamilyname, + errmsg("operator family \"%s\" of access method %s contains function %s with invalid support number %d", + opfamilyname, "brin", format_procedure(procform->amproc), procform->amprocnum))); result = false; @@ -129,8 +129,8 @@ brinvalidate(Oid opclassoid) { ereport(INFO, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), - errmsg("brin operator family \"%s\" contains function %s with wrong signature for support number %d", - opfamilyname, + errmsg("operator family \"%s\" of access method %s contains function %s with wrong signature for support number %d", + opfamilyname, "brin", format_procedure(procform->amproc), procform->amprocnum))); result = false; @@ -151,8 +151,8 @@ brinvalidate(Oid opclassoid) { ereport(INFO, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), - errmsg("brin operator family \"%s\" contains operator %s with invalid strategy number %d", - opfamilyname, + errmsg("operator family \"%s\" of access method %s contains operator %s with invalid strategy number %d", + opfamilyname, "brin", format_operator(oprform->amopopr), oprform->amopstrategy))); result = false; @@ -180,8 +180,8 @@ brinvalidate(Oid opclassoid) { ereport(INFO, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), - errmsg("brin operator family \"%s\" contains invalid ORDER BY specification for operator %s", - opfamilyname, + errmsg("operator family \"%s\" of access method %s contains invalid ORDER BY specification for operator %s", + opfamilyname, "brin", format_operator(oprform->amopopr)))); result = false; } @@ -193,8 +193,8 @@ brinvalidate(Oid opclassoid) { 
ereport(INFO, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), - errmsg("brin operator family \"%s\" contains operator %s with wrong signature", - opfamilyname, + errmsg("operator family \"%s\" of access method %s contains operator %s with wrong signature", + opfamilyname, "brin", format_operator(oprform->amopopr)))); result = false; } @@ -231,8 +231,8 @@ brinvalidate(Oid opclassoid) { ereport(INFO, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), - errmsg("brin operator family \"%s\" is missing operator(s) for types %s and %s", - opfamilyname, + errmsg("operator family \"%s\" of access method %s is missing operator(s) for types %s and %s", + opfamilyname, "brin", format_type_be(thisgroup->lefttype), format_type_be(thisgroup->righttype)))); result = false; @@ -241,8 +241,8 @@ brinvalidate(Oid opclassoid) { ereport(INFO, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), - errmsg("brin operator family \"%s\" is missing support function(s) for types %s and %s", - opfamilyname, + errmsg("operator family \"%s\" of access method %s is missing support function(s) for types %s and %s", + opfamilyname, "brin", format_type_be(thisgroup->lefttype), format_type_be(thisgroup->righttype)))); result = false; @@ -254,8 +254,8 @@ brinvalidate(Oid opclassoid) { ereport(INFO, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), - errmsg("brin operator class \"%s\" is missing operator(s)", - opclassname))); + errmsg("operator class \"%s\" of access method %s is missing operator(s)", + opclassname, "brin"))); result = false; } for (i = 1; i <= BRIN_MANDATORY_NPROCS; i++) @@ -265,8 +265,8 @@ brinvalidate(Oid opclassoid) continue; /* got it */ ereport(INFO, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), - errmsg("brin operator class \"%s\" is missing support function %d", - opclassname, i))); + errmsg("operator class \"%s\" of access method %s is missing support function %d", + opclassname, "brin", i))); result = false; } diff --git a/src/backend/access/gin/ginvalidate.c b/src/backend/access/gin/ginvalidate.c 
index 0d2847456e..4c8e563545 100644 --- a/src/backend/access/gin/ginvalidate.c +++ b/src/backend/access/gin/ginvalidate.c @@ -90,8 +90,8 @@ ginvalidate(Oid opclassoid) { ereport(INFO, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), - errmsg("gin operator family \"%s\" contains support procedure %s with cross-type registration", - opfamilyname, + errmsg("operator family \"%s\" of access method %s contains support procedure %s with different left and right input types", + opfamilyname, "gin", format_procedure(procform->amproc)))); result = false; } @@ -146,8 +146,8 @@ ginvalidate(Oid opclassoid) default: ereport(INFO, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), - errmsg("gin operator family \"%s\" contains function %s with invalid support number %d", - opfamilyname, + errmsg("operator family \"%s\" of access method %s contains function %s with invalid support number %d", + opfamilyname, "gin", format_procedure(procform->amproc), procform->amprocnum))); result = false; @@ -158,8 +158,8 @@ ginvalidate(Oid opclassoid) { ereport(INFO, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), - errmsg("gin operator family \"%s\" contains function %s with wrong signature for support number %d", - opfamilyname, + errmsg("operator family \"%s\" of access method %s contains function %s with wrong signature for support number %d", + opfamilyname, "gin", format_procedure(procform->amproc), procform->amprocnum))); result = false; @@ -177,8 +177,8 @@ ginvalidate(Oid opclassoid) { ereport(INFO, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), - errmsg("gin operator family \"%s\" contains operator %s with invalid strategy number %d", - opfamilyname, + errmsg("operator family \"%s\" of access method %s contains operator %s with invalid strategy number %d", + opfamilyname, "gin", format_operator(oprform->amopopr), oprform->amopstrategy))); result = false; @@ -190,8 +190,8 @@ ginvalidate(Oid opclassoid) { ereport(INFO, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), - errmsg("gin operator family \"%s\" 
contains invalid ORDER BY specification for operator %s", - opfamilyname, + errmsg("operator family \"%s\" of access method %s contains invalid ORDER BY specification for operator %s", + opfamilyname, "gin", format_operator(oprform->amopopr)))); result = false; } @@ -203,8 +203,8 @@ ginvalidate(Oid opclassoid) { ereport(INFO, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), - errmsg("gin operator family \"%s\" contains operator %s with wrong signature", - opfamilyname, + errmsg("operator family \"%s\" of access method %s contains operator %s with wrong signature", + opfamilyname, "gin", format_operator(oprform->amopopr)))); result = false; } @@ -244,8 +244,8 @@ ginvalidate(Oid opclassoid) continue; /* don't need both, see check below loop */ ereport(INFO, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), - errmsg("gin operator class \"%s\" is missing support function %d", - opclassname, i))); + errmsg("operator class \"%s\" of access method %s is missing support function %d", + opclassname, "gin", i))); result = false; } if (!opclassgroup || @@ -254,8 +254,8 @@ ginvalidate(Oid opclassoid) { ereport(INFO, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), - errmsg("gin operator class \"%s\" is missing support function %d or %d", - opclassname, + errmsg("operator class \"%s\" of access method %s is missing support function %d or %d", + opclassname, "gin", GIN_CONSISTENT_PROC, GIN_TRICONSISTENT_PROC))); result = false; } diff --git a/src/backend/access/gist/gistvalidate.c b/src/backend/access/gist/gistvalidate.c index 585c92be26..42254c5f15 100644 --- a/src/backend/access/gist/gistvalidate.c +++ b/src/backend/access/gist/gistvalidate.c @@ -90,8 +90,8 @@ gistvalidate(Oid opclassoid) { ereport(INFO, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), - errmsg("gist operator family \"%s\" contains support procedure %s with cross-type registration", - opfamilyname, + errmsg("operator family \"%s\" of access method %s contains support procedure %s with different left and right input types", + 
opfamilyname, "gist", format_procedure(procform->amproc)))); result = false; } @@ -143,8 +143,8 @@ gistvalidate(Oid opclassoid) default: ereport(INFO, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), - errmsg("gist operator family \"%s\" contains function %s with invalid support number %d", - opfamilyname, + errmsg("operator family \"%s\" of access method %s contains function %s with invalid support number %d", + opfamilyname, "gist", format_procedure(procform->amproc), procform->amprocnum))); result = false; @@ -155,8 +155,8 @@ gistvalidate(Oid opclassoid) { ereport(INFO, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), - errmsg("gist operator family \"%s\" contains function %s with wrong signature for support number %d", - opfamilyname, + errmsg("operator family \"%s\" of access method %s contains function %s with wrong signature for support number %d", + opfamilyname, "gist", format_procedure(procform->amproc), procform->amprocnum))); result = false; @@ -175,8 +175,8 @@ gistvalidate(Oid opclassoid) { ereport(INFO, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), - errmsg("gist operator family \"%s\" contains operator %s with invalid strategy number %d", - opfamilyname, + errmsg("operator family \"%s\" of access method %s contains operator %s with invalid strategy number %d", + opfamilyname, "gist", format_operator(oprform->amopopr), oprform->amopstrategy))); result = false; @@ -193,8 +193,8 @@ gistvalidate(Oid opclassoid) { ereport(INFO, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), - errmsg("gist operator family \"%s\" contains unsupported ORDER BY specification for operator %s", - opfamilyname, + errmsg("operator family \"%s\" of access method %s contains unsupported ORDER BY specification for operator %s", + opfamilyname, "gist", format_operator(oprform->amopopr)))); result = false; } @@ -204,8 +204,8 @@ gistvalidate(Oid opclassoid) { ereport(INFO, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), - errmsg("gist operator family \"%s\" contains incorrect ORDER BY opfamily 
specification for operator %s", - opfamilyname, + errmsg("operator family \"%s\" of access method %s contains incorrect ORDER BY opfamily specification for operator %s", + opfamilyname, "gist", format_operator(oprform->amopopr)))); result = false; } @@ -223,8 +223,8 @@ gistvalidate(Oid opclassoid) { ereport(INFO, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), - errmsg("gist operator family \"%s\" contains operator %s with wrong signature", - opfamilyname, + errmsg("operator family \"%s\" of access method %s contains operator %s with wrong signature", + opfamilyname, "gist", format_operator(oprform->amopopr)))); result = false; } @@ -262,8 +262,8 @@ gistvalidate(Oid opclassoid) continue; /* optional methods */ ereport(INFO, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), - errmsg("gist operator class \"%s\" is missing support function %d", - opclassname, i))); + errmsg("operator class \"%s\" of access method %s is missing support function %d", + opclassname, "gist", i))); result = false; } diff --git a/src/backend/access/hash/hashvalidate.c b/src/backend/access/hash/hashvalidate.c index f914c015bd..30b29cb100 100644 --- a/src/backend/access/hash/hashvalidate.c +++ b/src/backend/access/hash/hashvalidate.c @@ -96,8 +96,8 @@ hashvalidate(Oid opclassoid) { ereport(INFO, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), - errmsg("hash operator family \"%s\" contains support procedure %s with cross-type registration", - opfamilyname, + errmsg("operator family \"%s\" of access method %s contains support procedure %s with different left and right input types", + opfamilyname, "hash", format_procedure(procform->amproc)))); result = false; } @@ -111,8 +111,8 @@ hashvalidate(Oid opclassoid) { ereport(INFO, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), - errmsg("hash operator family \"%s\" contains function %s with wrong signature for support number %d", - opfamilyname, + errmsg("operator family \"%s\" of access method %s contains function %s with wrong signature for support number %d", + 
opfamilyname, "hash", format_procedure(procform->amproc), procform->amprocnum))); result = false; @@ -128,8 +128,8 @@ hashvalidate(Oid opclassoid) default: ereport(INFO, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), - errmsg("hash operator family \"%s\" contains function %s with invalid support number %d", - opfamilyname, + errmsg("operator family \"%s\" of access method %s contains function %s with invalid support number %d", + opfamilyname, "hash", format_procedure(procform->amproc), procform->amprocnum))); result = false; @@ -149,8 +149,8 @@ hashvalidate(Oid opclassoid) { ereport(INFO, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), - errmsg("hash operator family \"%s\" contains operator %s with invalid strategy number %d", - opfamilyname, + errmsg("operator family \"%s\" of access method %s contains operator %s with invalid strategy number %d", + opfamilyname, "hash", format_operator(oprform->amopopr), oprform->amopstrategy))); result = false; @@ -162,8 +162,8 @@ hashvalidate(Oid opclassoid) { ereport(INFO, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), - errmsg("hash operator family \"%s\" contains invalid ORDER BY specification for operator %s", - opfamilyname, + errmsg("operator family \"%s\" of access method %s contains invalid ORDER BY specification for operator %s", + opfamilyname, "hash", format_operator(oprform->amopopr)))); result = false; } @@ -175,8 +175,8 @@ hashvalidate(Oid opclassoid) { ereport(INFO, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), - errmsg("hash operator family \"%s\" contains operator %s with wrong signature", - opfamilyname, + errmsg("operator family \"%s\" of access method %s contains operator %s with wrong signature", + opfamilyname, "hash", format_operator(oprform->amopopr)))); result = false; } @@ -187,8 +187,8 @@ hashvalidate(Oid opclassoid) { ereport(INFO, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), - errmsg("hash operator family \"%s\" lacks support function for operator %s", - opfamilyname, + errmsg("operator family \"%s\" of 
access method %s lacks support function for operator %s", + opfamilyname, "hash", format_operator(oprform->amopopr)))); result = false; } @@ -215,8 +215,8 @@ hashvalidate(Oid opclassoid) { ereport(INFO, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), - errmsg("hash operator family \"%s\" is missing operator(s) for types %s and %s", - opfamilyname, + errmsg("operator family \"%s\" of access method %s is missing operator(s) for types %s and %s", + opfamilyname, "hash", format_type_be(thisgroup->lefttype), format_type_be(thisgroup->righttype)))); result = false; @@ -229,8 +229,8 @@ hashvalidate(Oid opclassoid) { ereport(INFO, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), - errmsg("hash operator class \"%s\" is missing operator(s)", - opclassname))); + errmsg("operator class \"%s\" of access method %s is missing operator(s)", + opclassname, "hash"))); result = false; } @@ -245,8 +245,8 @@ hashvalidate(Oid opclassoid) { ereport(INFO, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), - errmsg("hash operator family \"%s\" is missing cross-type operator(s)", - opfamilyname))); + errmsg("operator family \"%s\" of access method %s is missing cross-type operator(s)", + opfamilyname, "hash"))); result = false; } diff --git a/src/backend/access/nbtree/nbtvalidate.c b/src/backend/access/nbtree/nbtvalidate.c index 88e33f54cd..5aae53ac68 100644 --- a/src/backend/access/nbtree/nbtvalidate.c +++ b/src/backend/access/nbtree/nbtvalidate.c @@ -98,8 +98,8 @@ btvalidate(Oid opclassoid) default: ereport(INFO, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), - errmsg("btree operator family \"%s\" contains function %s with invalid support number %d", - opfamilyname, + errmsg("operator family \"%s\" of access method %s contains function %s with invalid support number %d", + opfamilyname, "btree", format_procedure(procform->amproc), procform->amprocnum))); result = false; @@ -110,8 +110,8 @@ btvalidate(Oid opclassoid) { ereport(INFO, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), - errmsg("btree operator 
family \"%s\" contains function %s with wrong signature for support number %d", - opfamilyname, + errmsg("operator family \"%s\" of access method %s contains function %s with wrong signature for support number %d", + opfamilyname, "btree", format_procedure(procform->amproc), procform->amprocnum))); result = false; @@ -130,8 +130,8 @@ btvalidate(Oid opclassoid) { ereport(INFO, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), - errmsg("btree operator family \"%s\" contains operator %s with invalid strategy number %d", - opfamilyname, + errmsg("operator family \"%s\" of access method %s contains operator %s with invalid strategy number %d", + opfamilyname, "btree", format_operator(oprform->amopopr), oprform->amopstrategy))); result = false; @@ -143,8 +143,8 @@ btvalidate(Oid opclassoid) { ereport(INFO, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), - errmsg("btree operator family \"%s\" contains invalid ORDER BY specification for operator %s", - opfamilyname, + errmsg("operator family \"%s\" of access method %s contains invalid ORDER BY specification for operator %s", + opfamilyname, "btree", format_operator(oprform->amopopr)))); result = false; } @@ -156,8 +156,8 @@ btvalidate(Oid opclassoid) { ereport(INFO, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), - errmsg("btree operator family \"%s\" contains operator %s with wrong signature", - opfamilyname, + errmsg("operator family \"%s\" of access method %s contains operator %s with wrong signature", + opfamilyname, "btree", format_operator(oprform->amopopr)))); result = false; } @@ -198,8 +198,8 @@ btvalidate(Oid opclassoid) { ereport(INFO, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), - errmsg("btree operator family \"%s\" is missing operator(s) for types %s and %s", - opfamilyname, + errmsg("operator family \"%s\" of access method %s is missing operator(s) for types %s and %s", + opfamilyname, "btree", format_type_be(thisgroup->lefttype), format_type_be(thisgroup->righttype)))); result = false; @@ -208,8 +208,8 @@ 
btvalidate(Oid opclassoid) { ereport(INFO, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), - errmsg("btree operator family \"%s\" is missing support function for types %s and %s", - opfamilyname, + errmsg("operator family \"%s\" of access method %s is missing support function for types %s and %s", + opfamilyname, "btree", format_type_be(thisgroup->lefttype), format_type_be(thisgroup->righttype)))); result = false; @@ -222,8 +222,8 @@ btvalidate(Oid opclassoid) { ereport(INFO, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), - errmsg("btree operator class \"%s\" is missing operator(s)", - opclassname))); + errmsg("operator class \"%s\" of access method %s is missing operator(s)", + opclassname, "btree"))); result = false; } @@ -239,8 +239,8 @@ btvalidate(Oid opclassoid) { ereport(INFO, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), - errmsg("btree operator family \"%s\" is missing cross-type operator(s)", - opfamilyname))); + errmsg("operator family \"%s\" of access method %s is missing cross-type operator(s)", + opfamilyname, "btree"))); result = false; } diff --git a/src/backend/access/spgist/spgvalidate.c b/src/backend/access/spgist/spgvalidate.c index 1bc5bce72e..157cf2a028 100644 --- a/src/backend/access/spgist/spgvalidate.c +++ b/src/backend/access/spgist/spgvalidate.c @@ -90,8 +90,8 @@ spgvalidate(Oid opclassoid) { ereport(INFO, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), - errmsg("spgist operator family \"%s\" contains support procedure %s with cross-type registration", - opfamilyname, + errmsg("operator family \"%s\" of access method %s contains support procedure %s with different left and right input types", + opfamilyname, "spgist", format_procedure(procform->amproc)))); result = false; } @@ -113,8 +113,8 @@ spgvalidate(Oid opclassoid) default: ereport(INFO, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), - errmsg("spgist operator family \"%s\" contains function %s with invalid support number %d", - opfamilyname, + errmsg("operator family \"%s\" of access method %s 
contains function %s with invalid support number %d", + opfamilyname, "spgist", format_procedure(procform->amproc), procform->amprocnum))); result = false; @@ -125,8 +125,8 @@ spgvalidate(Oid opclassoid) { ereport(INFO, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), - errmsg("spgist operator family \"%s\" contains function %s with wrong signature for support number %d", - opfamilyname, + errmsg("operator family \"%s\" of access method %s contains function %s with wrong signature for support number %d", + opfamilyname, "spgist", format_procedure(procform->amproc), procform->amprocnum))); result = false; @@ -144,8 +144,8 @@ spgvalidate(Oid opclassoid) { ereport(INFO, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), - errmsg("spgist operator family \"%s\" contains operator %s with invalid strategy number %d", - opfamilyname, + errmsg("operator family \"%s\" of access method %s contains operator %s with invalid strategy number %d", + opfamilyname, "spgist", format_operator(oprform->amopopr), oprform->amopstrategy))); result = false; @@ -157,8 +157,8 @@ spgvalidate(Oid opclassoid) { ereport(INFO, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), - errmsg("spgist operator family \"%s\" contains invalid ORDER BY specification for operator %s", - opfamilyname, + errmsg("operator family \"%s\" of access method %s contains invalid ORDER BY specification for operator %s", + opfamilyname, "spgist", format_operator(oprform->amopopr)))); result = false; } @@ -170,8 +170,8 @@ spgvalidate(Oid opclassoid) { ereport(INFO, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), - errmsg("spgist operator family \"%s\" contains operator %s with wrong signature", - opfamilyname, + errmsg("operator family \"%s\" of access method %s contains operator %s with wrong signature", + opfamilyname, "spgist", format_operator(oprform->amopopr)))); result = false; } @@ -198,8 +198,8 @@ spgvalidate(Oid opclassoid) { ereport(INFO, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), - errmsg("spgist operator family \"%s\" is 
missing operator(s) for types %s and %s", - opfamilyname, + errmsg("operator family \"%s\" of access method %s is missing operator(s) for types %s and %s", + opfamilyname, "spgist", format_type_be(thisgroup->lefttype), format_type_be(thisgroup->righttype)))); result = false; @@ -218,8 +218,8 @@ spgvalidate(Oid opclassoid) continue; /* got it */ ereport(INFO, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), - errmsg("spgist operator family \"%s\" is missing support function %d for type %s", - opfamilyname, i, + errmsg("operator family \"%s\" of access method %s is missing support function %d for type %s", + opfamilyname, "spgist", i, format_type_be(thisgroup->lefttype)))); result = false; } @@ -231,8 +231,8 @@ spgvalidate(Oid opclassoid) { ereport(INFO, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), - errmsg("spgist operator class \"%s\" is missing operator(s)", - opclassname))); + errmsg("operator class \"%s\" of access method %s is missing operator(s)", + opclassname, "spgist"))); result = false; } diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c index d3585c8449..afb54ada9f 100644 --- a/src/backend/access/transam/parallel.c +++ b/src/backend/access/transam/parallel.c @@ -393,12 +393,12 @@ ReinitializeParallelDSM(ParallelContext *pcxt) } /* Reset a few bits of fixed parallel state to a clean state. */ - fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED); + fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED, false); fps->last_xlog_end = 0; /* Recreate error queues. 
*/ error_queue_space = - shm_toc_lookup(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE); + shm_toc_lookup(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE, false); for (i = 0; i < pcxt->nworkers; ++i) { char *start; @@ -528,16 +528,16 @@ WaitForParallelWorkersToFinish(ParallelContext *pcxt) if (!anyone_alive) break; - WaitLatch(&MyProc->procLatch, WL_LATCH_SET, -1, + WaitLatch(MyLatch, WL_LATCH_SET, -1, WAIT_EVENT_PARALLEL_FINISH); - ResetLatch(&MyProc->procLatch); + ResetLatch(MyLatch); } if (pcxt->toc != NULL) { FixedParallelState *fps; - fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED); + fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED, false); if (fps->last_xlog_end > XactLastRecEnd) XactLastRecEnd = fps->last_xlog_end; } @@ -974,8 +974,7 @@ ParallelWorkerMain(Datum main_arg) errmsg("invalid magic number in dynamic shared memory segment"))); /* Look up fixed parallel state. */ - fps = shm_toc_lookup(toc, PARALLEL_KEY_FIXED); - Assert(fps != NULL); + fps = shm_toc_lookup(toc, PARALLEL_KEY_FIXED, false); MyFixedParallelState = fps; /* @@ -984,7 +983,7 @@ ParallelWorkerMain(Datum main_arg) * errors that happen here will not be reported back to the process that * requested that this worker be launched. */ - error_queue_space = shm_toc_lookup(toc, PARALLEL_KEY_ERROR_QUEUE); + error_queue_space = shm_toc_lookup(toc, PARALLEL_KEY_ERROR_QUEUE, false); mq = (shm_mq *) (error_queue_space + ParallelWorkerNumber * PARALLEL_ERROR_QUEUE_SIZE); shm_mq_set_sender(mq, MyProc); @@ -1028,8 +1027,7 @@ ParallelWorkerMain(Datum main_arg) * this before restoring GUCs, because the libraries might define custom * variables. */ - libraryspace = shm_toc_lookup(toc, PARALLEL_KEY_LIBRARY); - Assert(libraryspace != NULL); + libraryspace = shm_toc_lookup(toc, PARALLEL_KEY_LIBRARY, false); RestoreLibraryState(libraryspace); /* @@ -1037,8 +1035,7 @@ ParallelWorkerMain(Datum main_arg) * loading an additional library, though most likely the entry point is in * the core backend or in a library we just loaded. 
*/ - entrypointstate = shm_toc_lookup(toc, PARALLEL_KEY_ENTRYPOINT); - Assert(entrypointstate != NULL); + entrypointstate = shm_toc_lookup(toc, PARALLEL_KEY_ENTRYPOINT, false); library_name = entrypointstate; function_name = entrypointstate + strlen(library_name) + 1; @@ -1060,30 +1057,26 @@ ParallelWorkerMain(Datum main_arg) SetClientEncoding(GetDatabaseEncoding()); /* Restore GUC values from launching backend. */ - gucspace = shm_toc_lookup(toc, PARALLEL_KEY_GUC); - Assert(gucspace != NULL); + gucspace = shm_toc_lookup(toc, PARALLEL_KEY_GUC, false); StartTransactionCommand(); RestoreGUCState(gucspace); CommitTransactionCommand(); /* Crank up a transaction state appropriate to a parallel worker. */ - tstatespace = shm_toc_lookup(toc, PARALLEL_KEY_TRANSACTION_STATE); + tstatespace = shm_toc_lookup(toc, PARALLEL_KEY_TRANSACTION_STATE, false); StartParallelWorkerTransaction(tstatespace); /* Restore combo CID state. */ - combocidspace = shm_toc_lookup(toc, PARALLEL_KEY_COMBO_CID); - Assert(combocidspace != NULL); + combocidspace = shm_toc_lookup(toc, PARALLEL_KEY_COMBO_CID, false); RestoreComboCIDState(combocidspace); /* Restore transaction snapshot. */ - tsnapspace = shm_toc_lookup(toc, PARALLEL_KEY_TRANSACTION_SNAPSHOT); - Assert(tsnapspace != NULL); + tsnapspace = shm_toc_lookup(toc, PARALLEL_KEY_TRANSACTION_SNAPSHOT, false); RestoreTransactionSnapshot(RestoreSnapshot(tsnapspace), fps->parallel_master_pgproc); /* Restore active snapshot. 
*/ - asnapspace = shm_toc_lookup(toc, PARALLEL_KEY_ACTIVE_SNAPSHOT); - Assert(asnapspace != NULL); + asnapspace = shm_toc_lookup(toc, PARALLEL_KEY_ACTIVE_SNAPSHOT, false); PushActiveSnapshot(RestoreSnapshot(asnapspace)); /* diff --git a/src/backend/access/transam/recovery.conf.sample b/src/backend/access/transam/recovery.conf.sample index acb81afd66..37fbaedaa5 100644 --- a/src/backend/access/transam/recovery.conf.sample +++ b/src/backend/access/transam/recovery.conf.sample @@ -66,11 +66,11 @@ # If you want to stop rollforward at a specific point, you # must set a recovery target. # -# You may set a recovery target either by transactionId, by name, -# or by timestamp or by WAL location (LSN) or by barrier. Recovery may either -# include or exclude the transaction(s) with the recovery target value (ie, -# stop either just after or just before the given target, respectively). In -# case of barrier, the recovery stops exactly at that point. +# You may set a recovery target either by transactionId, by name, by +# timestamp or by WAL location (LSN) or by barrier. Recovery may either include or +# exclude the transaction(s) with the recovery target value (i.e., +# stop either just after or just before the given target, +# respectively). In case of barrier, the recovery stops exactly at that point. # # #recovery_target_name = '' # e.g. 'daily backup 2011-01-26' diff --git a/src/backend/access/transam/subtrans.c b/src/backend/access/transam/subtrans.c index a0390bf25b..cd452c5139 100644 --- a/src/backend/access/transam/subtrans.c +++ b/src/backend/access/transam/subtrans.c @@ -10,6 +10,7 @@ * The tree can easily be walked from child to parent, but not in the * opposite direction. * + * This code is based on xact.c, but the robustness requirements * are completely different from pg_xact, because we only need to remember * pg_subtrans information for currently-open transactions. Thus, there is * no need to preserve data over a crash and restart. 
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index f6986d37db..3c1df5166e 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -204,7 +204,10 @@ typedef struct TwoPhaseStateData static TwoPhaseStateData *TwoPhaseState; /* - * Global transaction entry currently locked by us, if any. + * Global transaction entry currently locked by us, if any. Note that any + * access to the entry pointed to by this variable must be protected by + * TwoPhaseStateLock, though obviously the pointer itself doesn't need to be + * (since it's just local memory). */ static GlobalTransaction MyLockedGxact = NULL; @@ -347,18 +350,13 @@ AtAbort_Twophase(void) * resources held by the transaction yet. In those cases, the in-memory * state can be wrong, but it's too late to back out. */ + LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); if (!MyLockedGxact->valid) - { RemoveGXact(MyLockedGxact); - } else - { - LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); - MyLockedGxact->locking_backend = InvalidBackendId; + LWLockRelease(TwoPhaseStateLock); - LWLockRelease(TwoPhaseStateLock); - } MyLockedGxact = NULL; } @@ -463,6 +461,8 @@ MarkAsPreparingGuts(GlobalTransaction gxact, TransactionId xid, const char *gid, PGXACT *pgxact; int i; + Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE)); + Assert(gxact != NULL); proc = &ProcGlobal->allProcs[gxact->pgprocno]; pgxact = &ProcGlobal->allPgXact[gxact->pgprocno]; @@ -539,15 +539,19 @@ GXactLoadSubxactData(GlobalTransaction gxact, int nsubxacts, /* * MarkAsPrepared * Mark the GXACT as fully valid, and enter it into the global ProcArray. + * + * lock_held indicates whether caller already holds TwoPhaseStateLock. */ static void -MarkAsPrepared(GlobalTransaction gxact) +MarkAsPrepared(GlobalTransaction gxact, bool lock_held) { /* Lock here may be overkill, but I'm not convinced of that ... 
*/ - LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); + if (!lock_held) + LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); Assert(!gxact->valid); gxact->valid = true; - LWLockRelease(TwoPhaseStateLock); + if (!lock_held) + LWLockRelease(TwoPhaseStateLock); /* * Put it into the global ProcArray so TransactionIdIsInProgress considers @@ -652,7 +656,7 @@ RemoveGXact(GlobalTransaction gxact) { int i; - LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); + Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE)); for (i = 0; i < TwoPhaseState->numPrepXacts; i++) { @@ -666,14 +670,10 @@ RemoveGXact(GlobalTransaction gxact) gxact->next = TwoPhaseState->freeGXacts; TwoPhaseState->freeGXacts = gxact; - LWLockRelease(TwoPhaseStateLock); - return; } } - LWLockRelease(TwoPhaseStateLock); - elog(ERROR, "failed to find %p in GlobalTransaction array", gxact); } @@ -1147,7 +1147,7 @@ EndPrepare(GlobalTransaction gxact) * the xact crashed. Instead we have a window where the same XID appears * twice in ProcArray, which is OK. 
*/ - MarkAsPrepared(gxact); + MarkAsPrepared(gxact, false); /* * Now we can mark ourselves as out of the commit critical section: a @@ -1540,7 +1540,9 @@ FinishPreparedTransaction(const char *gid, bool isCommit) if (gxact->ondisk) RemoveTwoPhaseFile(xid, true); + LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); RemoveGXact(gxact); + LWLockRelease(TwoPhaseStateLock); MyLockedGxact = NULL; pfree(buf); @@ -1768,6 +1770,7 @@ restoreTwoPhaseData(void) struct dirent *clde; cldir = AllocateDir(TWOPHASE_DIR); + LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); while ((clde = ReadDir(cldir, TWOPHASE_DIR)) != NULL) { if (strlen(clde->d_name) == 8 && @@ -1786,6 +1789,7 @@ restoreTwoPhaseData(void) PrepareRedoAdd(buf, InvalidXLogRecPtr, InvalidXLogRecPtr); } } + LWLockRelease(TwoPhaseStateLock); FreeDir(cldir); } @@ -1826,7 +1830,7 @@ PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p) int allocsize = 0; int i; - LWLockAcquire(TwoPhaseStateLock, LW_SHARED); + LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); for (i = 0; i < TwoPhaseState->numPrepXacts; i++) { TransactionId xid; @@ -1901,7 +1905,7 @@ StandbyRecoverPreparedTransactions(void) { int i; - LWLockAcquire(TwoPhaseStateLock, LW_SHARED); + LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); for (i = 0; i < TwoPhaseState->numPrepXacts; i++) { TransactionId xid; @@ -1927,7 +1931,8 @@ StandbyRecoverPreparedTransactions(void) * Scan the shared memory entries of TwoPhaseState and reload the state for * each prepared transaction (reacquire locks, etc). * - * This is run during database startup. + * This is run at the end of recovery, but before we allow backends to write + * WAL. * * At the end of recovery the way we take snapshots will change. We now need * to mark all running transactions with their full SubTransSetParent() info @@ -1941,9 +1946,7 @@ RecoverPreparedTransactions(void) { int i; - /* - * Don't need a lock in the recovery phase. 
- */ + LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); for (i = 0; i < TwoPhaseState->numPrepXacts; i++) { TransactionId xid; @@ -1989,7 +1992,6 @@ RecoverPreparedTransactions(void) * Recreate its GXACT and dummy PGPROC. But, check whether it was * added in redo and already has a shmem entry for it. */ - LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); MarkAsPreparingGuts(gxact, xid, gid, hdr->prepared_at, hdr->owner, hdr->database); @@ -1997,13 +1999,13 @@ RecoverPreparedTransactions(void) /* recovered, so reset the flag for entries generated by redo */ gxact->inredo = false; - LWLockRelease(TwoPhaseStateLock); - GXactLoadSubxactData(gxact, hdr->nsubxacts, subxids); - MarkAsPrepared(gxact); + MarkAsPrepared(gxact, true); + + LWLockRelease(TwoPhaseStateLock); /* - * Recover other state (notably locks) using resource managers + * Recover other state (notably locks) using resource managers. */ ProcessRecords(bufptr, xid, twophase_recover_callbacks); @@ -2022,7 +2024,11 @@ RecoverPreparedTransactions(void) PostPrepare_Twophase(); pfree(buf); + + LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); } + + LWLockRelease(TwoPhaseStateLock); } /* @@ -2048,6 +2054,8 @@ ProcessTwoPhaseBuffer(TransactionId xid, TwoPhaseFileHeader *hdr; int i; + Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE)); + if (!fromdisk) Assert(prepare_start_lsn != InvalidXLogRecPtr); @@ -2064,8 +2072,8 @@ ProcessTwoPhaseBuffer(TransactionId xid, else { ereport(WARNING, - (errmsg("removing stale two-phase state from" - " shared memory for \"%u\"", xid))); + (errmsg("removing stale two-phase state from shared memory for \"%u\"", + xid))); PrepareRedoRemove(xid, true); } return NULL; @@ -2342,6 +2350,7 @@ PrepareRedoAdd(char *buf, XLogRecPtr start_lsn, XLogRecPtr end_lsn) const char *gid; GlobalTransaction gxact; + Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE)); Assert(RecoveryInProgress()); bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader)); @@ -2358,7 +2367,6 @@ 
PrepareRedoAdd(char *buf, XLogRecPtr start_lsn, XLogRecPtr end_lsn) * that it got added in the redo phase */ - LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); /* Get a free gxact from the freelist */ if (TwoPhaseState->freeGXacts == NULL) ereport(ERROR, @@ -2384,17 +2392,17 @@ PrepareRedoAdd(char *buf, XLogRecPtr start_lsn, XLogRecPtr end_lsn) Assert(TwoPhaseState->numPrepXacts < max_prepared_xacts); TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts++] = gxact; - LWLockRelease(TwoPhaseStateLock); - - elog(DEBUG2, "Adding 2PC data to shared memory %u", gxact->xid); + elog(DEBUG2, "added 2PC data in shared memory for transaction %u", gxact->xid); } /* * PrepareRedoRemove * - * Remove the corresponding gxact entry from TwoPhaseState. Also - * remove the 2PC file if a prepared transaction was saved via - * an earlier checkpoint. + * Remove the corresponding gxact entry from TwoPhaseState. Also remove + * the 2PC file if a prepared transaction was saved via an earlier checkpoint. + * + * Caller must hold TwoPhaseStateLock in exclusive mode, because TwoPhaseState + * is updated. */ void PrepareRedoRemove(TransactionId xid, bool giveWarning) @@ -2403,9 +2411,9 @@ PrepareRedoRemove(TransactionId xid, bool giveWarning) int i; bool found = false; + Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE)); Assert(RecoveryInProgress()); - LWLockAcquire(TwoPhaseStateLock, LW_SHARED); for (i = 0; i < TwoPhaseState->numPrepXacts; i++) { gxact = TwoPhaseState->prepXacts[i]; @@ -2417,7 +2425,6 @@ PrepareRedoRemove(TransactionId xid, bool giveWarning) break; } } - LWLockRelease(TwoPhaseStateLock); /* * Just leave if there is nothing, this is expected during WAL replay. @@ -2428,7 +2435,7 @@ PrepareRedoRemove(TransactionId xid, bool giveWarning) /* * And now we can clean up any files we may have left. 
*/ - elog(DEBUG2, "Removing 2PC data from shared memory %u", xid); + elog(DEBUG2, "removing 2PC data for transaction %u", xid); if (gxact->ondisk) RemoveTwoPhaseFile(xid, giveWarning); RemoveGXact(gxact); diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 77666c4b80..481231d13a 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -6215,6 +6215,8 @@ xact_redo_commit(xl_xact_parsed_commit *parsed, int i; TimestampTz commit_time; + Assert(TransactionIdIsValid(xid)); + max_xid = TransactionIdLatest(xid, parsed->nsubxacts, parsed->subxacts); /* @@ -6382,6 +6384,8 @@ xact_redo_abort(xl_xact_parsed_abort *parsed, TransactionId xid) int i; TransactionId max_xid; + Assert(TransactionIdIsValid(xid)); + /* * Make sure nextXid is beyond any XID mentioned in the record. * @@ -6462,51 +6466,49 @@ xact_redo(XLogReaderState *record) /* Backup blocks are not used in xact records */ Assert(!XLogRecHasAnyBlockRefs(record)); - if (info == XLOG_XACT_COMMIT || info == XLOG_XACT_COMMIT_PREPARED) + if (info == XLOG_XACT_COMMIT) { xl_xact_commit *xlrec = (xl_xact_commit *) XLogRecGetData(record); xl_xact_parsed_commit parsed; - ParseCommitRecord(XLogRecGetInfo(record), xlrec, - &parsed); + ParseCommitRecord(XLogRecGetInfo(record), xlrec, &parsed); + xact_redo_commit(&parsed, XLogRecGetXid(record), + record->EndRecPtr, XLogRecGetOrigin(record)); + } + else if (info == XLOG_XACT_COMMIT_PREPARED) + { + xl_xact_commit *xlrec = (xl_xact_commit *) XLogRecGetData(record); + xl_xact_parsed_commit parsed; - if (info == XLOG_XACT_COMMIT) - { - Assert(!TransactionIdIsValid(parsed.twophase_xid)); - xact_redo_commit(&parsed, XLogRecGetXid(record), - record->EndRecPtr, XLogRecGetOrigin(record)); - } - else - { - Assert(TransactionIdIsValid(parsed.twophase_xid)); - xact_redo_commit(&parsed, parsed.twophase_xid, - record->EndRecPtr, XLogRecGetOrigin(record)); + ParseCommitRecord(XLogRecGetInfo(record), xlrec, &parsed); + 
xact_redo_commit(&parsed, parsed.twophase_xid, + record->EndRecPtr, XLogRecGetOrigin(record)); - /* Delete TwoPhaseState gxact entry and/or 2PC file. */ - PrepareRedoRemove(parsed.twophase_xid, false); - } + /* Delete TwoPhaseState gxact entry and/or 2PC file. */ + LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); + PrepareRedoRemove(parsed.twophase_xid, false); + LWLockRelease(TwoPhaseStateLock); } - else if (info == XLOG_XACT_ABORT || info == XLOG_XACT_ABORT_PREPARED) + else if (info == XLOG_XACT_ABORT) { xl_xact_abort *xlrec = (xl_xact_abort *) XLogRecGetData(record); xl_xact_parsed_abort parsed; - ParseAbortRecord(XLogRecGetInfo(record), xlrec, - &parsed); + ParseAbortRecord(XLogRecGetInfo(record), xlrec, &parsed); + xact_redo_abort(&parsed, XLogRecGetXid(record)); + } + else if (info == XLOG_XACT_ABORT_PREPARED) + { + xl_xact_abort *xlrec = (xl_xact_abort *) XLogRecGetData(record); + xl_xact_parsed_abort parsed; - if (info == XLOG_XACT_ABORT) - { - Assert(!TransactionIdIsValid(parsed.twophase_xid)); - xact_redo_abort(&parsed, XLogRecGetXid(record)); - } - else - { - Assert(TransactionIdIsValid(parsed.twophase_xid)); - xact_redo_abort(&parsed, parsed.twophase_xid); + ParseAbortRecord(XLogRecGetInfo(record), xlrec, &parsed); + xact_redo_abort(&parsed, parsed.twophase_xid); - /* Delete TwoPhaseState gxact entry and/or 2PC file. */ - PrepareRedoRemove(parsed.twophase_xid, false); - } + /* Delete TwoPhaseState gxact entry and/or 2PC file. */ + LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); + PrepareRedoRemove(parsed.twophase_xid, false); + LWLockRelease(TwoPhaseStateLock); } else if (info == XLOG_XACT_PREPARE) { @@ -6514,9 +6516,11 @@ xact_redo(XLogReaderState *record) * Store xid and start/end pointers of the WAL record in TwoPhaseState * gxact entry. 
*/ + LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); PrepareRedoAdd(XLogRecGetData(record), record->ReadRecPtr, record->EndRecPtr); + LWLockRelease(TwoPhaseStateLock); } else if (info == XLOG_XACT_ASSIGNMENT) { diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index b29f283e6a..a07bb572ea 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -8402,6 +8402,11 @@ ShutdownXLOG(int code, Datum arg) (errmsg("shutting down"))); /* + * Signal walsenders to move to stopping state. + */ + WalSndInitStopping(); + + /* * Wait for WAL senders to be in stopping state. This prevents commands * from writing new WAL. */ diff --git a/src/backend/access/transam/xlogfuncs.c b/src/backend/access/transam/xlogfuncs.c index b3223d691d..fb905c0a1c 100644 --- a/src/backend/access/transam/xlogfuncs.c +++ b/src/backend/access/transam/xlogfuncs.c @@ -449,7 +449,7 @@ pg_last_wal_replay_lsn(PG_FUNCTION_ARGS) /* * Compute an xlog file name and decimal byte offset given a WAL location, - * such as is returned by pg_stop_backup() or pg_xlog_switch(). + * such as is returned by pg_stop_backup() or pg_switch_wal(). * * Note that a location exactly at a segment boundary is taken to be in * the previous segment. This is usually the right thing, since the @@ -515,7 +515,7 @@ pg_walfile_name_offset(PG_FUNCTION_ARGS) /* * Compute an xlog file name given a WAL location, - * such as is returned by pg_stop_backup() or pg_xlog_switch(). + * such as is returned by pg_stop_backup() or pg_switch_wal(). 
*/ Datum pg_walfile_name(PG_FUNCTION_ARGS) diff --git a/src/backend/bootstrap/bootparse.y b/src/backend/bootstrap/bootparse.y index de3695c7e0..2e1fef0350 100644 --- a/src/backend/bootstrap/bootparse.y +++ b/src/backend/bootstrap/bootparse.y @@ -323,6 +323,7 @@ Boot_DeclareIndexStmt: $4, false, false, + false, true, /* skip_build */ false); do_end(); @@ -366,6 +367,7 @@ Boot_DeclareUniqueIndexStmt: $5, false, false, + false, true, /* skip_build */ false); do_end(); diff --git a/src/backend/bootstrap/bootstrap.c b/src/backend/bootstrap/bootstrap.c index c2274ae2ff..7be0e30a74 100644 --- a/src/backend/bootstrap/bootstrap.c +++ b/src/backend/bootstrap/bootstrap.c @@ -878,6 +878,11 @@ InsertOneNull(int i) { elog(DEBUG4, "inserting column %d NULL", i); Assert(i >= 0 && i < MAXATTR); + if (boot_reldesc->rd_att->attrs[i]->attnotnull) + elog(ERROR, + "NULL value specified for not-null column \"%s\" of relation \"%s\"", + NameStr(boot_reldesc->rd_att->attrs[i]->attname), + RelationGetRelationName(boot_reldesc)); values[i] = PointerGetDatum(NULL); Nulls[i] = true; } diff --git a/src/backend/catalog/aclchk.c b/src/backend/catalog/aclchk.c index 387a3be701..304e3c4bc3 100644 --- a/src/backend/catalog/aclchk.c +++ b/src/backend/catalog/aclchk.c @@ -2738,7 +2738,7 @@ ExecGrant_Largeobject(InternalGrant *istmt) tuple = systable_getnext(scan); if (!HeapTupleIsValid(tuple)) - elog(ERROR, "cache lookup failed for large object %u", loid); + elog(ERROR, "could not find tuple for large object %u", loid); form_lo_meta = (Form_pg_largeobject_metadata) GETSTRUCT(tuple); @@ -5503,7 +5503,7 @@ recordExtObjInitPriv(Oid objoid, Oid classoid) tuple = systable_getnext(scan); if (!HeapTupleIsValid(tuple)) - elog(ERROR, "cache lookup failed for large object %u", objoid); + elog(ERROR, "could not find tuple for large object %u", objoid); aclDatum = heap_getattr(tuple, Anum_pg_largeobject_metadata_lomacl, diff --git a/src/backend/catalog/catalog.c b/src/backend/catalog/catalog.c index 
2e8cd10ebb..0f7ffef6d0 100644 --- a/src/backend/catalog/catalog.c +++ b/src/backend/catalog/catalog.c @@ -39,6 +39,7 @@ #include "catalog/pg_shseclabel.h" #include "catalog/pg_subscription.h" #include "catalog/pg_tablespace.h" +#include "catalog/pg_type.h" #include "catalog/toasting.h" #include "catalog/pgxc_node.h" #include "catalog/pgxc_group.h" @@ -357,6 +358,14 @@ GetNewOidWithIndex(Relation relation, Oid indexId, AttrNumber oidcolumn) ScanKeyData key; bool collides; + /* + * We should never be asked to generate a new pg_type OID during + * pg_upgrade; doing so would risk collisions with the OIDs it wants to + * assign. Hitting this assert means there's some path where we failed to + * ensure that a type OID is determined by commands in the dump script. + */ + Assert(!IsBinaryUpgrade || RelationGetRelid(relation) != TypeRelationId); + InitDirtySnapshot(SnapshotDirty); /* Generate new OIDs until we find one not in the table */ @@ -408,6 +417,13 @@ GetNewRelFileNode(Oid reltablespace, Relation pg_class, char relpersistence) bool collides; BackendId backend; + /* + * If we ever get here during pg_upgrade, there's something wrong; all + * relfilenode assignments during a binary-upgrade run should be + * determined by commands in the dump script. 
+ */ + Assert(!IsBinaryUpgrade); + switch (relpersistence) { case RELPERSISTENCE_TEMP: diff --git a/src/backend/catalog/heap.c b/src/backend/catalog/heap.c index ea3d2ade21..a1b7bd2f72 100644 --- a/src/backend/catalog/heap.c +++ b/src/backend/catalog/heap.c @@ -3541,9 +3541,14 @@ StorePartitionKey(Relation rel, recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL); - referenced.classId = CollationRelationId; - referenced.objectId = partcollation[i]; - referenced.objectSubId = 0; + /* The default collation is pinned, so don't bother recording it */ + if (OidIsValid(partcollation[i]) && + partcollation[i] != DEFAULT_COLLATION_OID) + { + referenced.classId = CollationRelationId; + referenced.objectId = partcollation[i]; + referenced.objectSubId = 0; + } recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL); } diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c index 9104855ce2..d0d208e98d 100644 --- a/src/backend/catalog/index.c +++ b/src/backend/catalog/index.c @@ -3477,8 +3477,8 @@ reindex_index(Oid indexId, bool skip_constraint_checks, char persistence, ereport(INFO, (errmsg("index \"%s\" was reindexed", get_rel_name(indexId)), - errdetail("%s.", - pg_rusage_show(&ru0)))); + errdetail_internal("%s", + pg_rusage_show(&ru0)))); /* Close rels, but keep locks */ index_close(iRel, NoLock); diff --git a/src/backend/catalog/information_schema.sql b/src/backend/catalog/information_schema.sql index cbcd6cfbc1..98bcfa08c6 100644 --- a/src/backend/catalog/information_schema.sql +++ b/src/backend/catalog/information_schema.sql @@ -2936,12 +2936,14 @@ CREATE VIEW user_mapping_options AS SELECT authorization_identifier, foreign_server_catalog, foreign_server_name, - CAST((pg_options_to_table(um.umoptions)).option_name AS sql_identifier) AS option_name, + CAST(opts.option_name AS sql_identifier) AS option_name, CAST(CASE WHEN (umuser <> 0 AND authorization_identifier = current_user) OR (umuser = 0 AND pg_has_role(srvowner, 'USAGE')) - OR (SELECT 
rolsuper FROM pg_authid WHERE rolname = current_user) THEN (pg_options_to_table(um.umoptions)).option_value + OR (SELECT rolsuper FROM pg_authid WHERE rolname = current_user) + THEN opts.option_value ELSE NULL END AS character_data) AS option_value - FROM _pg_user_mappings um; + FROM _pg_user_mappings um, + pg_options_to_table(um.umoptions) opts; GRANT SELECT ON user_mapping_options TO PUBLIC; diff --git a/src/backend/catalog/namespace.c b/src/backend/catalog/namespace.c index d7f6075b13..a29a232e8b 100644 --- a/src/backend/catalog/namespace.c +++ b/src/backend/catalog/namespace.c @@ -3898,7 +3898,7 @@ InitTempTableNamespace(void) if (IsParallelWorker()) ereport(ERROR, (errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION), - errmsg("cannot create temporary tables in parallel mode"))); + errmsg("cannot create temporary tables during a parallel operation"))); #ifdef XCP /* diff --git a/src/backend/catalog/objectaddress.c b/src/backend/catalog/objectaddress.c index 6a365dceec..05096959de 100644 --- a/src/backend/catalog/objectaddress.c +++ b/src/backend/catalog/objectaddress.c @@ -1849,8 +1849,13 @@ get_object_address_defacl(List *object, bool missing_ok) default: ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("unrecognized default ACL object type %c", objtype), - errhint("Valid object types are \"r\", \"S\", \"f\", \"T\" and \"s\"."))); + errmsg("unrecognized default ACL object type \"%c\"", objtype), + errhint("Valid object types are \"%c\", \"%c\", \"%c\", \"%c\", \"%c\".", + DEFACLOBJ_RELATION, + DEFACLOBJ_SEQUENCE, + DEFACLOBJ_FUNCTION, + DEFACLOBJ_TYPE, + DEFACLOBJ_NAMESPACE))); } /* @@ -3345,7 +3350,7 @@ getObjectDescription(const ObjectAddress *object) tuple = systable_getnext(sscan); if (!HeapTupleIsValid(tuple)) - elog(ERROR, "cache lookup failed for policy %u", + elog(ERROR, "could not find tuple for policy %u", object->objectId); form_policy = (Form_pg_policy) GETSTRUCT(tuple); diff --git a/src/backend/catalog/partition.c 
b/src/backend/catalog/partition.c index 37fa1458be..a7c9b9a46c 100644 --- a/src/backend/catalog/partition.c +++ b/src/backend/catalog/partition.c @@ -454,6 +454,7 @@ RelationBuildPartitionDesc(Relation rel) palloc0(sizeof(PartitionBoundInfoData)); boundinfo->strategy = key->strategy; boundinfo->ndatums = ndatums; + boundinfo->null_index = -1; boundinfo->datums = (Datum **) palloc0(ndatums * sizeof(Datum *)); /* Initialize mapping array with invalid values */ @@ -503,8 +504,6 @@ RelationBuildPartitionDesc(Relation rel) mapping[null_index] = next_index++; boundinfo->null_index = mapping[null_index]; } - else - boundinfo->null_index = -1; /* All partition must now have a valid mapping */ Assert(next_index == nparts); @@ -874,7 +873,8 @@ get_partition_parent(Oid relid) NULL, 2, key); tuple = systable_getnext(scan); - Assert(HeapTupleIsValid(tuple)); + if (!HeapTupleIsValid(tuple)) + elog(ERROR, "could not find tuple for parent of relation %u", relid); form = (Form_pg_inherits) GETSTRUCT(tuple); result = form->inhparent; diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index ab5f3719fc..c69c461b62 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -227,17 +227,22 @@ textarray_to_stringlist(ArrayType *textarray) /* * Set the state of a subscription table. * + * If update_only is true and the record for given table doesn't exist, do + * nothing. This can be used to avoid inserting a new record that was deleted + * by someone else. Generally, subscription DDL commands should use false, + * workers should use true. + * * The insert-or-update logic in this function is not concurrency safe so it * might raise an error in rare circumstances. But if we took a stronger lock * such as ShareRowExclusiveLock, we would risk more deadlocks. 
*/ Oid SetSubscriptionRelState(Oid subid, Oid relid, char state, - XLogRecPtr sublsn) + XLogRecPtr sublsn, bool update_only) { Relation rel; HeapTuple tup; - Oid subrelid; + Oid subrelid = InvalidOid; bool nulls[Natts_pg_subscription_rel]; Datum values[Natts_pg_subscription_rel]; @@ -252,7 +257,7 @@ SetSubscriptionRelState(Oid subid, Oid relid, char state, * If the record for given table does not exist yet create new record, * otherwise update the existing one. */ - if (!HeapTupleIsValid(tup)) + if (!HeapTupleIsValid(tup) && !update_only) { /* Form the tuple. */ memset(values, 0, sizeof(values)); @@ -272,7 +277,7 @@ SetSubscriptionRelState(Oid subid, Oid relid, char state, heap_freetuple(tup); } - else + else if (HeapTupleIsValid(tup)) { bool replaces[Natts_pg_subscription_rel]; @@ -396,7 +401,7 @@ RemoveSubscriptionRel(Oid subid, Oid relid) scan = heap_beginscan_catalog(rel, nkeys, skey); while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection))) { - simple_heap_delete(rel, &tup->t_self); + CatalogTupleDelete(rel, &tup->t_self); } heap_endscan(scan); diff --git a/src/backend/commands/collationcmds.c b/src/backend/commands/collationcmds.c index 110fb7ef65..91b65b174d 100644 --- a/src/backend/commands/collationcmds.c +++ b/src/backend/commands/collationcmds.c @@ -120,6 +120,18 @@ DefineCollation(ParseState *pstate, List *names, List *parameters, bool if_not_e collprovider = ((Form_pg_collation) GETSTRUCT(tp))->collprovider; ReleaseSysCache(tp); + + /* + * Copying the "default" collation is not allowed because most code + * checks for DEFAULT_COLLATION_OID instead of COLLPROVIDER_DEFAULT, + * and so having a second collation with COLLPROVIDER_DEFAULT would + * not work and potentially confuse or crash some code. This could be + * fixed with some legwork. 
+ */ + if (collprovider == COLLPROVIDER_DEFAULT) + ereport(ERROR, + (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), + errmsg("collation \"default\" cannot be copied"))); } if (localeEl) @@ -411,10 +423,10 @@ get_icu_locale_comment(const char *localename) Datum pg_import_system_collations(PG_FUNCTION_ARGS) { -#if defined(HAVE_LOCALE_T) && !defined(WIN32) bool if_not_exists = PG_GETARG_BOOL(0); Oid nspid = PG_GETARG_OID(1); +#if defined(HAVE_LOCALE_T) && !defined(WIN32) FILE *locale_a_handle; char localebuf[NAMEDATALEN]; /* we assume ASCII so this is fine */ int count = 0; @@ -431,6 +443,12 @@ pg_import_system_collations(PG_FUNCTION_ARGS) (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), (errmsg("must be superuser to import system collations")))); +#if !(defined(HAVE_LOCALE_T) && !defined(WIN32)) && !defined(USE_ICU) + /* silence compiler warnings */ + (void) if_not_exists; + (void) nspid; +#endif + #if defined(HAVE_LOCALE_T) && !defined(WIN32) locale_a_handle = OpenPipeStream("locale -a", "r"); if (locale_a_handle == NULL) diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index 5d5e409c7d..3ae52116f4 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -1622,7 +1622,7 @@ BeginCopy(ParseState *pstate, } /* plan the query */ - plan = pg_plan_query(query, 0, NULL); + plan = pg_plan_query(query, CURSOR_OPT_PARALLEL_OK, NULL); /* * With row level security and a user using "COPY relation TO", we @@ -2827,9 +2827,24 @@ CopyFrom(CopyState cstate) } else { + /* + * We always check the partition constraint, including when + * the tuple got here via tuple-routing. However we don't + * need to in the latter case if no BR trigger is defined on + * the partition. Note that a BR trigger might modify the + * tuple such that the partition constraint is no longer + * satisfied, so we need to check in that case. 
+ */ + bool check_partition_constr = + (resultRelInfo->ri_PartitionCheck != NIL); + + if (saved_resultRelInfo != NULL && + !(resultRelInfo->ri_TrigDesc && + resultRelInfo->ri_TrigDesc->trig_insert_before_row)) + check_partition_constr = false; + /* Check the constraints of the tuple */ - if (cstate->rel->rd_att->constr || - resultRelInfo->ri_PartitionCheck) + if (cstate->rel->rd_att->constr || check_partition_constr) ExecConstraints(resultRelInfo, slot, estate); if (useHeapMultiInsert) diff --git a/src/backend/commands/extension.c b/src/backend/commands/extension.c index 80c352ed0c..197955624f 100644 --- a/src/backend/commands/extension.c +++ b/src/backend/commands/extension.c @@ -2390,7 +2390,7 @@ pg_extension_config_dump(PG_FUNCTION_ARGS) extTup = systable_getnext(extScan); if (!HeapTupleIsValid(extTup)) /* should not happen */ - elog(ERROR, "extension with oid %u does not exist", + elog(ERROR, "could not find tuple for extension %u", CurrentExtensionObject); memset(repl_val, 0, sizeof(repl_val)); @@ -2538,7 +2538,7 @@ extension_config_remove(Oid extensionoid, Oid tableoid) extTup = systable_getnext(extScan); if (!HeapTupleIsValid(extTup)) /* should not happen */ - elog(ERROR, "extension with oid %u does not exist", + elog(ERROR, "could not find tuple for extension %u", extensionoid); /* Search extconfig for the tableoid */ @@ -2739,7 +2739,8 @@ AlterExtensionNamespace(const char *extensionName, const char *newschema, Oid *o extTup = systable_getnext(extScan); if (!HeapTupleIsValid(extTup)) /* should not happen */ - elog(ERROR, "extension with oid %u does not exist", extensionOid); + elog(ERROR, "could not find tuple for extension %u", + extensionOid); /* Copy tuple so we can modify it below */ extTup = heap_copytuple(extTup); @@ -3060,7 +3061,7 @@ ApplyExtensionUpdates(Oid extensionOid, extTup = systable_getnext(extScan); if (!HeapTupleIsValid(extTup)) /* should not happen */ - elog(ERROR, "extension with oid %u does not exist", + elog(ERROR, "could not find 
tuple for extension %u", extensionOid); extForm = (Form_pg_extension) GETSTRUCT(extTup); diff --git a/src/backend/commands/indexcmds.c b/src/backend/commands/indexcmds.c index 87ff7faf48..f611e3e394 100644 --- a/src/backend/commands/indexcmds.c +++ b/src/backend/commands/indexcmds.c @@ -301,6 +301,9 @@ CheckIndexCompatible(Oid oldId, * 'is_alter_table': this is due to an ALTER rather than a CREATE operation. * 'check_rights': check for CREATE rights in namespace and tablespace. (This * should be true except when ALTER is deleting/recreating an index.) + * 'check_not_in_use': check for table not already in use in current session. + * This should be true unless caller is holding the table open, in which + * case the caller had better have checked it earlier. * 'skip_build': make the catalog entries but leave the index file empty; * it will be filled later. * 'quiet': suppress the NOTICE chatter ordinarily provided for constraints. @@ -313,6 +316,7 @@ DefineIndex(Oid relationId, Oid indexRelationId, bool is_alter_table, bool check_rights, + bool check_not_in_use, bool skip_build, bool quiet) { @@ -411,6 +415,15 @@ DefineIndex(Oid relationId, errmsg("cannot create indexes on temporary tables of other sessions"))); /* + * Unless our caller vouches for having checked this already, insist that + * the table not be in use by our own session, either. Otherwise we might + * fail to make entries in the new index (for instance, if an INSERT or + * UPDATE is in progress and has already made its list of target indexes). + */ + if (check_not_in_use) + CheckTableNotInUse(rel, "CREATE INDEX"); + + /* * Verify we (still) have CREATE rights in the rel's namespace. * (Presumably we did when the rel was created, but maybe not anymore.) * Skip check if caller doesn't want it. 
Also skip check if diff --git a/src/backend/commands/policy.c b/src/backend/commands/policy.c index 4a758426c3..dad31df517 100644 --- a/src/backend/commands/policy.c +++ b/src/backend/commands/policy.c @@ -474,7 +474,8 @@ RemoveRoleFromObjectPolicy(Oid roleid, Oid classid, Oid policy_id) rel = relation_open(relid, AccessExclusiveLock); - if (rel->rd_rel->relkind != RELKIND_RELATION) + if (rel->rd_rel->relkind != RELKIND_RELATION && + rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE) ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("\"%s\" is not a table", diff --git a/src/backend/commands/sequence.c b/src/backend/commands/sequence.c index ed50208d51..13f818a036 100644 --- a/src/backend/commands/sequence.c +++ b/src/backend/commands/sequence.c @@ -142,12 +142,12 @@ static void create_seq_hashtable(void); static void init_sequence(Oid relid, SeqTable *p_elm, Relation *p_rel); static Form_pg_sequence_data read_seq_tuple(Relation rel, Buffer *buf, HeapTuple seqdatatuple); -static LOCKMODE alter_sequence_get_lock_level(List *options); static void init_params(ParseState *pstate, List *options, bool for_identity, bool isInit, Form_pg_sequence seqform, - bool *changed_seqform, - Form_pg_sequence_data seqdataform, List **owned_by, + Form_pg_sequence_data seqdataform, + bool *need_seq_rewrite, + List **owned_by, bool *is_restart); static void do_setval(Oid relid, int64 next, bool iscalled); static void process_owned_by(Relation seqrel, List *owned_by, bool for_identity); @@ -162,7 +162,7 @@ DefineSequence(ParseState *pstate, CreateSeqStmt *seq) { FormData_pg_sequence seqform; FormData_pg_sequence_data seqdataform; - bool changed_seqform = false; /* not used here */ + bool need_seq_rewrite; List *owned_by; CreateStmt *stmt = makeNode(CreateStmt); Oid seqoid; @@ -210,8 +210,10 @@ DefineSequence(ParseState *pstate, CreateSeqStmt *seq) } /* Check and set all option values */ - init_params(pstate, seq->options, seq->for_identity, true, &seqform, - &changed_seqform, 
&seqdataform, &owned_by, &is_restart); + init_params(pstate, seq->options, seq->for_identity, true, + &seqform, &seqdataform, + &need_seq_rewrite, &owned_by, + &is_restart); /* * Create relation (and fill value[] and null[] for the tuple) @@ -497,11 +499,10 @@ AlterSequence(ParseState *pstate, AlterSeqStmt *stmt) SeqTable elm; Relation seqrel; Buffer buf; - HeapTupleData seqdatatuple; + HeapTupleData datatuple; Form_pg_sequence seqform; - Form_pg_sequence_data seqdata; - FormData_pg_sequence_data newseqdata; - bool changed_seqform = false; + Form_pg_sequence_data newdataform; + bool need_seq_rewrite; List *owned_by; #ifdef PGXC GTM_Sequence start_value; @@ -514,11 +515,12 @@ AlterSequence(ParseState *pstate, AlterSeqStmt *stmt) #endif ObjectAddress address; Relation rel; - HeapTuple tuple; + HeapTuple seqtuple; + HeapTuple newdatatuple; /* Open and lock sequence. */ relid = RangeVarGetRelid(stmt->sequence, - alter_sequence_get_lock_level(stmt->options), + ShareRowExclusiveLock, stmt->missing_ok); if (relid == InvalidOid) { @@ -536,32 +538,33 @@ AlterSequence(ParseState *pstate, AlterSeqStmt *stmt) stmt->sequence->relname); rel = heap_open(SequenceRelationId, RowExclusiveLock); - tuple = SearchSysCacheCopy1(SEQRELID, - ObjectIdGetDatum(relid)); - if (!HeapTupleIsValid(tuple)) + seqtuple = SearchSysCacheCopy1(SEQRELID, + ObjectIdGetDatum(relid)); + if (!HeapTupleIsValid(seqtuple)) elog(ERROR, "cache lookup failed for sequence %u", relid); - seqform = (Form_pg_sequence) GETSTRUCT(tuple); + seqform = (Form_pg_sequence) GETSTRUCT(seqtuple); /* lock page's buffer and read tuple into new sequence structure */ - seqdata = read_seq_tuple(seqrel, &buf, &seqdatatuple); + (void) read_seq_tuple(seqrel, &buf, &datatuple); + + /* copy the existing sequence data tuple, so it can be modified locally */ + newdatatuple = heap_copytuple(&datatuple); + newdataform = (Form_pg_sequence_data) GETSTRUCT(newdatatuple); - /* Copy old sequence data into workspace */ - memcpy(&newseqdata, 
seqdata, sizeof(FormData_pg_sequence_data)); + UnlockReleaseBuffer(buf); /* Check and set new values */ - init_params(pstate, stmt->options, stmt->for_identity, false, seqform, - &changed_seqform, &newseqdata, &owned_by, &is_restart); + init_params(pstate, stmt->options, stmt->for_identity, false, + seqform, newdataform, + &need_seq_rewrite, &owned_by, + &is_restart); /* Clear local cache so that we don't think we have cached numbers */ /* Note that we do not change the currval() state */ elm->cached = elm->last; - /* check the comment above nextval_internal()'s equivalent call. */ - if (RelationNeedsWAL(seqrel)) - GetTopTransactionId(); - /* Now okay to update the on-disk tuple */ #ifdef PGXC increment = seqform->seqincrement; @@ -572,48 +575,40 @@ AlterSequence(ParseState *pstate, AlterSeqStmt *stmt) cycle = seqform->seqcycle; #endif - START_CRIT_SECTION(); - - memcpy(seqdata, &newseqdata, sizeof(FormData_pg_sequence_data)); - - MarkBufferDirty(buf); - - /* XLOG stuff */ - if (RelationNeedsWAL(seqrel)) + /* If needed, rewrite the sequence relation itself */ + if (need_seq_rewrite) { - xl_seq_rec xlrec; - XLogRecPtr recptr; - Page page = BufferGetPage(buf); - - XLogBeginInsert(); - XLogRegisterBuffer(0, buf, REGBUF_WILL_INIT); - - xlrec.node = seqrel->rd_node; - XLogRegisterData((char *) &xlrec, sizeof(xl_seq_rec)); - - XLogRegisterData((char *) seqdatatuple.t_data, seqdatatuple.t_len); + /* check the comment above nextval_internal()'s equivalent call. */ + if (RelationNeedsWAL(seqrel)) + GetTopTransactionId(); - recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG); + /* + * Create a new storage file for the sequence, making the state + * changes transactional. We want to keep the sequence's relfrozenxid + * at 0, since it won't contain any unfrozen XIDs. Same with + * relminmxid, since a sequence will never contain multixacts. 
+ */ + RelationSetNewRelfilenode(seqrel, seqrel->rd_rel->relpersistence, + InvalidTransactionId, InvalidMultiXactId); - PageSetLSN(page, recptr); + /* + * Insert the modified tuple into the new storage file. + */ + fill_seq_with_data(seqrel, newdatatuple); } - END_CRIT_SECTION(); - - UnlockReleaseBuffer(buf); - /* process OWNED BY if given */ if (owned_by) process_owned_by(seqrel, owned_by, stmt->for_identity); + /* update the pg_sequence tuple (we could skip this in some cases...) */ + CatalogTupleUpdate(rel, &seqtuple->t_self, seqtuple); + InvokeObjectPostAlterHook(RelationRelationId, relid, 0); ObjectAddressSet(address, RelationRelationId, relid); - if (changed_seqform) - CatalogTupleUpdate(rel, &tuple->t_self, tuple); heap_close(rel, RowExclusiveLock); - relation_close(seqrel, NoLock); #ifdef PGXC @@ -1447,46 +1442,30 @@ read_seq_tuple(Relation rel, Buffer *buf, HeapTuple seqdatatuple) } /* - * Check the sequence options list and return the appropriate lock level for - * ALTER SEQUENCE. - * - * Most sequence option changes require a self-exclusive lock and should block - * concurrent nextval() et al. But RESTART does not, because it's not - * transactional. Also take a lower lock if no option at all is present. - */ -static LOCKMODE -alter_sequence_get_lock_level(List *options) -{ - ListCell *option; - - foreach(option, options) - { - DefElem *defel = (DefElem *) lfirst(option); - - if (strcmp(defel->defname, "restart") != 0) - return ShareRowExclusiveLock; - } - - return RowExclusiveLock; -} - -/* * init_params: process the options list of CREATE or ALTER SEQUENCE, and * store the values into appropriate fields of seqform, for changes that go - * into the pg_sequence catalog, and seqdataform for changes to the sequence - * relation itself. Set *changed_seqform to true if seqform was changed - * (interesting for ALTER SEQUENCE). Also set *owned_by to any OWNED BY - * option, or to NIL if there is none. 
+ * into the pg_sequence catalog, and fields of seqdataform for changes to the + * sequence relation itself. Set *need_seq_rewrite to true if we changed any + * parameters that require rewriting the sequence's relation (interesting for + * ALTER SEQUENCE). Also set *owned_by to any OWNED BY option, or to NIL if + * there is none. * * If isInit is true, fill any unspecified options with default values; * otherwise, do not change existing options that aren't explicitly overridden. + * + * Note: we force a sequence rewrite whenever we change parameters that affect + * generation of future sequence values, even if the seqdataform per se is not + * changed. This allows ALTER SEQUENCE to behave transactionally. Currently, + * the only option that doesn't cause that is OWNED BY. It's *necessary* for + * ALTER SEQUENCE OWNED BY to not rewrite the sequence, because that would + * break pg_upgrade by causing unwanted changes in the sequence's relfilenode. */ static void init_params(ParseState *pstate, List *options, bool for_identity, bool isInit, Form_pg_sequence seqform, - bool *changed_seqform, Form_pg_sequence_data seqdataform, + bool *need_seq_rewrite, List **owned_by, bool *is_restart) { @@ -1506,6 +1485,7 @@ init_params(ParseState *pstate, List *options, bool for_identity, *is_restart = false; #endif + *need_seq_rewrite = false; *owned_by = NIL; foreach(option, options) @@ -1520,6 +1500,7 @@ init_params(ParseState *pstate, List *options, bool for_identity, errmsg("conflicting or redundant options"), parser_errposition(pstate, defel->location))); as_type = defel; + *need_seq_rewrite = true; } else if (strcmp(defel->defname, "increment") == 0) { @@ -1529,6 +1510,7 @@ init_params(ParseState *pstate, List *options, bool for_identity, errmsg("conflicting or redundant options"), parser_errposition(pstate, defel->location))); increment_by = defel; + *need_seq_rewrite = true; } else if (strcmp(defel->defname, "start") == 0) { @@ -1538,6 +1520,7 @@ init_params(ParseState 
*pstate, List *options, bool for_identity, errmsg("conflicting or redundant options"), parser_errposition(pstate, defel->location))); start_value = defel; + *need_seq_rewrite = true; } else if (strcmp(defel->defname, "restart") == 0) { @@ -1547,6 +1530,7 @@ init_params(ParseState *pstate, List *options, bool for_identity, errmsg("conflicting or redundant options"), parser_errposition(pstate, defel->location))); restart_value = defel; + *need_seq_rewrite = true; } else if (strcmp(defel->defname, "maxvalue") == 0) { @@ -1556,6 +1540,7 @@ init_params(ParseState *pstate, List *options, bool for_identity, errmsg("conflicting or redundant options"), parser_errposition(pstate, defel->location))); max_value = defel; + *need_seq_rewrite = true; } else if (strcmp(defel->defname, "minvalue") == 0) { @@ -1565,6 +1550,7 @@ init_params(ParseState *pstate, List *options, bool for_identity, errmsg("conflicting or redundant options"), parser_errposition(pstate, defel->location))); min_value = defel; + *need_seq_rewrite = true; } else if (strcmp(defel->defname, "cache") == 0) { @@ -1574,6 +1560,7 @@ init_params(ParseState *pstate, List *options, bool for_identity, errmsg("conflicting or redundant options"), parser_errposition(pstate, defel->location))); cache_value = defel; + *need_seq_rewrite = true; } else if (strcmp(defel->defname, "cycle") == 0) { @@ -1583,6 +1570,7 @@ init_params(ParseState *pstate, List *options, bool for_identity, errmsg("conflicting or redundant options"), parser_errposition(pstate, defel->location))); is_cycled = defel; + *need_seq_rewrite = true; } else if (strcmp(defel->defname, "owned_by") == 0) { @@ -1610,8 +1598,6 @@ init_params(ParseState *pstate, List *options, bool for_identity, defel->defname); } - *changed_seqform = false; - /* * We must reset log_cnt when isInit or when changing any parameters that * would affect future nextval allocations. 
@@ -1652,19 +1638,16 @@ init_params(ParseState *pstate, List *options, bool for_identity, } seqform->seqtypid = newtypid; - *changed_seqform = true; } else if (isInit) { seqform->seqtypid = INT8OID; - *changed_seqform = true; } /* INCREMENT BY */ if (increment_by != NULL) { seqform->seqincrement = defGetInt64(increment_by); - *changed_seqform = true; if (seqform->seqincrement == 0) ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), @@ -1674,28 +1657,24 @@ init_params(ParseState *pstate, List *options, bool for_identity, else if (isInit) { seqform->seqincrement = 1; - *changed_seqform = true; } /* CYCLE */ if (is_cycled != NULL) { seqform->seqcycle = intVal(is_cycled->arg); - *changed_seqform = true; Assert(BoolIsValid(seqform->seqcycle)); seqdataform->log_cnt = 0; } else if (isInit) { seqform->seqcycle = false; - *changed_seqform = true; } /* MAXVALUE (null arg means NO MAXVALUE) */ if (max_value != NULL && max_value->arg) { seqform->seqmax = defGetInt64(max_value); - *changed_seqform = true; seqdataform->log_cnt = 0; } else if (isInit || max_value != NULL || reset_max_value) @@ -1712,7 +1691,6 @@ init_params(ParseState *pstate, List *options, bool for_identity, } else seqform->seqmax = -1; /* descending seq */ - *changed_seqform = true; seqdataform->log_cnt = 0; } @@ -1734,7 +1712,6 @@ init_params(ParseState *pstate, List *options, bool for_identity, if (min_value != NULL && min_value->arg) { seqform->seqmin = defGetInt64(min_value); - *changed_seqform = true; seqdataform->log_cnt = 0; } else if (isInit || min_value != NULL || reset_min_value) @@ -1751,7 +1728,6 @@ init_params(ParseState *pstate, List *options, bool for_identity, } else seqform->seqmin = 1; /* ascending seq */ - *changed_seqform = true; seqdataform->log_cnt = 0; } @@ -1787,7 +1763,6 @@ init_params(ParseState *pstate, List *options, bool for_identity, if (start_value != NULL) { seqform->seqstart = defGetInt64(start_value); - *changed_seqform = true; } else if (isInit) { @@ -1795,7 +1770,6 @@ 
init_params(ParseState *pstate, List *options, bool for_identity, seqform->seqstart = seqform->seqmin; /* ascending seq */ else seqform->seqstart = seqform->seqmax; /* descending seq */ - *changed_seqform = true; } /* crosscheck START */ @@ -1874,7 +1848,6 @@ init_params(ParseState *pstate, List *options, bool for_identity, if (cache_value != NULL) { seqform->seqcache = defGetInt64(cache_value); - *changed_seqform = true; if (seqform->seqcache <= 0) { char buf[100]; @@ -1890,7 +1863,6 @@ init_params(ParseState *pstate, List *options, bool for_identity, else if (isInit) { seqform->seqcache = 1; - *changed_seqform = true; } } diff --git a/src/backend/commands/statscmds.c b/src/backend/commands/statscmds.c index 2b3785f394..ea0a561401 100644 --- a/src/backend/commands/statscmds.c +++ b/src/backend/commands/statscmds.c @@ -301,8 +301,7 @@ CreateStatistics(CreateStatsStmt *stmt) /* insert it into pg_statistic_ext */ statrel = heap_open(StatisticExtRelationId, RowExclusiveLock); htup = heap_form_tuple(statrel->rd_att, values, nulls); - CatalogTupleInsert(statrel, htup); - statoid = HeapTupleGetOid(htup); + statoid = CatalogTupleInsert(statrel, htup); heap_freetuple(htup); relation_close(statrel, RowExclusiveLock); @@ -372,7 +371,7 @@ RemoveStatisticsById(Oid statsOid) CacheInvalidateRelcacheByRelid(relid); - simple_heap_delete(relation, &tup->t_self); + CatalogTupleDelete(relation, &tup->t_self); ReleaseSysCache(tup); diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 86eb31df93..5aae7b6f91 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -64,12 +64,14 @@ static void parse_subscription_options(List *options, bool *connect, bool *enabled_given, bool *enabled, bool *create_slot, bool *slot_name_given, char **slot_name, - bool *copy_data, char **synchronous_commit) + bool *copy_data, char **synchronous_commit, + bool *refresh) { ListCell *lc; bool connect_given = false; 
bool create_slot_given = false; bool copy_data_given = false; + bool refresh_given = false; /* If connect is specified, the others also need to be. */ Assert(!connect || (enabled && create_slot && copy_data)); @@ -92,6 +94,8 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given, *copy_data = true; if (synchronous_commit) *synchronous_commit = NULL; + if (refresh) + *refresh = true; /* Parse options */ foreach(lc, options) @@ -167,6 +171,16 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given, PGC_BACKEND, PGC_S_TEST, GUC_ACTION_SET, false, 0, false); } + else if (strcmp(defel->defname, "refresh") == 0 && refresh) + { + if (refresh_given) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + + refresh_given = true; + *refresh = defGetBoolean(defel); + } else ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), @@ -315,7 +329,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) */ parse_subscription_options(stmt->options, &connect, &enabled_given, &enabled, &create_slot, &slotname_given, - &slotname, &copy_data, &synchronous_commit); + &slotname, &copy_data, &synchronous_commit, + NULL); /* * Since creating a replication slot is not transactional, rolling back @@ -436,12 +451,9 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) rv->schemaname, rv->relname); SetSubscriptionRelState(subid, relid, table_state, - InvalidXLogRecPtr); + InvalidXLogRecPtr, false); } - ereport(NOTICE, - (errmsg("synchronized table states"))); - /* * If requested, create permanent slot for the subscription. We * won't use the initial snapshot for anything, so no need to @@ -559,7 +571,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data) { SetSubscriptionRelState(sub->oid, relid, copy_data ? 
SUBREL_STATE_INIT : SUBREL_STATE_READY, - InvalidXLogRecPtr); + InvalidXLogRecPtr, false); ereport(NOTICE, (errmsg("added subscription for table %s.%s", quote_identifier(rv->schemaname), @@ -585,6 +597,8 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data) RemoveSubscriptionRel(sub->oid, relid); + logicalrep_worker_stop(sub->oid, relid); + namespace = get_namespace_name(get_rel_namespace(relid)); ereport(NOTICE, (errmsg("removed subscription for table %s.%s", @@ -645,7 +659,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt) parse_subscription_options(stmt->options, NULL, NULL, NULL, NULL, &slotname_given, &slotname, - NULL, &synchronous_commit); + NULL, &synchronous_commit, NULL); if (slotname_given) { @@ -680,7 +694,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt) parse_subscription_options(stmt->options, NULL, &enabled_given, &enabled, NULL, - NULL, NULL, NULL, NULL); + NULL, NULL, NULL, NULL, NULL); Assert(enabled_given); if (!sub->slotname && enabled) @@ -712,13 +726,13 @@ AlterSubscription(AlterSubscriptionStmt *stmt) break; case ALTER_SUBSCRIPTION_PUBLICATION: - case ALTER_SUBSCRIPTION_PUBLICATION_REFRESH: { bool copy_data; + bool refresh; parse_subscription_options(stmt->options, NULL, NULL, NULL, NULL, NULL, NULL, &copy_data, - NULL); + NULL, &refresh); values[Anum_pg_subscription_subpublications - 1] = publicationListToArray(stmt->publication); @@ -727,12 +741,13 @@ AlterSubscription(AlterSubscriptionStmt *stmt) update_tuple = true; /* Refresh if user asked us to. */ - if (stmt->kind == ALTER_SUBSCRIPTION_PUBLICATION_REFRESH) + if (refresh) { if (!sub->enabled) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), - errmsg("ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions"))); + errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"), + errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false)."))); /* Make sure refresh sees the new list of publications. 
*/ sub->publications = stmt->publication; @@ -754,7 +769,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt) parse_subscription_options(stmt->options, NULL, NULL, NULL, NULL, NULL, NULL, &copy_data, - NULL); + NULL, NULL); AlterSubscription_refresh(sub, copy_data); diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index 78ab65f600..7c260082bd 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -299,6 +299,14 @@ struct DropRelationCallbackState #define ATT_COMPOSITE_TYPE 0x0010 #define ATT_FOREIGN_TABLE 0x0020 +/* + * Partition tables are expected to be dropped when the parent partitioned + * table gets dropped. Hence for partitioning we use AUTO dependency. + * Otherwise, for regular inheritance use NORMAL dependency. + */ +#define child_dependency_type(child_is_partition) \ + ((child_is_partition) ? DEPENDENCY_AUTO : DEPENDENCY_NORMAL) + static void truncate_check_rel(Relation rel); static List *MergeAttributes(List *schema, List *supers, char relpersistence, bool is_partition, List **supOids, List **supconstr, @@ -455,7 +463,8 @@ static void ATExecEnableDisableRule(Relation rel, char *rulename, static void ATPrepAddInherit(Relation child_rel); static ObjectAddress ATExecAddInherit(Relation child_rel, RangeVar *parent, LOCKMODE lockmode); static ObjectAddress ATExecDropInherit(Relation rel, RangeVar *parent, LOCKMODE lockmode); -static void drop_parent_dependency(Oid relid, Oid refclassid, Oid refobjid); +static void drop_parent_dependency(Oid relid, Oid refclassid, Oid refobjid, + DependencyType deptype); static ObjectAddress ATExecAddOf(Relation rel, const TypeName *ofTypename, LOCKMODE lockmode); static void ATExecDropOf(Relation rel, LOCKMODE lockmode); static void ATExecReplicaIdentity(Relation rel, ReplicaIdentityStmt *stmt, LOCKMODE lockmode); @@ -2428,14 +2437,8 @@ StoreCatalogInheritance1(Oid relationId, Oid parentOid, childobject.objectId = relationId; childobject.objectSubId = 0; - /* - * 
Partition tables are expected to be dropped when the parent partitioned - * table gets dropped. - */ - if (child_is_partition) - recordDependencyOn(&childobject, &parentobject, DEPENDENCY_AUTO); - else - recordDependencyOn(&childobject, &parentobject, DEPENDENCY_NORMAL); + recordDependencyOn(&childobject, &parentobject, + child_dependency_type(child_is_partition)); /* * Post creation hook of this inheritance. Since object_access_hook @@ -6904,6 +6907,7 @@ ATExecAddIndex(AlteredTableInfo *tab, Relation rel, InvalidOid, /* no predefined OID */ true, /* is_alter_table */ check_rights, + false, /* check_not_in_use - we did it already */ skip_build, quiet); @@ -11900,7 +11904,8 @@ RemoveInheritance(Relation child_rel, Relation parent_rel) drop_parent_dependency(RelationGetRelid(child_rel), RelationRelationId, - RelationGetRelid(parent_rel)); + RelationGetRelid(parent_rel), + child_dependency_type(child_is_partition)); /* * Post alter hook of this inherits. Since object_access_hook doesn't take @@ -11920,7 +11925,8 @@ RemoveInheritance(Relation child_rel, Relation parent_rel) * through pg_depend. */ static void -drop_parent_dependency(Oid relid, Oid refclassid, Oid refobjid) +drop_parent_dependency(Oid relid, Oid refclassid, Oid refobjid, + DependencyType deptype) { Relation catalogRelation; SysScanDesc scan; @@ -11952,7 +11958,7 @@ drop_parent_dependency(Oid relid, Oid refclassid, Oid refobjid) if (dep->refclassid == refclassid && dep->refobjid == refobjid && dep->refobjsubid == 0 && - dep->deptype == DEPENDENCY_NORMAL) + dep->deptype == deptype) CatalogTupleDelete(catalogRelation, &depTuple->t_self); } @@ -12073,7 +12079,8 @@ ATExecAddOf(Relation rel, const TypeName *ofTypename, LOCKMODE lockmode) /* If the table was already typed, drop the existing dependency. 
*/ if (rel->rd_rel->reloftype) - drop_parent_dependency(relid, TypeRelationId, rel->rd_rel->reloftype); + drop_parent_dependency(relid, TypeRelationId, rel->rd_rel->reloftype, + DEPENDENCY_NORMAL); /* Record a dependency on the new type. */ tableobj.classId = RelationRelationId; @@ -12126,7 +12133,8 @@ ATExecDropOf(Relation rel, LOCKMODE lockmode) * table is presumed enough rights. No lock required on the type, either. */ - drop_parent_dependency(relid, TypeRelationId, rel->rd_rel->reloftype); + drop_parent_dependency(relid, TypeRelationId, rel->rd_rel->reloftype, + DEPENDENCY_NORMAL); /* Clear pg_class.reloftype */ relationRelation = heap_open(RelationRelationId, RowExclusiveLock); @@ -14227,7 +14235,6 @@ ComputePartitionAttrs(Relation rel, List *partParams, AttrNumber *partattrs, static ObjectAddress ATExecAttachPartition(List **wqueue, Relation rel, PartitionCmd *cmd) { - PartitionKey key = RelationGetPartitionKey(rel); Relation attachRel, catalog; List *childrels; @@ -14413,11 +14420,6 @@ ATExecAttachPartition(List **wqueue, Relation rel, PartitionCmd *cmd) { int num_check = attachRel_constr->num_check; int i; - Bitmapset *not_null_attrs = NULL; - List *part_constr; - ListCell *lc; - bool partition_accepts_null = true; - int partnatts; if (attachRel_constr->has_not_null) { @@ -14447,7 +14449,6 @@ ATExecAttachPartition(List **wqueue, Relation rel, PartitionCmd *cmd) ntest->argisrow = false; ntest->location = -1; existConstraint = lappend(existConstraint, ntest); - not_null_attrs = bms_add_member(not_null_attrs, i); } } } @@ -14481,59 +14482,8 @@ ATExecAttachPartition(List **wqueue, Relation rel, PartitionCmd *cmd) existConstraint = list_make1(make_ands_explicit(existConstraint)); /* And away we go ... 
*/ - if (predicate_implied_by(partConstraint, existConstraint)) + if (predicate_implied_by(partConstraint, existConstraint, true)) skip_validate = true; - - /* - * We choose to err on the safer side, i.e., give up on skipping the - * validation scan, if the partition key column doesn't have the NOT - * NULL constraint and the table is to become a list partition that - * does not accept nulls. In this case, the partition predicate - * (partConstraint) does include an 'key IS NOT NULL' expression, - * however, because of the way predicate_implied_by_simple_clause() is - * designed to handle IS NOT NULL predicates in the absence of a IS - * NOT NULL clause, we cannot rely on just the above proof. - * - * That is not an issue in case of a range partition, because if there - * were no NOT NULL constraint defined on the key columns, an error - * would be thrown before we get here anyway. That is not true, - * however, if any of the partition keys is an expression, which is - * handled below. - */ - part_constr = linitial(partConstraint); - part_constr = make_ands_implicit((Expr *) part_constr); - - /* - * part_constr contains an IS NOT NULL expression, if this is a list - * partition that does not accept nulls (in fact, also if this is a - * range partition and some partition key is an expression, but we - * never skip validation in that case anyway; see below) - */ - foreach(lc, part_constr) - { - Node *expr = lfirst(lc); - - if (IsA(expr, NullTest) && - ((NullTest *) expr)->nulltesttype == IS_NOT_NULL) - { - partition_accepts_null = false; - break; - } - } - - partnatts = get_partition_natts(key); - for (i = 0; i < partnatts; i++) - { - AttrNumber partattno; - - partattno = get_partition_col_attnum(key, i); - - /* If partition key is an expression, must not skip validation */ - if (!partition_accepts_null && - (partattno == 0 || - !bms_is_member(partattno, not_null_attrs))) - skip_validate = false; - } } /* It's safe to skip the validation scan after all */ diff --git 
a/src/backend/commands/vacuumlazy.c b/src/backend/commands/vacuumlazy.c index 56356de670..fc9c4f0fb1 100644 --- a/src/backend/commands/vacuumlazy.c +++ b/src/backend/commands/vacuumlazy.c @@ -1353,8 +1353,7 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats, "%u pages are entirely empty.\n", empty_pages), empty_pages); - appendStringInfo(&buf, _("%s."), - pg_rusage_show(&ru0)); + appendStringInfo(&buf, "%s.", pg_rusage_show(&ru0)); ereport(elevel, (errmsg("\"%s\": found %.0f removable, %.0f nonremovable row versions in %u out of %u pages", @@ -1429,8 +1428,7 @@ lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats) (errmsg("\"%s\": removed %d row versions in %d pages", RelationGetRelationName(onerel), tupindex, npages), - errdetail("%s.", - pg_rusage_show(&ru0)))); + errdetail_internal("%s", pg_rusage_show(&ru0)))); } /* @@ -1618,7 +1616,7 @@ lazy_vacuum_index(Relation indrel, (errmsg("scanned index \"%s\" to remove %d row versions", RelationGetRelationName(indrel), vacrelstats->num_dead_tuples), - errdetail("%s.", pg_rusage_show(&ru0)))); + errdetail_internal("%s", pg_rusage_show(&ru0)))); } /* @@ -1828,8 +1826,8 @@ lazy_truncate_heap(Relation onerel, LVRelStats *vacrelstats) (errmsg("\"%s\": truncated %u to %u pages", RelationGetRelationName(onerel), old_rel_pages, new_rel_pages), - errdetail("%s.", - pg_rusage_show(&ru0)))); + errdetail_internal("%s", + pg_rusage_show(&ru0)))); old_rel_pages = new_rel_pages; } while (new_rel_pages > vacrelstats->nonempty_pages && vacrelstats->lock_waiter_detected); diff --git a/src/backend/commands/variable.c b/src/backend/commands/variable.c index ed3b2484ae..717bc39c01 100644 --- a/src/backend/commands/variable.c +++ b/src/backend/commands/variable.c @@ -791,7 +791,7 @@ assign_client_encoding(const char *newval, void *extra) */ ereport(ERROR, (errcode(ERRCODE_INVALID_TRANSACTION_STATE), - errmsg("cannot change client_encoding in a parallel worker"))); + errmsg("cannot change client_encoding during a 
parallel operation"))); } /* We do not expect an error if PrepareClientEncoding succeeded */ diff --git a/src/backend/common.mk b/src/backend/common.mk index 0b57543bc4..5d599dbd0c 100644 --- a/src/backend/common.mk +++ b/src/backend/common.mk @@ -8,8 +8,6 @@ # this directory and SUBDIRS to subdirectories containing more things # to build. -override CPPFLAGS := $(CPPFLAGS) $(ICU_CFLAGS) - ifdef PARTIAL_LINKING # old style: linking using SUBSYS.o subsysfilename = SUBSYS.o diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c index 7232b0911f..34cca85563 100644 --- a/src/backend/executor/execMain.c +++ b/src/backend/executor/execMain.c @@ -112,6 +112,8 @@ static char *ExecBuildSlotPartitionKeyDescription(Relation rel, int maxfieldlen); static void EvalPlanQualStart(EPQState *epqstate, EState *parentestate, Plan *planTree); +static void ExecPartitionCheck(ResultRelInfo *resultRelInfo, + TupleTableSlot *slot, EState *estate); /* * Note that GetUpdatedColumns() also exists in commands/trigger.c. There does @@ -1404,34 +1406,19 @@ InitResultRelInfo(ResultRelInfo *resultRelInfo, resultRelInfo->ri_projectReturning = NULL; /* - * If partition_root has been specified, that means we are building the - * ResultRelInfo for one of its leaf partitions. In that case, we need - * *not* initialize the leaf partition's constraint, but rather the - * partition_root's (if any). We must do that explicitly like this, - * because implicit partition constraints are not inherited like user- - * defined constraints and would fail to be enforced by ExecConstraints() - * after a tuple is routed to a leaf partition. + * Partition constraint, which also includes the partition constraint of + * all the ancestors that are partitions. Note that it will be checked + * even in the case of tuple-routing where this table is the target leaf + * partition, if there any BR triggers defined on the table. 
Although + * tuple-routing implicitly preserves the partition constraint of the + * target partition for a given row, the BR triggers may change the row + * such that the constraint is no longer satisfied, which we must fail for + * by checking it explicitly. + * + * If this is a partitioned table, the partition constraint (if any) of a + * given row will be checked just before performing tuple-routing. */ - if (partition_root) - { - /* - * Root table itself may or may not be a partition; partition_check - * would be NIL in the latter case. - */ - partition_check = RelationGetPartitionQual(partition_root); - - /* - * This is not our own partition constraint, but rather an ancestor's. - * So any Vars in it bear the ancestor's attribute numbers. We must - * switch them to our own. (dummy varno = 1) - */ - if (partition_check != NIL) - partition_check = map_partition_varattnos(partition_check, 1, - resultRelationDesc, - partition_root); - } - else - partition_check = RelationGetPartitionQual(resultRelationDesc); + partition_check = RelationGetPartitionQual(resultRelationDesc); resultRelInfo->ri_PartitionCheck = partition_check; resultRelInfo->ri_PartitionRoot = partition_root; @@ -1900,13 +1887,16 @@ ExecRelCheck(ResultRelInfo *resultRelInfo, /* * ExecPartitionCheck --- check that tuple meets the partition constraint. - * - * Note: This is called *iff* resultRelInfo is the main target table. */ -static bool +static void ExecPartitionCheck(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, EState *estate) { + Relation rel = resultRelInfo->ri_RelationDesc; + TupleDesc tupdesc = RelationGetDescr(rel); + Bitmapset *modifiedCols; + Bitmapset *insertedCols; + Bitmapset *updatedCols; ExprContext *econtext; /* @@ -1934,7 +1924,44 @@ ExecPartitionCheck(ResultRelInfo *resultRelInfo, TupleTableSlot *slot, * As in case of the catalogued constraints, we treat a NULL result as * success here, not a failure. 
*/ - return ExecCheck(resultRelInfo->ri_PartitionCheckExpr, econtext); + if (!ExecCheck(resultRelInfo->ri_PartitionCheckExpr, econtext)) + { + char *val_desc; + Relation orig_rel = rel; + + /* See the comment above. */ + if (resultRelInfo->ri_PartitionRoot) + { + HeapTuple tuple = ExecFetchSlotTuple(slot); + TupleDesc old_tupdesc = RelationGetDescr(rel); + TupleConversionMap *map; + + rel = resultRelInfo->ri_PartitionRoot; + tupdesc = RelationGetDescr(rel); + /* a reverse map */ + map = convert_tuples_by_name(old_tupdesc, tupdesc, + gettext_noop("could not convert row type")); + if (map != NULL) + { + tuple = do_convert_tuple(tuple, map); + ExecStoreTuple(tuple, slot, InvalidBuffer, false); + } + } + + insertedCols = GetInsertedColumns(resultRelInfo, estate); + updatedCols = GetUpdatedColumns(resultRelInfo, estate); + modifiedCols = bms_union(insertedCols, updatedCols); + val_desc = ExecBuildSlotValueDescription(RelationGetRelid(rel), + slot, + tupdesc, + modifiedCols, + 64); + ereport(ERROR, + (errcode(ERRCODE_CHECK_VIOLATION), + errmsg("new row for relation \"%s\" violates partition constraint", + RelationGetRelationName(orig_rel)), + val_desc ? errdetail("Failing row contains %s.", val_desc) : 0)); + } } /* @@ -2062,47 +2089,11 @@ ExecConstraints(ResultRelInfo *resultRelInfo, } } - if (resultRelInfo->ri_PartitionCheck && - !ExecPartitionCheck(resultRelInfo, slot, estate)) - { - char *val_desc; - Relation orig_rel = rel; - - /* See the comment above. 
*/ - if (resultRelInfo->ri_PartitionRoot) - { - HeapTuple tuple = ExecFetchSlotTuple(slot); - TupleDesc old_tupdesc = RelationGetDescr(rel); - TupleConversionMap *map; - - rel = resultRelInfo->ri_PartitionRoot; - tupdesc = RelationGetDescr(rel); - /* a reverse map */ - map = convert_tuples_by_name(old_tupdesc, tupdesc, - gettext_noop("could not convert row type")); - if (map != NULL) - { - tuple = do_convert_tuple(tuple, map); - ExecStoreTuple(tuple, slot, InvalidBuffer, false); - } - } - - insertedCols = GetInsertedColumns(resultRelInfo, estate); - updatedCols = GetUpdatedColumns(resultRelInfo, estate); - modifiedCols = bms_union(insertedCols, updatedCols); - val_desc = ExecBuildSlotValueDescription(RelationGetRelid(rel), - slot, - tupdesc, - modifiedCols, - 64); - ereport(ERROR, - (errcode(ERRCODE_CHECK_VIOLATION), - errmsg("new row for relation \"%s\" violates partition constraint", - RelationGetRelationName(orig_rel)), - val_desc ? errdetail("Failing row contains %s.", val_desc) : 0)); - } + if (resultRelInfo->ri_PartitionCheck) + ExecPartitionCheck(resultRelInfo, slot, estate); } + /* * ExecWithCheckOptions -- check that tuple satisfies any WITH CHECK OPTIONs * of the specified kind. @@ -3387,6 +3378,13 @@ ExecFindPartition(ResultRelInfo *resultRelInfo, PartitionDispatch *pd, PartitionDispatchData *failed_at; TupleTableSlot *failed_slot; + /* + * First check the root table's partition constraint, if any. No point in + * routing the tuple if it doesn't belong in the root table itself. 
+ */ + if (resultRelInfo->ri_PartitionCheck) + ExecPartitionCheck(resultRelInfo, slot, estate); + result = get_partition_for_tuple(pd, slot, estate, &failed_at, &failed_slot); if (result < 0) diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c index 0610180016..1c02fa140b 100644 --- a/src/backend/executor/execParallel.c +++ b/src/backend/executor/execParallel.c @@ -341,7 +341,7 @@ ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize) mul_size(PARALLEL_TUPLE_QUEUE_SIZE, pcxt->nworkers)); else - tqueuespace = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE); + tqueuespace = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, false); /* Create the queues, and become the receiver for each. */ for (i = 0; i < pcxt->nworkers; ++i) @@ -684,7 +684,7 @@ ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc) char *mqspace; shm_mq *mq; - mqspace = shm_toc_lookup(toc, PARALLEL_KEY_TUPLE_QUEUE); + mqspace = shm_toc_lookup(toc, PARALLEL_KEY_TUPLE_QUEUE, false); mqspace += ParallelWorkerNumber * PARALLEL_TUPLE_QUEUE_SIZE; mq = (shm_mq *) mqspace; shm_mq_set_sender(mq, MyProc); @@ -705,14 +705,14 @@ ExecParallelGetQueryDesc(shm_toc *toc, DestReceiver *receiver, char *queryString; /* Get the query string from shared memory */ - queryString = shm_toc_lookup(toc, PARALLEL_KEY_QUERY_TEXT); + queryString = shm_toc_lookup(toc, PARALLEL_KEY_QUERY_TEXT, false); /* Reconstruct leader-supplied PlannedStmt. */ - pstmtspace = shm_toc_lookup(toc, PARALLEL_KEY_PLANNEDSTMT); + pstmtspace = shm_toc_lookup(toc, PARALLEL_KEY_PLANNEDSTMT, false); pstmt = (PlannedStmt *) stringToNode(pstmtspace); /* Reconstruct ParamListInfo. */ - paramspace = shm_toc_lookup(toc, PARALLEL_KEY_PARAMS); + paramspace = shm_toc_lookup(toc, PARALLEL_KEY_PARAMS, false); paramLI = RestoreParamList(¶mspace); /* @@ -843,7 +843,7 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc) /* Set up DestReceiver, SharedExecutorInstrumentation, and QueryDesc. 
*/ receiver = ExecParallelGetReceiver(seg, toc); - instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_INSTRUMENTATION); + instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_INSTRUMENTATION, true); if (instrumentation != NULL) instrument_options = instrumentation->instrument_options; queryDesc = ExecParallelGetQueryDesc(toc, receiver, instrument_options); @@ -858,7 +858,7 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc) InstrStartParallelQuery(); /* Attach to the dynamic shared memory area. */ - area_space = shm_toc_lookup(toc, PARALLEL_KEY_DSA); + area_space = shm_toc_lookup(toc, PARALLEL_KEY_DSA, false); area = dsa_attach_in_place(area_space, seg); /* Start up the executor */ @@ -875,7 +875,7 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc) ExecutorFinish(queryDesc); /* Report buffer usage during parallel execution. */ - buffer_usage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE); + buffer_usage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false); InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber]); /* Report instrumentation data if any instrumentation options are set. 
*/ diff --git a/src/backend/executor/functions.c b/src/backend/executor/functions.c index f1a71e26c8..bb5c609e54 100644 --- a/src/backend/executor/functions.c +++ b/src/backend/executor/functions.c @@ -392,6 +392,7 @@ sql_fn_post_column_ref(ParseState *pstate, ColumnRef *cref, Node *var) param = ParseFuncOrColumn(pstate, list_make1(subfield), list_make1(param), + pstate->p_last_srf, NULL, cref->location); } diff --git a/src/backend/executor/nodeBitmapHeapscan.c b/src/backend/executor/nodeBitmapHeapscan.c index c453362230..77f65db0ca 100644 --- a/src/backend/executor/nodeBitmapHeapscan.c +++ b/src/backend/executor/nodeBitmapHeapscan.c @@ -1005,7 +1005,7 @@ ExecBitmapHeapInitializeWorker(BitmapHeapScanState *node, shm_toc *toc) ParallelBitmapHeapState *pstate; Snapshot snapshot; - pstate = shm_toc_lookup(toc, node->ss.ps.plan->plan_node_id); + pstate = shm_toc_lookup(toc, node->ss.ps.plan->plan_node_id, false); node->pstate = pstate; snapshot = RestoreSnapshot(pstate->phs_snapshot_data); diff --git a/src/backend/executor/nodeCustom.c b/src/backend/executor/nodeCustom.c index 5d309828ef..69e27047f1 100644 --- a/src/backend/executor/nodeCustom.c +++ b/src/backend/executor/nodeCustom.c @@ -194,7 +194,7 @@ ExecCustomScanInitializeWorker(CustomScanState *node, shm_toc *toc) int plan_node_id = node->ss.ps.plan->plan_node_id; void *coordinate; - coordinate = shm_toc_lookup(toc, plan_node_id); + coordinate = shm_toc_lookup(toc, plan_node_id, false); methods->InitializeWorkerCustomScan(node, toc, coordinate); } } diff --git a/src/backend/executor/nodeForeignscan.c b/src/backend/executor/nodeForeignscan.c index 707db92178..2bb28a70ff 100644 --- a/src/backend/executor/nodeForeignscan.c +++ b/src/backend/executor/nodeForeignscan.c @@ -352,7 +352,7 @@ ExecForeignScanInitializeWorker(ForeignScanState *node, shm_toc *toc) int plan_node_id = node->ss.ps.plan->plan_node_id; void *coordinate; - coordinate = shm_toc_lookup(toc, plan_node_id); + coordinate = shm_toc_lookup(toc, 
plan_node_id, false); fdwroutine->InitializeWorkerForeignScan(node, toc, coordinate); } } diff --git a/src/backend/executor/nodeIndexonlyscan.c b/src/backend/executor/nodeIndexonlyscan.c index 5550f6c0a4..fb3d3bb121 100644 --- a/src/backend/executor/nodeIndexonlyscan.c +++ b/src/backend/executor/nodeIndexonlyscan.c @@ -676,7 +676,7 @@ ExecIndexOnlyScanInitializeWorker(IndexOnlyScanState *node, shm_toc *toc) { ParallelIndexScanDesc piscan; - piscan = shm_toc_lookup(toc, node->ss.ps.plan->plan_node_id); + piscan = shm_toc_lookup(toc, node->ss.ps.plan->plan_node_id, false); node->ioss_ScanDesc = index_beginscan_parallel(node->ss.ss_currentRelation, node->ioss_RelationDesc, diff --git a/src/backend/executor/nodeIndexscan.c b/src/backend/executor/nodeIndexscan.c index 5afd02e09d..0fb3fb5e7e 100644 --- a/src/backend/executor/nodeIndexscan.c +++ b/src/backend/executor/nodeIndexscan.c @@ -1714,7 +1714,7 @@ ExecIndexScanInitializeWorker(IndexScanState *node, shm_toc *toc) { ParallelIndexScanDesc piscan; - piscan = shm_toc_lookup(toc, node->ss.ps.plan->plan_node_id); + piscan = shm_toc_lookup(toc, node->ss.ps.plan->plan_node_id, false); node->iss_ScanDesc = index_beginscan_parallel(node->ss.ss_currentRelation, node->iss_RelationDesc, diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c index 0ee82e3add..bdff68513b 100644 --- a/src/backend/executor/nodeModifyTable.c +++ b/src/backend/executor/nodeModifyTable.c @@ -415,6 +415,16 @@ ExecInsert(ModifyTableState *mtstate, else { /* + * We always check the partition constraint, including when the tuple + * got here via tuple-routing. However we don't need to in the latter + * case if no BR trigger is defined on the partition. Note that a BR + * trigger might modify the tuple such that the partition constraint + * is no longer satisfied, so we need to check in that case. 
+ */ + bool check_partition_constr = + (resultRelInfo->ri_PartitionCheck != NIL); + + /* * Constraints might reference the tableoid column, so initialize * t_tableOid before evaluating them. */ @@ -431,9 +441,16 @@ ExecInsert(ModifyTableState *mtstate, resultRelInfo, slot, estate); /* - * Check the constraints of the tuple + * No need though if the tuple has been routed, and a BR trigger + * doesn't exist. */ - if (resultRelationDesc->rd_att->constr || resultRelInfo->ri_PartitionCheck) + if (saved_resultRelInfo != NULL && + !(resultRelInfo->ri_TrigDesc && + resultRelInfo->ri_TrigDesc->trig_insert_before_row)) + check_partition_constr = false; + + /* Check the constraints of the tuple */ + if (resultRelationDesc->rd_att->constr || check_partition_constr) ExecConstraints(resultRelInfo, slot, estate); if (onconflict != ONCONFLICT_NONE && resultRelInfo->ri_NumIndices > 0) @@ -1826,10 +1843,21 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags) if (node->withCheckOptionLists != NIL && mtstate->mt_num_partitions > 0) { List *wcoList; + PlanState *plan; - Assert(operation == CMD_INSERT); - resultRelInfo = mtstate->mt_partitions; + /* + * In case of INSERT on partitioned tables, there is only one plan. + * Likewise, there is only one WITH CHECK OPTIONS list, not one per + * partition. We make a copy of the WCO qual for each partition; note + * that, if there are SubPlans in there, they all end up attached to + * the one parent Plan node. 
+ */ + Assert(operation == CMD_INSERT && + list_length(node->withCheckOptionLists) == 1 && + mtstate->mt_nplans == 1); wcoList = linitial(node->withCheckOptionLists); + plan = mtstate->mt_plans[0]; + resultRelInfo = mtstate->mt_partitions; for (i = 0; i < mtstate->mt_num_partitions; i++) { Relation partrel = resultRelInfo->ri_RelationDesc; @@ -1843,9 +1871,9 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags) partrel, rel); foreach(ll, mapped_wcoList) { - WithCheckOption *wco = (WithCheckOption *) lfirst(ll); - ExprState *wcoExpr = ExecInitQual((List *) wco->qual, - mtstate->mt_plans[i]); + WithCheckOption *wco = castNode(WithCheckOption, lfirst(ll)); + ExprState *wcoExpr = ExecInitQual(castNode(List, wco->qual), + plan); wcoExprs = lappend(wcoExprs, wcoExpr); } diff --git a/src/backend/executor/nodeSeqscan.c b/src/backend/executor/nodeSeqscan.c index 5680464fa2..c0e37dcd83 100644 --- a/src/backend/executor/nodeSeqscan.c +++ b/src/backend/executor/nodeSeqscan.c @@ -332,7 +332,7 @@ ExecSeqScanInitializeWorker(SeqScanState *node, shm_toc *toc) { ParallelHeapScanDesc pscan; - pscan = shm_toc_lookup(toc, node->ss.ps.plan->plan_node_id); + pscan = shm_toc_lookup(toc, node->ss.ps.plan->plan_node_id, false); node->ss.ss_currentScanDesc = heap_beginscan_parallel(node->ss.ss_currentRelation, pscan); } diff --git a/src/backend/libpq/auth-scram.c b/src/backend/libpq/auth-scram.c index 99feb0ce94..a6042b8013 100644 --- a/src/backend/libpq/auth-scram.c +++ b/src/backend/libpq/auth-scram.c @@ -195,7 +195,9 @@ pg_be_scram_init(const char *username, const char *shadow_pass) * The password looked like a SCRAM verifier, but could not be * parsed. 
*/ - elog(LOG, "invalid SCRAM verifier for user \"%s\"", username); + ereport(LOG, + (errmsg("invalid SCRAM verifier for user \"%s\"", + username))); got_verifier = false; } } @@ -283,11 +285,13 @@ pg_be_scram_exchange(void *opaq, char *input, int inputlen, if (inputlen == 0) ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), - (errmsg("malformed SCRAM message (empty message)")))); + errmsg("malformed SCRAM message"), + errdetail("The message is empty."))); if (inputlen != strlen(input)) ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), - (errmsg("malformed SCRAM message (length mismatch)")))); + errmsg("malformed SCRAM message"), + errdetail("Message length does not match input length."))); switch (state->state) { @@ -319,7 +323,8 @@ pg_be_scram_exchange(void *opaq, char *input, int inputlen, if (!verify_final_nonce(state)) ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), - (errmsg("invalid SCRAM response (nonce mismatch)")))); + errmsg("invalid SCRAM response"), + errdetail("Nonce does not match."))); /* * Now check the final nonce and the client proof. @@ -391,14 +396,9 @@ pg_be_scram_build_verifier(const char *password) /* Generate random salt */ if (!pg_backend_random(saltbuf, SCRAM_DEFAULT_SALT_LEN)) - { - ereport(LOG, + ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("could not generate random salt"))); - if (prep_password) - pfree(prep_password); - return NULL; - } result = scram_build_verifier(saltbuf, SCRAM_DEFAULT_SALT_LEN, SCRAM_DEFAULT_ITERATIONS, password); @@ -435,7 +435,8 @@ scram_verify_plain_password(const char *username, const char *password, /* * The password looked like a SCRAM verifier, but could not be parsed. 
*/ - elog(LOG, "invalid SCRAM verifier for user \"%s\"", username); + ereport(LOG, + (errmsg("invalid SCRAM verifier for user \"%s\"", username))); return false; } @@ -443,7 +444,8 @@ scram_verify_plain_password(const char *username, const char *password, saltlen = pg_b64_decode(encoded_salt, strlen(encoded_salt), salt); if (saltlen == -1) { - elog(LOG, "invalid SCRAM verifier for user \"%s\"", username); + ereport(LOG, + (errmsg("invalid SCRAM verifier for user \"%s\"", username))); return false; } @@ -582,14 +584,16 @@ read_attr_value(char **input, char attr) if (*begin != attr) ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), - (errmsg("malformed SCRAM message (attribute '%c' expected, %s found)", - attr, sanitize_char(*begin))))); + errmsg("malformed SCRAM message"), + errdetail("Expected attribute '%c' but found %s.", + attr, sanitize_char(*begin)))); begin++; if (*begin != '=') ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), - (errmsg("malformed SCRAM message (expected = in attr %c)", attr)))); + errmsg("malformed SCRAM message"), + errdetail("Expected character = for attribute %c.", attr))); begin++; end = begin; @@ -669,8 +673,9 @@ read_any_attr(char **input, char *attr_p) (attr >= 'a' && attr <= 'z'))) ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), - (errmsg("malformed SCRAM message (attribute expected, invalid char %s found)", - sanitize_char(attr))))); + errmsg("malformed SCRAM message"), + errdetail("Attribute expected, but found invalid character %s.", + sanitize_char(attr)))); if (attr_p) *attr_p = attr; begin++; @@ -678,7 +683,8 @@ read_any_attr(char **input, char *attr_p) if (*begin != '=') ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), - (errmsg("malformed SCRAM message (expected = in attr %c)", attr)))); + errmsg("malformed SCRAM message"), + errdetail("Expected character = for attribute %c.", attr))); begin++; end = begin; @@ -795,14 +801,16 @@ read_client_first_message(scram_state *state, char *input) default: ereport(ERROR, 
(errcode(ERRCODE_PROTOCOL_VIOLATION), - (errmsg("malformed SCRAM message (unexpected channel-binding flag %s)", - sanitize_char(*input))))); + errmsg("malformed SCRAM message"), + errdetail("Unexpected channel-binding flag %s.", + sanitize_char(*input)))); } if (*input != ',') ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), - errmsg("malformed SCRAM message (comma expected, got %s)", - sanitize_char(*input)))); + errmsg("malformed SCRAM message"), + errdetail("Comma expected, but found character %s.", + sanitize_char(*input)))); input++; /* @@ -815,8 +823,9 @@ read_client_first_message(scram_state *state, char *input) if (*input != ',') ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), - errmsg("malformed SCRAM message (unexpected attribute %s in client-first-message)", - sanitize_char(*input)))); + errmsg("malformed SCRAM message"), + errdetail("Unexpected attribute %s in client-first-message.", + sanitize_char(*input)))); input++; state->client_first_message_bare = pstrdup(input); @@ -831,7 +840,7 @@ read_client_first_message(scram_state *state, char *input) if (*input == 'm') ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("client requires mandatory SCRAM extension"))); + errmsg("client requires an unsupported SCRAM extension"))); /* * Read username. Note: this is ignored. 
We use the username from the @@ -960,7 +969,7 @@ build_server_first_message(scram_state *state) int encoded_len; if (!pg_backend_random(raw_nonce, SCRAM_RAW_NONCE_LEN)) - ereport(COMMERROR, + ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("could not generate random nonce"))); @@ -1044,14 +1053,16 @@ read_client_final_message(scram_state *state, char *input) if (pg_b64_decode(value, strlen(value), client_proof) != SCRAM_KEY_LEN) ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), - (errmsg("malformed SCRAM message (malformed proof in client-final-message")))); + errmsg("malformed SCRAM message"), + errdetail("Malformed proof in client-final-message."))); memcpy(state->ClientProof, client_proof, SCRAM_KEY_LEN); pfree(client_proof); if (*p != '\0') ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), - (errmsg("malformed SCRAM message (garbage at end of client-final-message)")))); + errmsg("malformed SCRAM message"), + errdetail("Garbage found at the end of client-final-message."))); state->client_final_message_without_proof = palloc(proof - begin + 1); memcpy(state->client_final_message_without_proof, input, proof - begin); diff --git a/src/backend/libpq/auth.c b/src/backend/libpq/auth.c index 5b68e3b7a1..081c06a1e6 100644 --- a/src/backend/libpq/auth.c +++ b/src/backend/libpq/auth.c @@ -656,7 +656,7 @@ recv_password_packet(Port *port) * log. */ if (mtype != EOF) - ereport(COMMERROR, + ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg("expected password response, got message type %d", mtype))); @@ -684,7 +684,7 @@ recv_password_packet(Port *port) * StringInfo is guaranteed to have an appended '\0'. */ if (strlen(buf.data) + 1 != buf.len) - ereport(COMMERROR, + ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg("invalid password packet size"))); @@ -897,11 +897,10 @@ CheckSCRAMAuth(Port *port, char *shadow_pass, char **logdetail) /* Only log error if client didn't disconnect. 
*/ if (mtype != EOF) { - ereport(COMMERROR, + ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg("expected SASL response, got message type %d", mtype))); - return STATUS_ERROR; } else return STATUS_EOF; @@ -935,11 +934,9 @@ CheckSCRAMAuth(Port *port, char *shadow_pass, char **logdetail) selected_mech = pq_getmsgrawstring(&buf); if (strcmp(selected_mech, SCRAM_SHA256_NAME) != 0) { - ereport(COMMERROR, + ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg("client selected an invalid SASL authentication mechanism"))); - pfree(buf.data); - return STATUS_ERROR; } inputlen = pq_getmsgint(&buf, 4); @@ -1144,7 +1141,7 @@ pg_GSS_recvauth(Port *port) { /* Only log error if client didn't disconnect. */ if (mtype != EOF) - ereport(COMMERROR, + ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg("expected GSS response, got message type %d", mtype))); @@ -1384,7 +1381,7 @@ pg_SSPI_recvauth(Port *port) { /* Only log error if client didn't disconnect. */ if (mtype != EOF) - ereport(COMMERROR, + ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg("expected SSPI response, got message type %d", mtype))); diff --git a/src/backend/libpq/pqmq.c b/src/backend/libpq/pqmq.c index 96939327c3..8fbc03819d 100644 --- a/src/backend/libpq/pqmq.c +++ b/src/backend/libpq/pqmq.c @@ -172,9 +172,9 @@ mq_putmessage(char msgtype, const char *s, size_t len) if (result != SHM_MQ_WOULD_BLOCK) break; - WaitLatch(&MyProc->procLatch, WL_LATCH_SET, 0, + WaitLatch(MyLatch, WL_LATCH_SET, 0, WAIT_EVENT_MQ_PUT_MESSAGE); - ResetLatch(&MyProc->procLatch); + ResetLatch(MyLatch); CHECK_FOR_INTERRUPTS(); } diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c index fc21909ea3..a1d056ff9f 100644 --- a/src/backend/nodes/copyfuncs.c +++ b/src/backend/nodes/copyfuncs.c @@ -2480,10 +2480,11 @@ _copyRangeTblEntry(const RangeTblEntry *from) COPY_STRING_FIELD(ctename); COPY_SCALAR_FIELD(ctelevelsup); COPY_SCALAR_FIELD(self_reference); - COPY_STRING_FIELD(enrname); 
COPY_NODE_FIELD(coltypes); COPY_NODE_FIELD(coltypmods); COPY_NODE_FIELD(colcollations); + COPY_STRING_FIELD(enrname); + COPY_SCALAR_FIELD(enrtuples); COPY_NODE_FIELD(alias); COPY_NODE_FIELD(eref); COPY_SCALAR_FIELD(lateral); diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c index c644aba4c1..14a8167b04 100644 --- a/src/backend/nodes/equalfuncs.c +++ b/src/backend/nodes/equalfuncs.c @@ -2664,6 +2664,8 @@ _equalRangeTblEntry(const RangeTblEntry *a, const RangeTblEntry *b) COMPARE_NODE_FIELD(coltypes); COMPARE_NODE_FIELD(coltypmods); COMPARE_NODE_FIELD(colcollations); + COMPARE_STRING_FIELD(enrname); + COMPARE_SCALAR_FIELD(enrtuples); COMPARE_NODE_FIELD(alias); COMPARE_NODE_FIELD(eref); COMPARE_SCALAR_FIELD(lateral); diff --git a/src/backend/nodes/nodeFuncs.c b/src/backend/nodes/nodeFuncs.c index eb3e1ce1c1..2496a9a43c 100644 --- a/src/backend/nodes/nodeFuncs.c +++ b/src/backend/nodes/nodeFuncs.c @@ -694,39 +694,11 @@ expression_returns_set_walker(Node *node, void *context) /* else fall through to check args */ } - /* Avoid recursion for some cases that can't return a set */ + /* Avoid recursion for some cases that parser checks not to return a set */ if (IsA(node, Aggref)) return false; if (IsA(node, WindowFunc)) return false; - if (IsA(node, DistinctExpr)) - return false; - if (IsA(node, NullIfExpr)) - return false; - if (IsA(node, ScalarArrayOpExpr)) - return false; - if (IsA(node, BoolExpr)) - return false; - if (IsA(node, SubLink)) - return false; - if (IsA(node, SubPlan)) - return false; - if (IsA(node, AlternativeSubPlan)) - return false; - if (IsA(node, ArrayExpr)) - return false; - if (IsA(node, RowExpr)) - return false; - if (IsA(node, RowCompareExpr)) - return false; - if (IsA(node, CoalesceExpr)) - return false; - if (IsA(node, MinMaxExpr)) - return false; - if (IsA(node, SQLValueFunction)) - return false; - if (IsA(node, XmlExpr)) - return false; return expression_tree_walker(node, expression_returns_set_walker, context); diff 
--git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c index be3413436a..b56b04a82f 100644 --- a/src/backend/nodes/outfuncs.c +++ b/src/backend/nodes/outfuncs.c @@ -4135,6 +4135,7 @@ _outRangeTblEntry(StringInfo str, const RangeTblEntry *node) break; case RTE_NAMEDTUPLESTORE: WRITE_STRING_FIELD(enrname); + WRITE_FLOAT_FIELD(enrtuples, "%.0f"); WRITE_OID_FIELD(relid); WRITE_NODE_FIELD(coltypes); WRITE_NODE_FIELD(coltypmods); @@ -4640,7 +4641,7 @@ _outPartitionElem(StringInfo str, const PartitionElem *node) static void _outPartitionSpec(StringInfo str, const PartitionSpec *node) { - WRITE_NODE_TYPE("PARTITIONBY"); + WRITE_NODE_TYPE("PARTITIONSPEC"); WRITE_STRING_FIELD(strategy); WRITE_NODE_FIELD(partParams); @@ -4650,23 +4651,23 @@ _outPartitionSpec(StringInfo str, const PartitionSpec *node) static void _outPartitionBoundSpec(StringInfo str, const PartitionBoundSpec *node) { - WRITE_NODE_TYPE("PARTITIONBOUND"); + WRITE_NODE_TYPE("PARTITIONBOUNDSPEC"); WRITE_CHAR_FIELD(strategy); WRITE_NODE_FIELD(listdatums); WRITE_NODE_FIELD(lowerdatums); WRITE_NODE_FIELD(upperdatums); - /* XXX somebody forgot location field; too late to change for v10 */ + WRITE_LOCATION_FIELD(location); } static void _outPartitionRangeDatum(StringInfo str, const PartitionRangeDatum *node) { - WRITE_NODE_TYPE("PARTRANGEDATUM"); + WRITE_NODE_TYPE("PARTITIONRANGEDATUM"); WRITE_BOOL_FIELD(infinite); WRITE_NODE_FIELD(value); - /* XXX somebody forgot location field; too late to change for v10 */ + WRITE_LOCATION_FIELD(location); } /* diff --git a/src/backend/nodes/readfuncs.c b/src/backend/nodes/readfuncs.c index 5147eaa4d3..935bb196f7 100644 --- a/src/backend/nodes/readfuncs.c +++ b/src/backend/nodes/readfuncs.c @@ -1991,6 +1991,7 @@ _readRangeTblEntry(void) break; case RTE_NAMEDTUPLESTORE: READ_STRING_FIELD(enrname); + READ_FLOAT_FIELD(enrtuples); READ_OID_FIELD(relid); READ_NODE_FIELD(coltypes); READ_NODE_FIELD(coltypmods); @@ -3761,8 +3762,7 @@ _readPartitionBoundSpec(void) 
READ_NODE_FIELD(listdatums); READ_NODE_FIELD(lowerdatums); READ_NODE_FIELD(upperdatums); - /* XXX somebody forgot location field; too late to change for v10 */ - local_node->location = -1; + READ_LOCATION_FIELD(location); READ_DONE(); } @@ -3777,8 +3777,7 @@ _readPartitionRangeDatum(void) READ_BOOL_FIELD(infinite); READ_NODE_FIELD(value); - /* XXX somebody forgot location field; too late to change for v10 */ - local_node->location = -1; + READ_LOCATION_FIELD(location); READ_DONE(); } @@ -4029,9 +4028,9 @@ parseNodeString(void) return_value = _readRemoteStmt(); else if (MATCH("SIMPLESORT", 10)) return_value = _readSimpleSort(); - else if (MATCH("PARTITIONBOUND", 14)) + else if (MATCH("PARTITIONBOUNDSPEC", 18)) return_value = _readPartitionBoundSpec(); - else if (MATCH("PARTRANGEDATUM", 14)) + else if (MATCH("PARTITIONRANGEDATUM", 19)) return_value = _readPartitionRangeDatum(); else { diff --git a/src/backend/optimizer/geqo/geqo_cx.c b/src/backend/optimizer/geqo/geqo_cx.c index 9f6d5e478a..c72081e81a 100644 --- a/src/backend/optimizer/geqo/geqo_cx.c +++ b/src/backend/optimizer/geqo/geqo_cx.c @@ -38,6 +38,7 @@ #include "optimizer/geqo_recombination.h" #include "optimizer/geqo_random.h" +#if defined(CX) /* cx * @@ -119,3 +120,5 @@ cx(PlannerInfo *root, Gene *tour1, Gene *tour2, Gene *offspring, return num_diffs; } + +#endif /* defined(CX) */ diff --git a/src/backend/optimizer/geqo/geqo_erx.c b/src/backend/optimizer/geqo/geqo_erx.c index 133fe32348..173be44409 100644 --- a/src/backend/optimizer/geqo/geqo_erx.c +++ b/src/backend/optimizer/geqo/geqo_erx.c @@ -35,6 +35,7 @@ #include "optimizer/geqo_recombination.h" #include "optimizer/geqo_random.h" +#if defined(ERX) static int gimme_edge(PlannerInfo *root, Gene gene1, Gene gene2, Edge *edge_table); static void remove_gene(PlannerInfo *root, Gene gene, Edge edge, Edge *edge_table); @@ -466,3 +467,5 @@ edge_failure(PlannerInfo *root, Gene *gene, int index, Edge *edge_table, int num elog(ERROR, "no edge found"); return 0; /* 
to keep the compiler quiet */ } + +#endif /* defined(ERX) */ diff --git a/src/backend/optimizer/geqo/geqo_main.c b/src/backend/optimizer/geqo/geqo_main.c index 52bd428187..86213ac5a0 100644 --- a/src/backend/optimizer/geqo/geqo_main.c +++ b/src/backend/optimizer/geqo/geqo_main.c @@ -46,14 +46,14 @@ double Geqo_seed; static int gimme_pool_size(int nr_rel); static int gimme_number_generations(int pool_size); -/* define edge recombination crossover [ERX] per default */ +/* complain if no recombination mechanism is #define'd */ #if !defined(ERX) && \ !defined(PMX) && \ !defined(CX) && \ !defined(PX) && \ !defined(OX1) && \ !defined(OX2) -#define ERX +#error "must choose one GEQO recombination mechanism in geqo.h" #endif diff --git a/src/backend/optimizer/geqo/geqo_mutation.c b/src/backend/optimizer/geqo/geqo_mutation.c index 1a06d49775..c6af00a2a7 100644 --- a/src/backend/optimizer/geqo/geqo_mutation.c +++ b/src/backend/optimizer/geqo/geqo_mutation.c @@ -35,6 +35,8 @@ #include "optimizer/geqo_mutation.h" #include "optimizer/geqo_random.h" +#if defined(CX) /* currently used only in CX mode */ + void geqo_mutation(PlannerInfo *root, Gene *tour, int num_gene) { @@ -60,3 +62,5 @@ geqo_mutation(PlannerInfo *root, Gene *tour, int num_gene) num_swaps -= 1; } } + +#endif /* defined(CX) */ diff --git a/src/backend/optimizer/geqo/geqo_ox1.c b/src/backend/optimizer/geqo/geqo_ox1.c index fbf15282ad..891cfa2403 100644 --- a/src/backend/optimizer/geqo/geqo_ox1.c +++ b/src/backend/optimizer/geqo/geqo_ox1.c @@ -37,6 +37,7 @@ #include "optimizer/geqo_random.h" #include "optimizer/geqo_recombination.h" +#if defined(OX1) /* ox1 * @@ -90,3 +91,5 @@ ox1(PlannerInfo *root, Gene *tour1, Gene *tour2, Gene *offspring, int num_gene, } } + +#endif /* defined(OX1) */ diff --git a/src/backend/optimizer/geqo/geqo_ox2.c b/src/backend/optimizer/geqo/geqo_ox2.c index 01c55bea41..b43455d3eb 100644 --- a/src/backend/optimizer/geqo/geqo_ox2.c +++ b/src/backend/optimizer/geqo/geqo_ox2.c @@ -37,6 +37,7 @@ 
#include "optimizer/geqo_random.h" #include "optimizer/geqo_recombination.h" +#if defined(OX2) /* ox2 * @@ -107,3 +108,5 @@ ox2(PlannerInfo *root, Gene *tour1, Gene *tour2, Gene *offspring, int num_gene, } } + +#endif /* defined(OX2) */ diff --git a/src/backend/optimizer/geqo/geqo_pmx.c b/src/backend/optimizer/geqo/geqo_pmx.c index deb0f7b353..e9485cc8b5 100644 --- a/src/backend/optimizer/geqo/geqo_pmx.c +++ b/src/backend/optimizer/geqo/geqo_pmx.c @@ -37,6 +37,7 @@ #include "optimizer/geqo_random.h" #include "optimizer/geqo_recombination.h" +#if defined(PMX) /* pmx * @@ -219,3 +220,5 @@ pmx(PlannerInfo *root, Gene *tour1, Gene *tour2, Gene *offspring, int num_gene) pfree(indx); pfree(check_list); } + +#endif /* defined(PMX) */ diff --git a/src/backend/optimizer/geqo/geqo_px.c b/src/backend/optimizer/geqo/geqo_px.c index 99289bc11f..f7f615462c 100644 --- a/src/backend/optimizer/geqo/geqo_px.c +++ b/src/backend/optimizer/geqo/geqo_px.c @@ -37,6 +37,7 @@ #include "optimizer/geqo_random.h" #include "optimizer/geqo_recombination.h" +#if defined(PX) /* px * @@ -105,3 +106,5 @@ px(PlannerInfo *root, Gene *tour1, Gene *tour2, Gene *offspring, int num_gene, } } + +#endif /* defined(PX) */ diff --git a/src/backend/optimizer/geqo/geqo_recombination.c b/src/backend/optimizer/geqo/geqo_recombination.c index ef433e54e5..a61547c16d 100644 --- a/src/backend/optimizer/geqo/geqo_recombination.c +++ b/src/backend/optimizer/geqo/geqo_recombination.c @@ -58,6 +58,9 @@ init_tour(PlannerInfo *root, Gene *tour, int num_gene) } } +/* city table is used in these recombination methods: */ +#if defined(CX) || defined(PX) || defined(OX1) || defined(OX2) + /* alloc_city_table * * allocate memory for city table @@ -85,3 +88,5 @@ free_city_table(PlannerInfo *root, City *city_table) { pfree(city_table); } + +#endif /* CX || PX || OX1 || OX2 */ diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c index 6e4808d51b..dfb1c973c5 100644 --- 
a/src/backend/optimizer/path/costsize.c +++ b/src/backend/optimizer/path/costsize.c @@ -2220,6 +2220,7 @@ final_cost_nestloop(PlannerInfo *root, NestPath *path, Cost inner_run_cost = workspace->inner_run_cost; Cost inner_rescan_run_cost = workspace->inner_rescan_run_cost; double outer_matched_rows; + double outer_unmatched_rows; Selectivity inner_scan_frac; /* @@ -2232,6 +2233,7 @@ final_cost_nestloop(PlannerInfo *root, NestPath *path, * least 1, no such clamp is needed now.) */ outer_matched_rows = rint(outer_path_rows * extra->semifactors.outer_match_frac); + outer_unmatched_rows = outer_path_rows - outer_matched_rows; inner_scan_frac = 2.0 / (extra->semifactors.match_count + 1.0); /* @@ -2275,7 +2277,7 @@ final_cost_nestloop(PlannerInfo *root, NestPath *path, * of a nonempty scan. We consider that these are all rescans, * since we used inner_run_cost once already. */ - run_cost += (outer_path_rows - outer_matched_rows) * + run_cost += outer_unmatched_rows * inner_rescan_run_cost / inner_path_rows; /* @@ -2293,20 +2295,28 @@ final_cost_nestloop(PlannerInfo *root, NestPath *path, * difficult to estimate whether that will happen (and it could * not happen if there are any unmatched outer rows!), so be * conservative and always charge the whole first-scan cost once. + * We consider this charge to correspond to the first unmatched + * outer row, unless there isn't one in our estimate, in which + * case blame it on the first matched row. 
*/ + + /* First, count all unmatched join tuples as being processed */ + ntuples += outer_unmatched_rows * inner_path_rows; + + /* Now add the forced full scan, and decrement appropriate count */ run_cost += inner_run_cost; + if (outer_unmatched_rows >= 1) + outer_unmatched_rows -= 1; + else + outer_matched_rows -= 1; /* Add inner run cost for additional outer tuples having matches */ - if (outer_matched_rows > 1) - run_cost += (outer_matched_rows - 1) * inner_rescan_run_cost * inner_scan_frac; - - /* Add inner run cost for unmatched outer tuples */ - run_cost += (outer_path_rows - outer_matched_rows) * - inner_rescan_run_cost; + if (outer_matched_rows > 0) + run_cost += outer_matched_rows * inner_rescan_run_cost * inner_scan_frac; - /* And count the unmatched join tuples as being processed */ - ntuples += (outer_path_rows - outer_matched_rows) * - inner_path_rows; + /* Add inner run cost for additional unmatched outer tuples */ + if (outer_unmatched_rows > 0) + run_cost += outer_unmatched_rows * inner_rescan_run_cost; } } else diff --git a/src/backend/optimizer/path/indxpath.c b/src/backend/optimizer/path/indxpath.c index 607a8f97bf..07ab33902b 100644 --- a/src/backend/optimizer/path/indxpath.c +++ b/src/backend/optimizer/path/indxpath.c @@ -1210,10 +1210,10 @@ build_paths_for_OR(PlannerInfo *root, RelOptInfo *rel, all_clauses = list_concat(list_copy(clauses), other_clauses); - if (!predicate_implied_by(index->indpred, all_clauses)) + if (!predicate_implied_by(index->indpred, all_clauses, false)) continue; /* can't use it at all */ - if (!predicate_implied_by(index->indpred, other_clauses)) + if (!predicate_implied_by(index->indpred, other_clauses, false)) useful_predicate = true; } } @@ -1519,7 +1519,7 @@ choose_bitmap_and(PlannerInfo *root, RelOptInfo *rel, List *paths) { Node *np = (Node *) lfirst(l); - if (predicate_implied_by(list_make1(np), qualsofar)) + if (predicate_implied_by(list_make1(np), qualsofar, false)) { redundant = true; break; /* out of inner 
foreach loop */ @@ -2871,7 +2871,8 @@ check_index_predicates(PlannerInfo *root, RelOptInfo *rel) continue; /* ignore non-partial indexes here */ if (!index->predOK) /* don't repeat work if already proven OK */ - index->predOK = predicate_implied_by(index->indpred, clauselist); + index->predOK = predicate_implied_by(index->indpred, clauselist, + false); /* If rel is an update target, leave indrestrictinfo as set above */ if (is_target_rel) @@ -2886,7 +2887,7 @@ check_index_predicates(PlannerInfo *root, RelOptInfo *rel) /* predicate_implied_by() assumes first arg is immutable */ if (contain_mutable_functions((Node *) rinfo->clause) || !predicate_implied_by(list_make1(rinfo->clause), - index->indpred)) + index->indpred, false)) index->indrestrictinfo = lappend(index->indrestrictinfo, rinfo); } } diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c index af89e9d288..5c833e933d 100644 --- a/src/backend/optimizer/plan/createplan.c +++ b/src/backend/optimizer/plan/createplan.c @@ -2860,7 +2860,7 @@ create_indexscan_plan(PlannerInfo *root, if (is_redundant_derived_clause(rinfo, indexquals)) continue; /* derived from same EquivalenceClass */ if (!contain_mutable_functions((Node *) rinfo->clause) && - predicate_implied_by(list_make1(rinfo->clause), indexquals)) + predicate_implied_by(list_make1(rinfo->clause), indexquals, false)) continue; /* provably implied by indexquals */ qpqual = lappend(qpqual, rinfo); } @@ -3021,7 +3021,7 @@ create_bitmap_scan_plan(PlannerInfo *root, if (rinfo->parent_ec && list_member_ptr(indexECs, rinfo->parent_ec)) continue; /* derived from same EquivalenceClass */ if (!contain_mutable_functions(clause) && - predicate_implied_by(list_make1(clause), indexquals)) + predicate_implied_by(list_make1(clause), indexquals, false)) continue; /* provably implied by indexquals */ qpqual = lappend(qpqual, rinfo); } @@ -3252,7 +3252,8 @@ create_bitmap_subplan(PlannerInfo *root, Path *bitmapqual, * the conditions that 
got pushed into the bitmapqual. Avoid * generating redundant conditions. */ - if (!predicate_implied_by(list_make1(pred), ipath->indexclauses)) + if (!predicate_implied_by(list_make1(pred), ipath->indexclauses, + false)) { *qual = lappend(*qual, pred); *indexqual = lappend(*indexqual, pred); diff --git a/src/backend/optimizer/util/plancat.c b/src/backend/optimizer/util/plancat.c index aa8f6cf020..dec1589ec5 100644 --- a/src/backend/optimizer/util/plancat.c +++ b/src/backend/optimizer/util/plancat.c @@ -789,7 +789,7 @@ infer_arbiter_indexes(PlannerInfo *root) */ predExprs = RelationGetIndexPredicate(idxRel); - if (!predicate_implied_by(predExprs, (List *) onconflict->arbiterWhere)) + if (!predicate_implied_by(predExprs, (List *) onconflict->arbiterWhere, false)) goto next; results = lappend_oid(results, idxForm->indexrelid); @@ -1424,7 +1424,7 @@ relation_excluded_by_constraints(PlannerInfo *root, safe_restrictions = lappend(safe_restrictions, rinfo->clause); } - if (predicate_refuted_by(safe_restrictions, safe_restrictions)) + if (predicate_refuted_by(safe_restrictions, safe_restrictions, false)) return true; /* Only plain relations have constraints */ @@ -1463,7 +1463,7 @@ relation_excluded_by_constraints(PlannerInfo *root, * have volatile and nonvolatile subclauses, and it's OK to make * deductions with the nonvolatile parts. 
*/ - if (predicate_refuted_by(safe_constraints, rel->baserestrictinfo)) + if (predicate_refuted_by(safe_constraints, rel->baserestrictinfo, false)) return true; return false; diff --git a/src/backend/optimizer/util/predtest.c b/src/backend/optimizer/util/predtest.c index c4a04cfa95..06fce8458c 100644 --- a/src/backend/optimizer/util/predtest.c +++ b/src/backend/optimizer/util/predtest.c @@ -77,8 +77,10 @@ typedef struct PredIterInfoData } while (0) -static bool predicate_implied_by_recurse(Node *clause, Node *predicate); -static bool predicate_refuted_by_recurse(Node *clause, Node *predicate); +static bool predicate_implied_by_recurse(Node *clause, Node *predicate, + bool clause_is_check); +static bool predicate_refuted_by_recurse(Node *clause, Node *predicate, + bool clause_is_check); static PredClass predicate_classify(Node *clause, PredIterInfo info); static void list_startup_fn(Node *clause, PredIterInfo info); static Node *list_next_fn(PredIterInfo info); @@ -90,8 +92,10 @@ static void arrayconst_cleanup_fn(PredIterInfo info); static void arrayexpr_startup_fn(Node *clause, PredIterInfo info); static Node *arrayexpr_next_fn(PredIterInfo info); static void arrayexpr_cleanup_fn(PredIterInfo info); -static bool predicate_implied_by_simple_clause(Expr *predicate, Node *clause); -static bool predicate_refuted_by_simple_clause(Expr *predicate, Node *clause); +static bool predicate_implied_by_simple_clause(Expr *predicate, Node *clause, + bool clause_is_check); +static bool predicate_refuted_by_simple_clause(Expr *predicate, Node *clause, + bool clause_is_check); static Node *extract_not_arg(Node *clause); static Node *extract_strong_not_arg(Node *clause); static bool list_member_strip(List *list, Expr *datum); @@ -107,8 +111,11 @@ static void InvalidateOprProofCacheCallBack(Datum arg, int cacheid, uint32 hashv /* * predicate_implied_by - * Recursively checks whether the clauses in restrictinfo_list imply - * that the given predicate is true. 
+ * Recursively checks whether the clauses in clause_list imply that the + * given predicate is true. If clause_is_check is true, assume that the + * clauses in clause_list are CHECK constraints (where null is + * effectively true) rather than WHERE clauses (where null is effectively + * false). * * The top-level List structure of each list corresponds to an AND list. * We assume that eval_const_expressions() has been applied and so there @@ -125,14 +132,15 @@ static void InvalidateOprProofCacheCallBack(Datum arg, int cacheid, uint32 hashv * the plan and the time we execute the plan. */ bool -predicate_implied_by(List *predicate_list, List *restrictinfo_list) +predicate_implied_by(List *predicate_list, List *clause_list, + bool clause_is_check) { Node *p, *r; if (predicate_list == NIL) return true; /* no predicate: implication is vacuous */ - if (restrictinfo_list == NIL) + if (clause_list == NIL) return false; /* no restriction: implication must fail */ /* @@ -145,19 +153,22 @@ predicate_implied_by(List *predicate_list, List *restrictinfo_list) p = (Node *) linitial(predicate_list); else p = (Node *) predicate_list; - if (list_length(restrictinfo_list) == 1) - r = (Node *) linitial(restrictinfo_list); + if (list_length(clause_list) == 1) + r = (Node *) linitial(clause_list); else - r = (Node *) restrictinfo_list; + r = (Node *) clause_list; /* And away we go ... */ - return predicate_implied_by_recurse(r, p); + return predicate_implied_by_recurse(r, p, clause_is_check); } /* * predicate_refuted_by - * Recursively checks whether the clauses in restrictinfo_list refute - * the given predicate (that is, prove it false). + * Recursively checks whether the clauses in clause_list refute the given + * predicate (that is, prove it false). If clause_is_check is true, assume + * that the clauses in clause_list are CHECK constraints (where null is + * effectively true) rather than WHERE clauses (where null is effectively + * false). 
* * This is NOT the same as !(predicate_implied_by), though it is similar * in the technique and structure of the code. @@ -183,14 +194,15 @@ predicate_implied_by(List *predicate_list, List *restrictinfo_list) * time we make the plan and the time we execute the plan. */ bool -predicate_refuted_by(List *predicate_list, List *restrictinfo_list) +predicate_refuted_by(List *predicate_list, List *clause_list, + bool clause_is_check) { Node *p, *r; if (predicate_list == NIL) return false; /* no predicate: no refutation is possible */ - if (restrictinfo_list == NIL) + if (clause_list == NIL) return false; /* no restriction: refutation must fail */ /* @@ -203,13 +215,13 @@ predicate_refuted_by(List *predicate_list, List *restrictinfo_list) p = (Node *) linitial(predicate_list); else p = (Node *) predicate_list; - if (list_length(restrictinfo_list) == 1) - r = (Node *) linitial(restrictinfo_list); + if (list_length(clause_list) == 1) + r = (Node *) linitial(clause_list); else - r = (Node *) restrictinfo_list; + r = (Node *) clause_list; /* And away we go ... 
*/ - return predicate_refuted_by_recurse(r, p); + return predicate_refuted_by_recurse(r, p, clause_is_check); } /*---------- @@ -248,7 +260,8 @@ predicate_refuted_by(List *predicate_list, List *restrictinfo_list) *---------- */ static bool -predicate_implied_by_recurse(Node *clause, Node *predicate) +predicate_implied_by_recurse(Node *clause, Node *predicate, + bool clause_is_check) { PredIterInfoData clause_info; PredIterInfoData pred_info; @@ -275,7 +288,8 @@ predicate_implied_by_recurse(Node *clause, Node *predicate) result = true; iterate_begin(pitem, predicate, pred_info) { - if (!predicate_implied_by_recurse(clause, pitem)) + if (!predicate_implied_by_recurse(clause, pitem, + clause_is_check)) { result = false; break; @@ -294,7 +308,8 @@ predicate_implied_by_recurse(Node *clause, Node *predicate) result = false; iterate_begin(pitem, predicate, pred_info) { - if (predicate_implied_by_recurse(clause, pitem)) + if (predicate_implied_by_recurse(clause, pitem, + clause_is_check)) { result = true; break; @@ -311,7 +326,8 @@ predicate_implied_by_recurse(Node *clause, Node *predicate) */ iterate_begin(citem, clause, clause_info) { - if (predicate_implied_by_recurse(citem, predicate)) + if (predicate_implied_by_recurse(citem, predicate, + clause_is_check)) { result = true; break; @@ -328,7 +344,8 @@ predicate_implied_by_recurse(Node *clause, Node *predicate) result = false; iterate_begin(citem, clause, clause_info) { - if (predicate_implied_by_recurse(citem, predicate)) + if (predicate_implied_by_recurse(citem, predicate, + clause_is_check)) { result = true; break; @@ -355,7 +372,8 @@ predicate_implied_by_recurse(Node *clause, Node *predicate) iterate_begin(pitem, predicate, pred_info) { - if (predicate_implied_by_recurse(citem, pitem)) + if (predicate_implied_by_recurse(citem, pitem, + clause_is_check)) { presult = true; break; @@ -382,7 +400,8 @@ predicate_implied_by_recurse(Node *clause, Node *predicate) result = true; iterate_begin(citem, clause, clause_info) { - 
if (!predicate_implied_by_recurse(citem, predicate)) + if (!predicate_implied_by_recurse(citem, predicate, + clause_is_check)) { result = false; break; @@ -404,7 +423,8 @@ predicate_implied_by_recurse(Node *clause, Node *predicate) result = true; iterate_begin(pitem, predicate, pred_info) { - if (!predicate_implied_by_recurse(clause, pitem)) + if (!predicate_implied_by_recurse(clause, pitem, + clause_is_check)) { result = false; break; @@ -421,7 +441,8 @@ predicate_implied_by_recurse(Node *clause, Node *predicate) result = false; iterate_begin(pitem, predicate, pred_info) { - if (predicate_implied_by_recurse(clause, pitem)) + if (predicate_implied_by_recurse(clause, pitem, + clause_is_check)) { result = true; break; @@ -437,7 +458,8 @@ predicate_implied_by_recurse(Node *clause, Node *predicate) */ return predicate_implied_by_simple_clause((Expr *) predicate, - clause); + clause, + clause_is_check); } break; } @@ -478,7 +500,8 @@ predicate_implied_by_recurse(Node *clause, Node *predicate) *---------- */ static bool -predicate_refuted_by_recurse(Node *clause, Node *predicate) +predicate_refuted_by_recurse(Node *clause, Node *predicate, + bool clause_is_check) { PredIterInfoData clause_info; PredIterInfoData pred_info; @@ -508,7 +531,8 @@ predicate_refuted_by_recurse(Node *clause, Node *predicate) result = false; iterate_begin(pitem, predicate, pred_info) { - if (predicate_refuted_by_recurse(clause, pitem)) + if (predicate_refuted_by_recurse(clause, pitem, + clause_is_check)) { result = true; break; @@ -525,7 +549,8 @@ predicate_refuted_by_recurse(Node *clause, Node *predicate) */ iterate_begin(citem, clause, clause_info) { - if (predicate_refuted_by_recurse(citem, predicate)) + if (predicate_refuted_by_recurse(citem, predicate, + clause_is_check)) { result = true; break; @@ -542,7 +567,8 @@ predicate_refuted_by_recurse(Node *clause, Node *predicate) result = true; iterate_begin(pitem, predicate, pred_info) { - if (!predicate_refuted_by_recurse(clause, pitem)) + if 
(!predicate_refuted_by_recurse(clause, pitem, + clause_is_check)) { result = false; break; @@ -558,7 +584,8 @@ predicate_refuted_by_recurse(Node *clause, Node *predicate) */ not_arg = extract_not_arg(predicate); if (not_arg && - predicate_implied_by_recurse(clause, not_arg)) + predicate_implied_by_recurse(clause, not_arg, + clause_is_check)) return true; /* @@ -567,7 +594,8 @@ predicate_refuted_by_recurse(Node *clause, Node *predicate) result = false; iterate_begin(citem, clause, clause_info) { - if (predicate_refuted_by_recurse(citem, predicate)) + if (predicate_refuted_by_recurse(citem, predicate, + clause_is_check)) { result = true; break; @@ -589,7 +617,8 @@ predicate_refuted_by_recurse(Node *clause, Node *predicate) result = true; iterate_begin(pitem, predicate, pred_info) { - if (!predicate_refuted_by_recurse(clause, pitem)) + if (!predicate_refuted_by_recurse(clause, pitem, + clause_is_check)) { result = false; break; @@ -611,7 +640,8 @@ predicate_refuted_by_recurse(Node *clause, Node *predicate) iterate_begin(pitem, predicate, pred_info) { - if (predicate_refuted_by_recurse(citem, pitem)) + if (predicate_refuted_by_recurse(citem, pitem, + clause_is_check)) { presult = true; break; @@ -634,7 +664,8 @@ predicate_refuted_by_recurse(Node *clause, Node *predicate) */ not_arg = extract_not_arg(predicate); if (not_arg && - predicate_implied_by_recurse(clause, not_arg)) + predicate_implied_by_recurse(clause, not_arg, + clause_is_check)) return true; /* @@ -643,7 +674,8 @@ predicate_refuted_by_recurse(Node *clause, Node *predicate) result = true; iterate_begin(citem, clause, clause_info) { - if (!predicate_refuted_by_recurse(citem, predicate)) + if (!predicate_refuted_by_recurse(citem, predicate, + clause_is_check)) { result = false; break; @@ -679,7 +711,8 @@ predicate_refuted_by_recurse(Node *clause, Node *predicate) result = false; iterate_begin(pitem, predicate, pred_info) { - if (predicate_refuted_by_recurse(clause, pitem)) + if 
(predicate_refuted_by_recurse(clause, pitem, + clause_is_check)) { result = true; break; @@ -696,7 +729,8 @@ predicate_refuted_by_recurse(Node *clause, Node *predicate) result = true; iterate_begin(pitem, predicate, pred_info) { - if (!predicate_refuted_by_recurse(clause, pitem)) + if (!predicate_refuted_by_recurse(clause, pitem, + clause_is_check)) { result = false; break; @@ -712,7 +746,8 @@ predicate_refuted_by_recurse(Node *clause, Node *predicate) */ not_arg = extract_not_arg(predicate); if (not_arg && - predicate_implied_by_recurse(clause, not_arg)) + predicate_implied_by_recurse(clause, not_arg, + clause_is_check)) return true; /* @@ -720,7 +755,8 @@ predicate_refuted_by_recurse(Node *clause, Node *predicate) */ return predicate_refuted_by_simple_clause((Expr *) predicate, - clause); + clause, + clause_is_check); } break; } @@ -1022,14 +1058,15 @@ arrayexpr_cleanup_fn(PredIterInfo info) * functions in the expression are immutable, ie dependent only on their input * arguments --- but this was checked for the predicate by the caller.) * - * When the predicate is of the form "foo IS NOT NULL", we can conclude that - * the predicate is implied if the clause is a strict operator or function - * that has "foo" as an input. In this case the clause must yield NULL when - * "foo" is NULL, which we can take as equivalent to FALSE because we know - * we are within an AND/OR subtree of a WHERE clause. (Again, "foo" is - * already known immutable, so the clause will certainly always fail.) - * Also, if the clause is just "foo" (meaning it's a boolean variable), - * the predicate is implied since the clause can't be true if "foo" is NULL. + * When clause_is_check is false, we know we are within an AND/OR + * subtree of a WHERE clause. So, if the predicate is of the form "foo IS + * NOT NULL", we can conclude that the predicate is implied if the clause is + * a strict operator or function that has "foo" as an input. 
In this case + * the clause must yield NULL when "foo" is NULL, which we can take as + * equivalent to FALSE given the context. (Again, "foo" is already known + * immutable, so the clause will certainly always fail.) Also, if the clause + * is just "foo" (meaning it's a boolean variable), the predicate is implied + * since the clause can't be true if "foo" is NULL. * * Finally, if both clauses are binary operator expressions, we may be able * to prove something using the system's knowledge about operators; those @@ -1037,7 +1074,8 @@ arrayexpr_cleanup_fn(PredIterInfo info) *---------- */ static bool -predicate_implied_by_simple_clause(Expr *predicate, Node *clause) +predicate_implied_by_simple_clause(Expr *predicate, Node *clause, + bool clause_is_check) { /* Allow interrupting long proof attempts */ CHECK_FOR_INTERRUPTS(); @@ -1053,7 +1091,7 @@ predicate_implied_by_simple_clause(Expr *predicate, Node *clause) Expr *nonnullarg = ((NullTest *) predicate)->arg; /* row IS NOT NULL does not act in the simple way we have in mind */ - if (!((NullTest *) predicate)->argisrow) + if (!((NullTest *) predicate)->argisrow && !clause_is_check) { if (is_opclause(clause) && list_member_strip(((OpExpr *) clause)->args, nonnullarg) && @@ -1098,7 +1136,8 @@ predicate_implied_by_simple_clause(Expr *predicate, Node *clause) *---------- */ static bool -predicate_refuted_by_simple_clause(Expr *predicate, Node *clause) +predicate_refuted_by_simple_clause(Expr *predicate, Node *clause, + bool clause_is_check) { /* Allow interrupting long proof attempts */ CHECK_FOR_INTERRUPTS(); @@ -1114,6 +1153,9 @@ predicate_refuted_by_simple_clause(Expr *predicate, Node *clause) { Expr *isnullarg = ((NullTest *) predicate)->arg; + if (clause_is_check) + return false; + /* row IS NULL does not act in the simple way we have in mind */ if (((NullTest *) predicate)->argisrow) return false; diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index 739ff10b07..ffa1ba6605 100644 --- 
a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -9563,24 +9563,14 @@ AlterSubscriptionStmt: n->options = $6; $$ = (Node *)n; } - | ALTER SUBSCRIPTION name SET PUBLICATION publication_name_list REFRESH opt_definition - { - AlterSubscriptionStmt *n = - makeNode(AlterSubscriptionStmt); - n->kind = ALTER_SUBSCRIPTION_PUBLICATION_REFRESH; - n->subname = $3; - n->publication = $6; - n->options = $8; - $$ = (Node *)n; - } - | ALTER SUBSCRIPTION name SET PUBLICATION publication_name_list SKIP REFRESH + | ALTER SUBSCRIPTION name SET PUBLICATION publication_name_list opt_definition { AlterSubscriptionStmt *n = makeNode(AlterSubscriptionStmt); n->kind = ALTER_SUBSCRIPTION_PUBLICATION; n->subname = $3; n->publication = $6; - n->options = NIL; + n->options = $7; $$ = (Node *)n; } | ALTER SUBSCRIPTION name ENABLE_P diff --git a/src/backend/parser/parse_agg.c b/src/backend/parser/parse_agg.c index 9fc0371cb3..a95e349562 100644 --- a/src/backend/parser/parse_agg.c +++ b/src/backend/parser/parse_agg.c @@ -712,6 +712,14 @@ check_agg_arguments_walker(Node *node, } /* Continue and descend into subtree */ } + /* We can throw error on sight for a set-returning function */ + if ((IsA(node, FuncExpr) &&((FuncExpr *) node)->funcretset) || + (IsA(node, OpExpr) &&((OpExpr *) node)->opretset)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("aggregate function calls cannot contain set-returning function calls"), + errhint("You might be able to move the set-returning function into a LATERAL FROM item."), + parser_errposition(context->pstate, exprLocation(node)))); /* We can throw error on sight for a window function */ if (IsA(node, WindowFunc)) ereport(ERROR, diff --git a/src/backend/parser/parse_clause.c b/src/backend/parser/parse_clause.c index 27dd49d301..3d5b20836f 100644 --- a/src/backend/parser/parse_clause.c +++ b/src/backend/parser/parse_clause.c @@ -572,6 +572,8 @@ transformRangeFunction(ParseState *pstate, RangeFunction *r) List *pair = (List *) 
lfirst(lc); Node *fexpr; List *coldeflist; + Node *newfexpr; + Node *last_srf; /* Disassemble the function-call/column-def-list pairs */ Assert(list_length(pair) == 2); @@ -618,13 +620,25 @@ transformRangeFunction(ParseState *pstate, RangeFunction *r) Node *arg = (Node *) lfirst(lc); FuncCall *newfc; + last_srf = pstate->p_last_srf; + newfc = makeFuncCall(SystemFuncName("unnest"), list_make1(arg), fc->location); - funcexprs = lappend(funcexprs, - transformExpr(pstate, (Node *) newfc, - EXPR_KIND_FROM_FUNCTION)); + newfexpr = transformExpr(pstate, (Node *) newfc, + EXPR_KIND_FROM_FUNCTION); + + /* nodeFunctionscan.c requires SRFs to be at top level */ + if (pstate->p_last_srf != last_srf && + pstate->p_last_srf != newfexpr) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("set-returning functions must appear at top level of FROM"), + parser_errposition(pstate, + exprLocation(pstate->p_last_srf)))); + + funcexprs = lappend(funcexprs, newfexpr); funcnames = lappend(funcnames, FigureColname((Node *) newfc)); @@ -638,9 +652,21 @@ transformRangeFunction(ParseState *pstate, RangeFunction *r) } /* normal case ... 
*/ - funcexprs = lappend(funcexprs, - transformExpr(pstate, fexpr, - EXPR_KIND_FROM_FUNCTION)); + last_srf = pstate->p_last_srf; + + newfexpr = transformExpr(pstate, fexpr, + EXPR_KIND_FROM_FUNCTION); + + /* nodeFunctionscan.c requires SRFs to be at top level */ + if (pstate->p_last_srf != last_srf && + pstate->p_last_srf != newfexpr) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("set-returning functions must appear at top level of FROM"), + parser_errposition(pstate, + exprLocation(pstate->p_last_srf)))); + + funcexprs = lappend(funcexprs, newfexpr); funcnames = lappend(funcnames, FigureColname(fexpr)); diff --git a/src/backend/parser/parse_expr.c b/src/backend/parser/parse_expr.c index 8e2ae0e11c..958176c0ac 100644 --- a/src/backend/parser/parse_expr.c +++ b/src/backend/parser/parse_expr.c @@ -118,8 +118,7 @@ static Node *transformCurrentOfExpr(ParseState *pstate, CurrentOfExpr *cexpr); static Node *transformColumnRef(ParseState *pstate, ColumnRef *cref); static Node *transformWholeRowRef(ParseState *pstate, RangeTblEntry *rte, int location); -static Node *transformIndirection(ParseState *pstate, Node *basenode, - List *indirection); +static Node *transformIndirection(ParseState *pstate, A_Indirection *ind); static Node *transformTypeCast(ParseState *pstate, TypeCast *tc); static Node *transformCollateClause(ParseState *pstate, CollateClause *c); static Node *make_row_comparison_op(ParseState *pstate, List *opname, @@ -192,14 +191,8 @@ transformExprRecurse(ParseState *pstate, Node *expr) } case T_A_Indirection: - { - A_Indirection *ind = (A_Indirection *) expr; - - result = transformExprRecurse(pstate, ind->arg); - result = transformIndirection(pstate, result, - ind->indirection); - break; - } + result = transformIndirection(pstate, (A_Indirection *) expr); + break; case T_A_ArrayExpr: result = transformArrayExpr(pstate, (A_ArrayExpr *) expr, @@ -439,11 +432,12 @@ unknown_attribute(ParseState *pstate, Node *relref, char *attname, } static 
Node * -transformIndirection(ParseState *pstate, Node *basenode, List *indirection) +transformIndirection(ParseState *pstate, A_Indirection *ind) { - Node *result = basenode; + Node *last_srf = pstate->p_last_srf; + Node *result = transformExprRecurse(pstate, ind->arg); List *subscripts = NIL; - int location = exprLocation(basenode); + int location = exprLocation(result); ListCell *i; /* @@ -451,7 +445,7 @@ transformIndirection(ParseState *pstate, Node *basenode, List *indirection) * subscripting. Adjacent A_Indices nodes have to be treated as a single * multidimensional subscript operation. */ - foreach(i, indirection) + foreach(i, ind->indirection) { Node *n = lfirst(i); @@ -484,6 +478,7 @@ transformIndirection(ParseState *pstate, Node *basenode, List *indirection) newresult = ParseFuncOrColumn(pstate, list_make1(n), list_make1(result), + last_srf, NULL, location); if (newresult == NULL) @@ -632,6 +627,7 @@ transformColumnRef(ParseState *pstate, ColumnRef *cref) node = ParseFuncOrColumn(pstate, list_make1(makeString(colname)), list_make1(node), + pstate->p_last_srf, NULL, cref->location); } @@ -678,6 +674,7 @@ transformColumnRef(ParseState *pstate, ColumnRef *cref) node = ParseFuncOrColumn(pstate, list_make1(makeString(colname)), list_make1(node), + pstate->p_last_srf, NULL, cref->location); } @@ -737,6 +734,7 @@ transformColumnRef(ParseState *pstate, ColumnRef *cref) node = ParseFuncOrColumn(pstate, list_make1(makeString(colname)), list_make1(node), + pstate->p_last_srf, NULL, cref->location); } @@ -927,6 +925,8 @@ transformAExprOp(ParseState *pstate, A_Expr *a) else { /* Ordinary scalar operator */ + Node *last_srf = pstate->p_last_srf; + lexpr = transformExprRecurse(pstate, lexpr); rexpr = transformExprRecurse(pstate, rexpr); @@ -934,6 +934,7 @@ transformAExprOp(ParseState *pstate, A_Expr *a) a->name, lexpr, rexpr, + last_srf, a->location); } @@ -1053,6 +1054,7 @@ transformAExprNullIf(ParseState *pstate, A_Expr *a) a->name, lexpr, rexpr, + pstate->p_last_srf, 
a->location); /* @@ -1063,6 +1065,12 @@ transformAExprNullIf(ParseState *pstate, A_Expr *a) (errcode(ERRCODE_DATATYPE_MISMATCH), errmsg("NULLIF requires = operator to yield boolean"), parser_errposition(pstate, a->location))); + if (result->opretset) + ereport(ERROR, + (errcode(ERRCODE_DATATYPE_MISMATCH), + /* translator: %s is name of a SQL construct, eg NULLIF */ + errmsg("%s must not return a set", "NULLIF"), + parser_errposition(pstate, a->location))); /* * ... but the NullIfExpr will yield the first operand's type. @@ -1266,6 +1274,7 @@ transformAExprIn(ParseState *pstate, A_Expr *a) a->name, copyObject(lexpr), rexpr, + pstate->p_last_srf, a->location); } @@ -1430,6 +1439,7 @@ transformBoolExpr(ParseState *pstate, BoolExpr *a) static Node * transformFuncCall(ParseState *pstate, FuncCall *fn) { + Node *last_srf = pstate->p_last_srf; List *targs; ListCell *args; @@ -1465,6 +1475,7 @@ transformFuncCall(ParseState *pstate, FuncCall *fn) return ParseFuncOrColumn(pstate, fn->funcname, targs, + last_srf, fn, fn->location); } @@ -1620,7 +1631,8 @@ transformMultiAssignRef(ParseState *pstate, MultiAssignRef *maref) static Node * transformCaseExpr(ParseState *pstate, CaseExpr *c) { - CaseExpr *newc; + CaseExpr *newc = makeNode(CaseExpr); + Node *last_srf = pstate->p_last_srf; Node *arg; CaseTestExpr *placeholder; List *newargs; @@ -1629,8 +1641,6 @@ transformCaseExpr(ParseState *pstate, CaseExpr *c) Node *defresult; Oid ptype; - newc = makeNode(CaseExpr); - /* transform the test expression, if any */ arg = transformExprRecurse(pstate, (Node *) c->arg); @@ -1742,6 +1752,17 @@ transformCaseExpr(ParseState *pstate, CaseExpr *c) "CASE/WHEN"); } + /* if any subexpression contained a SRF, complain */ + if (pstate->p_last_srf != last_srf) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + /* translator: %s is name of a SQL construct, eg GROUP BY */ + errmsg("set-returning functions are not allowed in %s", + "CASE"), + errhint("You might be able to move the 
set-returning function into a LATERAL FROM item."), + parser_errposition(pstate, + exprLocation(pstate->p_last_srf)))); + newc->location = c->location; return (Node *) newc; @@ -2178,6 +2199,7 @@ static Node * transformCoalesceExpr(ParseState *pstate, CoalesceExpr *c) { CoalesceExpr *newc = makeNode(CoalesceExpr); + Node *last_srf = pstate->p_last_srf; List *newargs = NIL; List *newcoercedargs = NIL; ListCell *args; @@ -2206,6 +2228,17 @@ transformCoalesceExpr(ParseState *pstate, CoalesceExpr *c) newcoercedargs = lappend(newcoercedargs, newe); } + /* if any subexpression contained a SRF, complain */ + if (pstate->p_last_srf != last_srf) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + /* translator: %s is name of a SQL construct, eg GROUP BY */ + errmsg("set-returning functions are not allowed in %s", + "COALESCE"), + errhint("You might be able to move the set-returning function into a LATERAL FROM item."), + parser_errposition(pstate, + exprLocation(pstate->p_last_srf)))); + newc->args = newcoercedargs; newc->location = c->location; return (Node *) newc; @@ -2800,7 +2833,8 @@ make_row_comparison_op(ParseState *pstate, List *opname, Node *rarg = (Node *) lfirst(r); OpExpr *cmp; - cmp = castNode(OpExpr, make_op(pstate, opname, larg, rarg, location)); + cmp = castNode(OpExpr, make_op(pstate, opname, larg, rarg, + pstate->p_last_srf, location)); /* * We don't use coerce_to_boolean here because we insist on the @@ -3007,12 +3041,19 @@ make_distinct_op(ParseState *pstate, List *opname, Node *ltree, Node *rtree, { Expr *result; - result = make_op(pstate, opname, ltree, rtree, location); + result = make_op(pstate, opname, ltree, rtree, + pstate->p_last_srf, location); if (((OpExpr *) result)->opresulttype != BOOLOID) ereport(ERROR, (errcode(ERRCODE_DATATYPE_MISMATCH), errmsg("IS DISTINCT FROM requires = operator to yield boolean"), parser_errposition(pstate, location))); + if (((OpExpr *) result)->opretset) + ereport(ERROR, + 
(errcode(ERRCODE_DATATYPE_MISMATCH), + /* translator: %s is name of a SQL construct, eg NULLIF */ + errmsg("%s must not return a set", "IS DISTINCT FROM"), + parser_errposition(pstate, location))); /* * We rely on DistinctExpr and OpExpr being same struct diff --git a/src/backend/parser/parse_func.c b/src/backend/parser/parse_func.c index 55853c20bb..34f1cf82ee 100644 --- a/src/backend/parser/parse_func.c +++ b/src/backend/parser/parse_func.c @@ -64,10 +64,14 @@ static Node *ParseComplexProjection(ParseState *pstate, char *funcname, * * The argument expressions (in fargs) must have been transformed * already. However, nothing in *fn has been transformed. + * + * last_srf should be a copy of pstate->p_last_srf from just before we + * started transforming fargs. If the caller knows that fargs couldn't + * contain any SRF calls, last_srf can just be pstate->p_last_srf. */ Node * ParseFuncOrColumn(ParseState *pstate, List *funcname, List *fargs, - FuncCall *fn, int location) + Node *last_srf, FuncCall *fn, int location) { bool is_column = (fn == NULL); List *agg_order = (fn ? 
fn->agg_order : NIL); @@ -628,7 +632,7 @@ ParseFuncOrColumn(ParseState *pstate, List *funcname, List *fargs, /* if it returns a set, check that's OK */ if (retset) - check_srf_call_placement(pstate, location); + check_srf_call_placement(pstate, last_srf, location); /* build the appropriate output structure */ if (fdresult == FUNCDETAIL_NORMAL) @@ -759,6 +763,17 @@ ParseFuncOrColumn(ParseState *pstate, List *funcname, List *fargs, errmsg("FILTER is not implemented for non-aggregate window functions"), parser_errposition(pstate, location))); + /* + * Window functions can't either take or return sets + */ + if (pstate->p_last_srf != last_srf) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("window function calls cannot contain set-returning function calls"), + errhint("You might be able to move the set-returning function into a LATERAL FROM item."), + parser_errposition(pstate, + exprLocation(pstate->p_last_srf)))); + if (retset) ereport(ERROR, (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION), @@ -771,6 +786,10 @@ ParseFuncOrColumn(ParseState *pstate, List *funcname, List *fargs, retval = (Node *) wfunc; } + /* if it returns a set, remember it for error checks at higher levels */ + if (retset) + pstate->p_last_srf = retval; + return retval; } @@ -2083,9 +2102,13 @@ LookupAggWithArgs(ObjectWithArgs *agg, bool noError) * and throw a nice error if not. * * A side-effect is to set pstate->p_hasTargetSRFs true if appropriate. + * + * last_srf should be a copy of pstate->p_last_srf from just before we + * started transforming the function's arguments. This allows detection + * of whether the SRF's arguments contain any SRFs. */ void -check_srf_call_placement(ParseState *pstate, int location) +check_srf_call_placement(ParseState *pstate, Node *last_srf, int location) { const char *err; bool errkind; @@ -2121,7 +2144,15 @@ check_srf_call_placement(ParseState *pstate, int location) errkind = true; break; case EXPR_KIND_FROM_FUNCTION: - /* okay ... 
but we can't check nesting here */ + /* okay, but we don't allow nested SRFs here */ + /* errmsg is chosen to match transformRangeFunction() */ + /* errposition should point to the inner SRF */ + if (pstate->p_last_srf != last_srf) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("set-returning functions must appear at top level of FROM"), + parser_errposition(pstate, + exprLocation(pstate->p_last_srf)))); break; case EXPR_KIND_WHERE: errkind = true; @@ -2202,7 +2233,7 @@ check_srf_call_placement(ParseState *pstate, int location) err = _("set-returning functions are not allowed in trigger WHEN conditions"); break; case EXPR_KIND_PARTITION_EXPRESSION: - err = _("set-returning functions are not allowed in partition key expression"); + err = _("set-returning functions are not allowed in partition key expressions"); break; /* diff --git a/src/backend/parser/parse_oper.c b/src/backend/parser/parse_oper.c index e40b10d4f6..4b1db76e19 100644 --- a/src/backend/parser/parse_oper.c +++ b/src/backend/parser/parse_oper.c @@ -735,12 +735,14 @@ op_error(ParseState *pstate, List *op, char oprkind, * Transform operator expression ensuring type compatibility. * This is where some type conversion happens. * - * As with coerce_type, pstate may be NULL if no special unknown-Param - * processing is wanted. + * last_srf should be a copy of pstate->p_last_srf from just before we + * started transforming the operator's arguments; this is used for nested-SRF + * detection. If the caller will throw an error anyway for a set-returning + * expression, it's okay to cheat and just pass pstate->p_last_srf. 
*/ Expr * make_op(ParseState *pstate, List *opname, Node *ltree, Node *rtree, - int location) + Node *last_srf, int location) { Oid ltypeId, rtypeId; @@ -843,7 +845,11 @@ make_op(ParseState *pstate, List *opname, Node *ltree, Node *rtree, /* if it returns a set, check that's OK */ if (result->opretset) - check_srf_call_placement(pstate, location); + { + check_srf_call_placement(pstate, last_srf, location); + /* ... and remember it for error checks at higher levels */ + pstate->p_last_srf = (Node *) result; + } ReleaseSysCache(tup); diff --git a/src/backend/parser/parse_utilcmd.c b/src/backend/parser/parse_utilcmd.c index c04e77775e..708188f300 100644 --- a/src/backend/parser/parse_utilcmd.c +++ b/src/backend/parser/parse_utilcmd.c @@ -486,6 +486,15 @@ transformCreateStmt(CreateStmt *stmt, const char *queryString) return result; } +/* + * generateSerialExtraStmts + * Generate CREATE SEQUENCE and ALTER SEQUENCE ... OWNED BY statements + * to create the sequence for a serial or identity column. + * + * This includes determining the name the sequence will have. The caller + * can ask to get back the name components by passing non-null pointers + * for snamespace_p and sname_p. + */ static void generateSerialExtraStmts(CreateStmtContext *cxt, ColumnDef *column, Oid seqtypid, List *seqoptions, bool for_identity, @@ -514,7 +523,6 @@ generateSerialExtraStmts(CreateStmtContext *cxt, ColumnDef *column, * problem, especially since few people would need two serial columns in * one table. 
*/ - foreach(option, seqoptions) { DefElem *defel = lfirst_node(DefElem, option); @@ -534,7 +542,17 @@ generateSerialExtraStmts(CreateStmtContext *cxt, ColumnDef *column, RangeVar *rv = makeRangeVarFromNameList(castNode(List, nameEl->arg)); snamespace = rv->schemaname; + if (!snamespace) + { + /* Given unqualified SEQUENCE NAME, select namespace */ + if (cxt->rel) + snamespaceid = RelationGetNamespace(cxt->rel); + else + snamespaceid = RangeVarGetCreationNamespace(cxt->relation); + snamespace = get_namespace_name(snamespaceid); + } sname = rv->relname; + /* Remove the SEQUENCE NAME item from seqoptions */ seqoptions = list_delete_ptr(seqoptions, nameEl); } else @@ -574,7 +592,9 @@ generateSerialExtraStmts(CreateStmtContext *cxt, ColumnDef *column, * not our synthetic one. */ if (seqtypid) - seqstmt->options = lcons(makeDefElem("as", (Node *) makeTypeNameFromOid(seqtypid, -1), -1), + seqstmt->options = lcons(makeDefElem("as", + (Node *) makeTypeNameFromOid(seqtypid, -1), + -1), seqstmt->options); /* diff --git a/src/backend/postmaster/bgworker.c b/src/backend/postmaster/bgworker.c index c3454276bf..712d700481 100644 --- a/src/backend/postmaster/bgworker.c +++ b/src/backend/postmaster/bgworker.c @@ -1144,7 +1144,7 @@ WaitForBackgroundWorkerShutdown(BackgroundWorkerHandle *handle) if (status == BGWH_STOPPED) break; - rc = WaitLatch(&MyProc->procLatch, + rc = WaitLatch(MyLatch, WL_LATCH_SET | WL_POSTMASTER_DEATH, 0, WAIT_EVENT_BGWORKER_SHUTDOWN); @@ -1154,7 +1154,7 @@ WaitForBackgroundWorkerShutdown(BackgroundWorkerHandle *handle) break; } - ResetLatch(&MyProc->procLatch); + ResetLatch(MyLatch); } return status; diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c index f6f920e493..1e511f4c1e 100644 --- a/src/backend/postmaster/postmaster.c +++ b/src/backend/postmaster/postmaster.c @@ -3080,7 +3080,7 @@ reaper(SIGNAL_ARGS) * Waken walsenders for the last time. No regular backends * should be around anymore. 
*/ - SignalChildren(SIGINT); + SignalChildren(SIGUSR2); pmState = PM_SHUTDOWN_2; @@ -3876,9 +3876,7 @@ PostmasterStateMachine(void) /* * If we get here, we are proceeding with normal shutdown. All * the regular children are gone, and it's time to tell the - * checkpointer to do a shutdown checkpoint. All WAL senders - * are told to switch to a stopping state so that the shutdown - * checkpoint can go ahead. + * checkpointer to do a shutdown checkpoint. */ Assert(Shutdown > NoShutdown); /* Start the checkpointer if not running */ @@ -3887,7 +3885,6 @@ PostmasterStateMachine(void) /* And tell it to shut down */ if (CheckpointerPID != 0) { - SignalSomeChildren(SIGUSR2, BACKEND_TYPE_WALSND); signal_child(CheckpointerPID, SIGUSR2); pmState = PM_SHUTDOWN; } diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index ebe9c91e98..7509b4fe60 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -176,7 +176,7 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname, ? WL_SOCKET_READABLE : WL_SOCKET_WRITEABLE); - rc = WaitLatchOrSocket(&MyProc->procLatch, + rc = WaitLatchOrSocket(MyLatch, WL_POSTMASTER_DEATH | WL_LATCH_SET | io_flag, PQsocket(conn->streamConn), @@ -190,7 +190,7 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname, /* Interrupted? */ if (rc & WL_LATCH_SET) { - ResetLatch(&MyProc->procLatch); + ResetLatch(MyLatch); CHECK_FOR_INTERRUPTS(); } @@ -574,21 +574,22 @@ libpqrcv_PQexec(PGconn *streamConn, const char *query) * the signal arrives in the middle of establishment of * replication connection. */ - ResetLatch(&MyProc->procLatch); - rc = WaitLatchOrSocket(&MyProc->procLatch, + rc = WaitLatchOrSocket(MyLatch, WL_POSTMASTER_DEATH | WL_SOCKET_READABLE | WL_LATCH_SET, PQsocket(streamConn), 0, WAIT_EVENT_LIBPQWALRECEIVER); + + /* Emergency bailout? 
*/ if (rc & WL_POSTMASTER_DEATH) exit(1); - /* interrupted */ + /* Interrupted? */ if (rc & WL_LATCH_SET) { + ResetLatch(MyLatch); CHECK_FOR_INTERRUPTS(); - continue; } if (PQconsumeInput(streamConn) == 0) return NULL; /* trouble */ @@ -681,12 +682,25 @@ libpqrcv_receive(WalReceiverConn *conn, char **buffer, { PQclear(res); - /* Verify that there are no more results */ + /* Verify that there are no more results. */ res = PQgetResult(conn->streamConn); if (res != NULL) + { + PQclear(res); + + /* + * If the other side closed the connection orderly (otherwise + * we'd seen an error, or PGRES_COPY_IN) don't report an error + * here, but let callers deal with it. + */ + if (PQstatus(conn->streamConn) == CONNECTION_BAD) + return -1; + ereport(ERROR, (errmsg("unexpected result after CommandComplete: %s", PQerrorMessage(conn->streamConn)))); + } + return -1; } else if (PQresultStatus(res) == PGRES_COPY_IN) @@ -806,11 +820,10 @@ libpqrcv_processTuples(PGresult *pgres, WalRcvExecResult *walres, /* Make sure we got expected number of fields. */ if (nfields != nRetTypes) ereport(ERROR, - (errmsg("invalid query responser"), + (errmsg("invalid query response"), errdetail("Expected %d fields, got %d fields.", nRetTypes, nfields))); - walres->tuplestore = tuplestore_begin_heap(true, false, work_mem); /* Create tuple descriptor corresponding to expected result. 
*/ diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index b956052014..15dac00ffa 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -80,8 +80,7 @@ static void logicalrep_worker_detach(void); static void logicalrep_worker_cleanup(LogicalRepWorker *worker); /* Flags set by signal handlers */ -volatile sig_atomic_t got_SIGHUP = false; -volatile sig_atomic_t got_SIGTERM = false; +static volatile sig_atomic_t got_SIGHUP = false; static bool on_commit_launcher_wakeup = false; @@ -208,10 +207,15 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker, WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, 1000L, WAIT_EVENT_BGWORKER_STARTUP); + /* emergency bailout if postmaster has died */ if (rc & WL_POSTMASTER_DEATH) proc_exit(1); - ResetLatch(MyLatch); + if (rc & WL_LATCH_SET) + { + ResetLatch(MyLatch); + CHECK_FOR_INTERRUPTS(); + } } return; @@ -440,10 +444,8 @@ logicalrep_worker_stop(Oid subid, Oid relid) LWLockRelease(LogicalRepWorkerLock); - CHECK_FOR_INTERRUPTS(); - /* Wait for signal. */ - rc = WaitLatch(&MyProc->procLatch, + rc = WaitLatch(MyLatch, WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, 1000L, WAIT_EVENT_BGWORKER_STARTUP); @@ -451,7 +453,11 @@ logicalrep_worker_stop(Oid subid, Oid relid) if (rc & WL_POSTMASTER_DEATH) proc_exit(1); - ResetLatch(&MyProc->procLatch); + if (rc & WL_LATCH_SET) + { + ResetLatch(MyLatch); + CHECK_FOR_INTERRUPTS(); + } /* Check worker status. */ LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); @@ -492,7 +498,7 @@ logicalrep_worker_stop(Oid subid, Oid relid) CHECK_FOR_INTERRUPTS(); /* Wait for more work. 
*/ - rc = WaitLatch(&MyProc->procLatch, + rc = WaitLatch(MyLatch, WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, 1000L, WAIT_EVENT_BGWORKER_SHUTDOWN); @@ -500,7 +506,11 @@ logicalrep_worker_stop(Oid subid, Oid relid) if (rc & WL_POSTMASTER_DEATH) proc_exit(1); - ResetLatch(&MyProc->procLatch); + if (rc & WL_LATCH_SET) + { + ResetLatch(MyLatch); + CHECK_FOR_INTERRUPTS(); + } } } @@ -614,26 +624,18 @@ logicalrep_launcher_onexit(int code, Datum arg) static void logicalrep_worker_onexit(int code, Datum arg) { - logicalrep_worker_detach(); -} - -/* SIGTERM: set flag to exit at next convenient time */ -void -logicalrep_worker_sigterm(SIGNAL_ARGS) -{ - int save_errno = errno; + /* Disconnect gracefully from the remote side. */ + if (wrconn) + walrcv_disconnect(wrconn); - got_SIGTERM = true; - - /* Waken anything waiting on the process latch */ - SetLatch(MyLatch); + logicalrep_worker_detach(); - errno = save_errno; + ApplyLauncherWakeup(); } /* SIGHUP: set flag to reload configuration at next convenient time */ -void -logicalrep_worker_sighup(SIGNAL_ARGS) +static void +logicalrep_launcher_sighup(SIGNAL_ARGS) { int save_errno = errno; @@ -792,17 +794,14 @@ ApplyLauncherMain(Datum main_arg) before_shmem_exit(logicalrep_launcher_onexit, (Datum) 0); + Assert(LogicalRepCtx->launcher_pid == 0); + LogicalRepCtx->launcher_pid = MyProcPid; + /* Establish signal handlers. */ - pqsignal(SIGHUP, logicalrep_worker_sighup); - pqsignal(SIGTERM, logicalrep_worker_sigterm); + pqsignal(SIGHUP, logicalrep_launcher_sighup); + pqsignal(SIGTERM, die); BackgroundWorkerUnblockSignals(); - /* Make it easy to identify our processes. */ - SetConfigOption("application_name", MyBgworkerEntry->bgw_name, - PGC_USERSET, PGC_S_SESSION); - - LogicalRepCtx->launcher_pid = MyProcPid; - /* * Establish connection to nailed catalogs (we only ever access * pg_subscription). 
@@ -810,7 +809,7 @@ ApplyLauncherMain(Datum main_arg) BackgroundWorkerInitializeConnection(NULL, NULL); /* Enter main loop */ - while (!got_SIGTERM) + for (;;) { int rc; List *sublist; @@ -820,6 +819,8 @@ ApplyLauncherMain(Datum main_arg) TimestampTz now; long wait_time = DEFAULT_NAPTIME_PER_CYCLE; + CHECK_FOR_INTERRUPTS(); + now = GetCurrentTimestamp(); /* Limit the start retry to once a wal_retrieve_retry_interval */ @@ -874,7 +875,7 @@ ApplyLauncherMain(Datum main_arg) } /* Wait for more work. */ - rc = WaitLatch(&MyProc->procLatch, + rc = WaitLatch(MyLatch, WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, wait_time, WAIT_EVENT_LOGICAL_LAUNCHER_MAIN); @@ -883,22 +884,29 @@ ApplyLauncherMain(Datum main_arg) if (rc & WL_POSTMASTER_DEATH) proc_exit(1); + if (rc & WL_LATCH_SET) + { + ResetLatch(MyLatch); + CHECK_FOR_INTERRUPTS(); + } + if (got_SIGHUP) { got_SIGHUP = false; ProcessConfigFile(PGC_SIGHUP); } - - ResetLatch(&MyProc->procLatch); } - LogicalRepCtx->launcher_pid = 0; - - /* ... and if it returns, we're done */ - ereport(DEBUG1, - (errmsg("logical replication launcher shutting down"))); + /* Not reachable */ +} - proc_exit(0); +/* + * Is current process the logical replication launcher? 
+ */ +bool +IsLogicalLauncher(void) +{ + return LogicalRepCtx->launcher_pid == MyProcPid; } /* diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c index e65f2865dd..2bd1d9f792 100644 --- a/src/backend/replication/logical/relation.c +++ b/src/backend/replication/logical/relation.c @@ -283,7 +283,7 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode) continue; attnum = logicalrep_rel_att_by_name(remoterel, - NameStr(desc->attrs[i]->attname)); + NameStr(desc->attrs[i]->attname)); entry->attrmap[i] = attnum; if (attnum >= 0) diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index 8848f5b4ec..e06aa0992a 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -262,7 +262,7 @@ static bool ExportInProgress = false; static void SnapBuildPurgeCommittedTxn(SnapBuild *builder); /* snapshot building/manipulation/distribution functions */ -static Snapshot SnapBuildBuildSnapshot(SnapBuild *builder, TransactionId xid); +static Snapshot SnapBuildBuildSnapshot(SnapBuild *builder); static void SnapBuildFreeSnapshot(Snapshot snap); @@ -463,7 +463,7 @@ SnapBuildSnapDecRefcount(Snapshot snap) * and ->subxip/subxcnt values. 
*/ static Snapshot -SnapBuildBuildSnapshot(SnapBuild *builder, TransactionId xid) +SnapBuildBuildSnapshot(SnapBuild *builder) { Snapshot snapshot; Size ssize; @@ -562,7 +562,7 @@ SnapBuildInitialSnapshot(SnapBuild *builder) if (TransactionIdIsValid(MyPgXact->xmin)) elog(ERROR, "cannot build an initial slot snapshot when MyPgXact->xmin already is valid"); - snap = SnapBuildBuildSnapshot(builder, GetTopTransactionId()); + snap = SnapBuildBuildSnapshot(builder); /* * We know that snap->xmin is alive, enforced by the logical xmin @@ -679,7 +679,7 @@ SnapBuildGetOrBuildSnapshot(SnapBuild *builder, TransactionId xid) /* only build a new snapshot if we don't have a prebuilt one */ if (builder->snapshot == NULL) { - builder->snapshot = SnapBuildBuildSnapshot(builder, xid); + builder->snapshot = SnapBuildBuildSnapshot(builder); /* increase refcount for the snapshot builder */ SnapBuildSnapIncRefcount(builder->snapshot); } @@ -743,7 +743,7 @@ SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn) /* only build a new snapshot if we don't have a prebuilt one */ if (builder->snapshot == NULL) { - builder->snapshot = SnapBuildBuildSnapshot(builder, xid); + builder->snapshot = SnapBuildBuildSnapshot(builder); /* increase refcount for the snapshot builder */ SnapBuildSnapIncRefcount(builder->snapshot); } @@ -1061,7 +1061,7 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid, if (builder->snapshot) SnapBuildSnapDecRefcount(builder->snapshot); - builder->snapshot = SnapBuildBuildSnapshot(builder, xid); + builder->snapshot = SnapBuildBuildSnapshot(builder); /* we might need to execute invalidations, add snapshot */ if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, xid)) @@ -1831,7 +1831,7 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn) { SnapBuildSnapDecRefcount(builder->snapshot); } - builder->snapshot = SnapBuildBuildSnapshot(builder, InvalidTransactionId); + builder->snapshot = SnapBuildBuildSnapshot(builder); 
SnapBuildSnapIncRefcount(builder->snapshot); ReorderBufferSetRestartPoint(builder->reorder, lsn); diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index fe45fb8820..3ff08bfb2b 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -12,70 +12,72 @@ * logical replication. * * The initial data synchronization is done separately for each table, - * in separate apply worker that only fetches the initial snapshot data - * from the publisher and then synchronizes the position in stream with + * in a separate apply worker that only fetches the initial snapshot data + * from the publisher and then synchronizes the position in the stream with * the main apply worker. * - * The are several reasons for doing the synchronization this way: + * There are several reasons for doing the synchronization this way: * - It allows us to parallelize the initial data synchronization * which lowers the time needed for it to happen. * - The initial synchronization does not have to hold the xid and LSN * for the time it takes to copy data of all tables, causing less * bloat and lower disk consumption compared to doing the - * synchronization in single process for whole database. - * - It allows us to synchronize the tables added after the initial + * synchronization in a single process for the whole database. + * - It allows us to synchronize any tables added after the initial * synchronization has finished. * * The stream position synchronization works in multiple steps. - * - Sync finishes copy and sets table state as SYNCWAIT and waits - * for state to change in a loop. + * - Sync finishes copy and sets worker state as SYNCWAIT and waits for + * state to change in a loop. * - Apply periodically checks tables that are synchronizing for SYNCWAIT. 
- * When the desired state appears it will compare its position in the - * stream with the SYNCWAIT position and based on that changes the - * state to based on following rules: - * - if the apply is in front of the sync in the WAL stream the new - * state is set to CATCHUP and apply loops until the sync process - * catches up to the same LSN as apply - * - if the sync is in front of the apply in the WAL stream the new - * state is set to SYNCDONE - * - if both apply and sync are at the same position in the WAL stream - * the state of the table is set to READY - * - If the state was set to CATCHUP sync will read the stream and - * apply changes until it catches up to the specified stream - * position and then sets state to READY and signals apply that it - * can stop waiting and exits, if the state was set to something - * else than CATCHUP the sync process will simply end. - * - If the state was set to SYNCDONE by apply, the apply will - * continue tracking the table until it reaches the SYNCDONE stream - * position at which point it sets state to READY and stops tracking. + * When the desired state appears, it will set the worker state to + * CATCHUP and starts loop-waiting until either the table state is set + * to SYNCDONE or the sync worker exits. + * - After the sync worker has seen the state change to CATCHUP, it will + * read the stream and apply changes (acting like an apply worker) until + * it catches up to the specified stream position. Then it sets the + * state to SYNCDONE. There might be zero changes applied between + * CATCHUP and SYNCDONE, because the sync worker might be ahead of the + * apply worker. + * - Once the state was set to SYNCDONE, the apply will continue tracking + * the table until it reaches the SYNCDONE stream position, at which + * point it sets state to READY and stops tracking. Again, there might + * be zero changes in between. 
+ * + * So the state progression is always: INIT -> DATASYNC -> SYNCWAIT -> CATCHUP -> + * SYNCDONE -> READY. * * The catalog pg_subscription_rel is used to keep information about - * subscribed tables and their state and some transient state during - * data synchronization is kept in shared memory. + * subscribed tables and their state. Some transient state during data + * synchronization is kept in shared memory. The states SYNCWAIT and + * CATCHUP only appear in memory. * * Example flows look like this: * - Apply is in front: * sync:8 - * -> set SYNCWAIT + * -> set in memory SYNCWAIT * apply:10 - * -> set CATCHUP + * -> set in memory CATCHUP * -> enter wait-loop * sync:10 - * -> set READY + * -> set in catalog SYNCDONE * -> exit * apply:10 * -> exit wait-loop * -> continue rep + * apply:11 + * -> set in catalog READY * - Sync in front: * sync:10 - * -> set SYNCWAIT + * -> set in memory SYNCWAIT * apply:8 - * -> set SYNCDONE + * -> set in memory CATCHUP * -> continue per-table filtering * sync:10 + * -> set in catalog SYNCDONE * -> exit * apply:10 - * -> set READY + * -> set in catalog READY * -> stop per-table filtering * -> continue rep *------------------------------------------------------------------------- @@ -100,6 +102,7 @@ #include "replication/walreceiver.h" #include "replication/worker_internal.h" +#include "utils/snapmgr.h" #include "storage/ipc.h" #include "utils/builtins.h" @@ -130,61 +133,119 @@ finish_sync_worker(void) /* And flush all writes. */ XLogFlush(GetXLogWriteRecPtr()); - /* Find the main apply worker and signal it. 
*/ - logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid); - StartTransactionCommand(); ereport(LOG, (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished", - MySubscription->name, get_rel_name(MyLogicalRepWorker->relid)))); + MySubscription->name, + get_rel_name(MyLogicalRepWorker->relid)))); CommitTransactionCommand(); + /* Find the main apply worker and signal it. */ + logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid); + /* Stop gracefully */ - walrcv_disconnect(wrconn); proc_exit(0); } /* - * Wait until the table synchronization change. + * Wait until the relation synchronization state is set in the catalog to the + * expected one. + * + * Used when transitioning from CATCHUP state to SYNCDONE. * - * Returns false if the relation subscription state disappeared. + * Returns false if the synchronization worker has disappeared or the table state + * has been reset. */ static bool -wait_for_sync_status_change(Oid relid, char origstate) +wait_for_relation_state_change(Oid relid, char expected_state) { int rc; - char state = origstate; + char state; - while (!got_SIGTERM) + for (;;) { LogicalRepWorker *worker; + XLogRecPtr statelsn; + + CHECK_FOR_INTERRUPTS(); + + /* XXX use cache invalidation here to improve performance? */ + PushActiveSnapshot(GetLatestSnapshot()); + state = GetSubscriptionRelState(MyLogicalRepWorker->subid, + relid, &statelsn, true); + PopActiveSnapshot(); + if (state == SUBREL_STATE_UNKNOWN) + return false; + + if (state == expected_state) + return true; + + /* Check if the sync worker is still running and bail if not. */ LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); + + /* Check if the opposite worker is still running and bail if not. */ worker = logicalrep_worker_find(MyLogicalRepWorker->subid, - relid, false); + am_tablesync_worker() ? 
InvalidOid : relid, + false); + LWLockRelease(LogicalRepWorkerLock); if (!worker) - { - LWLockRelease(LogicalRepWorkerLock); return false; - } - state = worker->relstate; - LWLockRelease(LogicalRepWorkerLock); - if (state == SUBREL_STATE_UNKNOWN) + rc = WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, + 1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE); + + /* emergency bailout if postmaster has died */ + if (rc & WL_POSTMASTER_DEATH) + proc_exit(1); + + ResetLatch(MyLatch); + } + + return false; +} + +/* + * Wait until the apply worker changes the state of our synchronization + * worker to the expected one. + * + * Used when transitioning from SYNCWAIT state to CATCHUP. + * + * Returns false if the apply worker has disappeared or the table state has been + * reset. + */ +static bool +wait_for_worker_state_change(char expected_state) +{ + int rc; + + for (;;) + { + LogicalRepWorker *worker; + + CHECK_FOR_INTERRUPTS(); + + /* Bail if the apply has died. */ + LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); + worker = logicalrep_worker_find(MyLogicalRepWorker->subid, + InvalidOid, false); + LWLockRelease(LogicalRepWorkerLock); + if (!worker) return false; - if (state != origstate) + if (MyLogicalRepWorker->relstate == expected_state) return true; - rc = WaitLatch(&MyProc->procLatch, + rc = WaitLatch(MyLatch, WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, - 10000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE); + 1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE); /* emergency bailout if postmaster has died */ if (rc & WL_POSTMASTER_DEATH) proc_exit(1); - ResetLatch(&MyProc->procLatch); + ResetLatch(MyLatch); } return false; @@ -203,10 +264,9 @@ invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue) * Handle table synchronization cooperation from the synchronization * worker. * - * If the sync worker is in catch up mode and reached the predetermined - * synchronization point in the WAL stream, mark the table as READY and - * finish. 
If it caught up too far, set to SYNCDONE and finish. Things will - * then proceed in the "sync in front" scenario. + * If the sync worker is in CATCHUP state and reached (or passed) the + * predetermined synchronization point in the WAL stream, mark the table as + * SYNCDONE and finish. */ static void process_syncing_tables_for_sync(XLogRecPtr current_lsn) @@ -220,10 +280,7 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn) { TimeLineID tli; - MyLogicalRepWorker->relstate = - (current_lsn == MyLogicalRepWorker->relstate_lsn) - ? SUBREL_STATE_READY - : SUBREL_STATE_SYNCDONE; + MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCDONE; MyLogicalRepWorker->relstate_lsn = current_lsn; SpinLockRelease(&MyLogicalRepWorker->relmutex); @@ -231,7 +288,8 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn) SetSubscriptionRelState(MyLogicalRepWorker->subid, MyLogicalRepWorker->relid, MyLogicalRepWorker->relstate, - MyLogicalRepWorker->relstate_lsn); + MyLogicalRepWorker->relstate_lsn, + true); walrcv_endstreaming(wrconn, &tli); finish_sync_worker(); @@ -255,17 +313,11 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn) * at least wal_retrieve_retry_interval. * * For tables that are being synchronized already, check if sync workers - * either need action from the apply worker or have finished. + * either need action from the apply worker or have finished. This is the + * SYNCWAIT to CATCHUP transition. * - * The usual scenario is that the apply got ahead of the sync while the sync - * ran, and then the action needed by apply is to mark a table for CATCHUP and - * wait for the catchup to happen. In the less common case that sync worker - * got in front of the apply worker, the table is marked as SYNCDONE but not - * ready yet, as it needs to be tracked until apply reaches the same position - * to which it was synced. - * - * If the synchronization position is reached, then the table can be marked as - * READY and is no longer tracked. 
+ * If the synchronization position is reached (SYNCDONE), then the table can + * be marked as READY and is no longer tracked. */ static void process_syncing_tables_for_apply(XLogRecPtr current_lsn) @@ -282,7 +334,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) Assert(!IsTransactionState()); - /* We need up to date sync state info for subscription tables here. */ + /* We need up-to-date sync state info for subscription tables here. */ if (!table_states_valid) { MemoryContext oldctx; @@ -314,7 +366,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) } /* - * Prepare hash table for tracking last start times of workers, to avoid + * Prepare a hash table for tracking last start times of workers, to avoid * immediate restarts. We don't need it if there are no tables that need * syncing. */ @@ -339,7 +391,9 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) last_start_times = NULL; } - /* Process all tables that are being synchronized. */ + /* + * Process all tables that are being synchronized. + */ foreach(lc, table_states) { SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc); @@ -348,8 +402,8 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) { /* * Apply has caught up to the position where the table sync has - * finished. Time to mark the table as ready so that apply will - * just continue to replicate it normally. + * finished. Mark the table as ready so that the apply will just + * continue to replicate it normally. */ if (current_lsn >= rstate->lsn) { @@ -362,7 +416,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) } SetSubscriptionRelState(MyLogicalRepWorker->subid, rstate->relid, rstate->state, - rstate->lsn); + rstate->lsn, true); } } else @@ -383,9 +437,9 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) else /* - * If no sync worker for this table yet, count running sync - * workers for this subscription, while we have the lock, for - * later. 
+ * If there is no sync worker for this table yet, count + * running sync workers for this subscription, while we have + * the lock, for later. */ nsyncworkers = logicalrep_sync_worker_count(MyLogicalRepWorker->subid); LWLockRelease(LogicalRepWorkerLock); @@ -397,50 +451,34 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) if (syncworker && rstate->state == SUBREL_STATE_SYNCWAIT) { /* - * There are three possible synchronization situations here. - * - * a) Apply is in front of the table sync: We tell the table - * sync to CATCHUP. - * - * b) Apply is behind the table sync: We tell the table sync - * to mark the table as SYNCDONE and finish. - * - * c) Apply and table sync are at the same position: We tell - * table sync to mark the table as READY and finish. - * - * In any case we'll need to wait for table sync to change the - * state in catalog and only then continue ourselves. + * Tell sync worker it can catchup now. We'll wait for it so + * it does not get lost. */ - if (current_lsn > rstate->lsn) - { - rstate->state = SUBREL_STATE_CATCHUP; - rstate->lsn = current_lsn; - } - else if (current_lsn == rstate->lsn) - { - rstate->state = SUBREL_STATE_READY; - rstate->lsn = current_lsn; - } - else - rstate->state = SUBREL_STATE_SYNCDONE; - SpinLockAcquire(&syncworker->relmutex); - syncworker->relstate = rstate->state; - syncworker->relstate_lsn = rstate->lsn; + syncworker->relstate = SUBREL_STATE_CATCHUP; + syncworker->relstate_lsn = + Max(syncworker->relstate_lsn, current_lsn); SpinLockRelease(&syncworker->relmutex); /* Signal the sync worker, as it may be waiting for us. */ logicalrep_worker_wakeup_ptr(syncworker); /* - * Enter busy loop and wait for synchronization status change. + * Enter busy loop and wait for synchronization worker to + * reach expected state (or die trying). 
*/ - wait_for_sync_status_change(rstate->relid, rstate->state); + if (!started_tx) + { + StartTransactionCommand(); + started_tx = true; + } + wait_for_relation_state_change(rstate->relid, + SUBREL_STATE_SYNCDONE); } /* * If there is no sync worker registered for the table and there - * is some free sync worker slot, start new sync worker for the + * is some free sync worker slot, start a new sync worker for the * table. */ else if (!syncworker && nsyncworkers < max_sync_workers_per_subscription) @@ -514,7 +552,7 @@ copy_read_data(void *outbuf, int minread, int maxread) int bytesread = 0; int avail; - /* If there are some leftover data from previous read, use them. */ + /* If there are some leftover data from previous read, use it. */ avail = copybuf->len - copybuf->cursor; if (avail) { @@ -526,7 +564,7 @@ copy_read_data(void *outbuf, int minread, int maxread) bytesread += avail; } - while (!got_SIGTERM && maxread > 0 && bytesread < minread) + while (maxread > 0 && bytesread < minread) { pgsocket fd = PGINVALID_SOCKET; int rc; @@ -568,7 +606,7 @@ copy_read_data(void *outbuf, int minread, int maxread) /* * Wait for more data or latch. */ - rc = WaitLatchOrSocket(&MyProc->procLatch, + rc = WaitLatchOrSocket(MyLatch, WL_SOCKET_READABLE | WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, fd, 1000L, WAIT_EVENT_LOGICAL_SYNC_DATA); @@ -577,13 +615,9 @@ copy_read_data(void *outbuf, int minread, int maxread) if (rc & WL_POSTMASTER_DEATH) proc_exit(1); - ResetLatch(&MyProc->procLatch); + ResetLatch(MyLatch); } - /* Check for exit condition. */ - if (got_SIGTERM) - proc_exit(0); - return bytesread; } @@ -661,7 +695,7 @@ fetch_remote_table_info(char *nspname, char *relname, (errmsg("could not fetch table info for table \"%s.%s\": %s", nspname, relname, res->err))); - /* We don't know number of rows coming, so allocate enough space. */ + /* We don't know the number of rows coming, so allocate enough space. 
*/ lrel->attnames = palloc0(MaxTupleAttributeNumber * sizeof(char *)); lrel->atttyps = palloc0(MaxTupleAttributeNumber * sizeof(Oid)); lrel->attkeys = NULL; @@ -763,7 +797,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) StartTransactionCommand(); relstate = GetSubscriptionRelState(MyLogicalRepWorker->subid, MyLogicalRepWorker->relid, - &relstate_lsn, false); + &relstate_lsn, true); CommitTransactionCommand(); SpinLockAcquire(&MyLogicalRepWorker->relmutex); @@ -785,6 +819,11 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) MySubscription->oid, MyLogicalRepWorker->relid); + /* + * Here we use the slot name instead of the subscription name as the + * application_name, so that it is different from the main apply worker, + * so that synchronous replication can distinguish them. + */ wrconn = walrcv_connect(MySubscription->conninfo, true, slotname, &err); if (wrconn == NULL) ereport(ERROR, @@ -808,28 +847,29 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) SetSubscriptionRelState(MyLogicalRepWorker->subid, MyLogicalRepWorker->relid, MyLogicalRepWorker->relstate, - MyLogicalRepWorker->relstate_lsn); + MyLogicalRepWorker->relstate_lsn, + true); CommitTransactionCommand(); pgstat_report_stat(false); /* - * We want to do the table data sync in single transaction. + * We want to do the table data sync in a single transaction. */ StartTransactionCommand(); /* - * Use standard write lock here. It might be better to - * disallow access to table while it's being synchronized. But - * we don't want to block the main apply process from working - * and it has to open relation in RowExclusiveLock when - * remapping remote relation id to local one. + * Use a standard write lock here. It might be better to + * disallow access to the table while it's being synchronized. + * But we don't want to block the main apply process from + * working and it has to open the relation in RowExclusiveLock + * when remapping remote relation id to local one. 
*/ rel = heap_open(MyLogicalRepWorker->relid, RowExclusiveLock); /* - * Create temporary slot for the sync process. We do this - * inside transaction so that we can use the snapshot made by - * the slot to get existing data. + * Create a temporary slot for the sync process. We do this + * inside the transaction so that we can use the snapshot made + * by the slot to get existing data. */ res = walrcv_exec(wrconn, "BEGIN READ ONLY ISOLATION LEVEL " @@ -844,7 +884,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) * Create new temporary logical decoding slot. * * We'll use slot for data copy so make sure the snapshot is - * used for the transaction, that way the COPY will get data + * used for the transaction; that way the COPY will get data * that is consistent with the lsn used by the slot to start * decoding. */ @@ -874,26 +914,43 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) MyLogicalRepWorker->relstate_lsn = *origin_startpos; SpinLockRelease(&MyLogicalRepWorker->relmutex); - /* - * Wait for main apply worker to either tell us to catchup or - * that we are done. + /* Wait for main apply worker to tell us to catchup. */ + wait_for_worker_state_change(SUBREL_STATE_CATCHUP); + + /*---------- + * There are now two possible states here: + * a) Sync is behind the apply. If that's the case we need to + * catch up with it by consuming the logical replication + * stream up to the relstate_lsn. For that, we exit this + * function and continue in ApplyWorkerMain(). + * b) Sync is caught up with the apply. So it can just set + * the state to SYNCDONE and finish. + *---------- */ - wait_for_sync_status_change(MyLogicalRepWorker->relid, - MyLogicalRepWorker->relstate); - if (MyLogicalRepWorker->relstate != SUBREL_STATE_CATCHUP) + if (*origin_startpos >= MyLogicalRepWorker->relstate_lsn) { - /* Update the new state. */ + /* + * Update the new state in catalog. No need to bother + * with the shmem state as we are exiting for good. 
+ */ SetSubscriptionRelState(MyLogicalRepWorker->subid, MyLogicalRepWorker->relid, - MyLogicalRepWorker->relstate, - MyLogicalRepWorker->relstate_lsn); + SUBREL_STATE_SYNCDONE, + *origin_startpos, + true); finish_sync_worker(); } break; } case SUBREL_STATE_SYNCDONE: case SUBREL_STATE_READY: - /* Nothing to do here but finish. */ + case SUBREL_STATE_UNKNOWN: + + /* + * Nothing to do here but finish. (UNKNOWN means the relation was + * removed from pg_subscription_rel before the sync worker could + * start.) + */ finish_sync_worker(); break; default: diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index c67720bd2f..97d2dff0dd 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -72,6 +72,8 @@ #include "storage/proc.h" #include "storage/procarray.h" +#include "tcop/tcopprot.h" + #include "utils/builtins.h" #include "utils/catcache.h" #include "utils/datum.h" @@ -116,7 +118,10 @@ static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply); static void store_flush_position(XLogRecPtr remote_lsn); -static void reread_subscription(void); +static void maybe_reread_subscription(void); + +/* Flags set by signal handlers */ +static volatile sig_atomic_t got_SIGHUP = false; /* * Should this worker apply changes for given relation. @@ -160,8 +165,7 @@ ensure_transaction(void) StartTransactionCommand(); - if (!MySubscriptionValid) - reread_subscription(); + maybe_reread_subscription(); MemoryContextSwitchTo(ApplyMessageContext); return true; @@ -458,6 +462,12 @@ apply_handle_commit(StringInfo s) store_flush_position(commit_data.end_lsn); } + else + { + /* Process any invalidation messages that might have accumulated. 
*/ + AcceptInvalidationMessages(); + maybe_reread_subscription(); + } in_remote_transaction = false; @@ -1005,7 +1015,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received) /* mark as idle, before starting to loop */ pgstat_report_activity(STATE_IDLE, NULL); - while (!got_SIGTERM) + for (;;) { pgsocket fd = PGINVALID_SOCKET; int rc; @@ -1015,6 +1025,8 @@ LogicalRepApplyLoop(XLogRecPtr last_received) TimestampTz last_recv_timestamp = GetCurrentTimestamp(); bool ping_sent = false; + CHECK_FOR_INTERRUPTS(); + MemoryContextSwitchTo(ApplyMessageContext); len = walrcv_receive(wrconn, &buf, &fd); @@ -1112,8 +1124,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received) * now. */ AcceptInvalidationMessages(); - if (!MySubscriptionValid) - reread_subscription(); + maybe_reread_subscription(); /* Process any table synchronization changes. */ process_syncing_tables(last_received); @@ -1135,7 +1146,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received) /* * Wait for more data or latch. */ - rc = WaitLatchOrSocket(&MyProc->procLatch, + rc = WaitLatchOrSocket(MyLatch, WL_SOCKET_READABLE | WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, fd, NAPTIME_PER_CYCLE, @@ -1145,6 +1156,12 @@ LogicalRepApplyLoop(XLogRecPtr last_received) if (rc & WL_POSTMASTER_DEATH) proc_exit(1); + if (rc & WL_LATCH_SET) + { + ResetLatch(MyLatch); + CHECK_FOR_INTERRUPTS(); + } + if (got_SIGHUP) { got_SIGHUP = false; @@ -1198,8 +1215,6 @@ LogicalRepApplyLoop(XLogRecPtr last_received) send_feedback(last_received, requestReply, requestReply); } - - ResetLatch(&MyProc->procLatch); } } @@ -1295,17 +1310,20 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply) last_flushpos = flushpos; } - /* - * Reread subscription info and exit on change. + * Reread subscription info if needed. Most changes will be exit. */ static void -reread_subscription(void) +maybe_reread_subscription(void) { MemoryContext oldctx; Subscription *newsub; bool started_tx = false; + /* When cache state is valid there is nothing to do here. 
*/ + if (MySubscriptionValid) + return; + /* This function might be called inside or outside of transaction. */ if (!IsTransactionState()) { @@ -1325,11 +1343,10 @@ reread_subscription(void) if (!newsub) { ereport(LOG, - (errmsg("logical replication apply worker for subscription \"%s\" will " - "stop because the subscription was removed", - MySubscription->name))); + (errmsg("logical replication apply worker for subscription \"%s\" will " + "stop because the subscription was removed", + MySubscription->name))); - walrcv_disconnect(wrconn); proc_exit(0); } @@ -1340,11 +1357,10 @@ reread_subscription(void) if (!newsub->enabled) { ereport(LOG, - (errmsg("logical replication apply worker for subscription \"%s\" will " - "stop because the subscription was disabled", - MySubscription->name))); + (errmsg("logical replication apply worker for subscription \"%s\" will " + "stop because the subscription was disabled", + MySubscription->name))); - walrcv_disconnect(wrconn); proc_exit(0); } @@ -1355,11 +1371,10 @@ reread_subscription(void) if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0) { ereport(LOG, - (errmsg("logical replication apply worker for subscription \"%s\" will " - "restart because the connection information was changed", - MySubscription->name))); + (errmsg("logical replication apply worker for subscription \"%s\" will " + "restart because the connection information was changed", + MySubscription->name))); - walrcv_disconnect(wrconn); proc_exit(0); } @@ -1370,11 +1385,10 @@ reread_subscription(void) if (strcmp(newsub->name, MySubscription->name) != 0) { ereport(LOG, - (errmsg("logical replication apply worker for subscription \"%s\" will " - "restart because subscription was renamed", - MySubscription->name))); + (errmsg("logical replication apply worker for subscription \"%s\" will " + "restart because subscription was renamed", + MySubscription->name))); - walrcv_disconnect(wrconn); proc_exit(0); } @@ -1388,11 +1402,10 @@ reread_subscription(void) 
if (strcmp(newsub->slotname, MySubscription->slotname) != 0) { ereport(LOG, - (errmsg("logical replication apply worker for subscription \"%s\" will " - "restart because the replication slot name was changed", - MySubscription->name))); + (errmsg("logical replication apply worker for subscription \"%s\" will " + "restart because the replication slot name was changed", + MySubscription->name))); - walrcv_disconnect(wrconn); proc_exit(0); } @@ -1403,11 +1416,10 @@ reread_subscription(void) if (!equal(newsub->publications, MySubscription->publications)) { ereport(LOG, - (errmsg("logical replication apply worker for subscription \"%s\" will " - "restart because subscription's publications were changed", - MySubscription->name))); + (errmsg("logical replication apply worker for subscription \"%s\" will " + "restart because subscription's publications were changed", + MySubscription->name))); - walrcv_disconnect(wrconn); proc_exit(0); } @@ -1443,6 +1455,19 @@ subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue) MySubscriptionValid = false; } +/* SIGHUP: set flag to reload configuration at next convenient time */ +static void +logicalrep_worker_sighup(SIGNAL_ARGS) +{ + int save_errno = errno; + + got_SIGHUP = true; + + /* Waken anything waiting on the process latch */ + SetLatch(MyLatch); + + errno = save_errno; +} /* Logical Replication Apply worker entry point */ void @@ -1460,17 +1485,13 @@ ApplyWorkerMain(Datum main_arg) /* Setup signal handling */ pqsignal(SIGHUP, logicalrep_worker_sighup); - pqsignal(SIGTERM, logicalrep_worker_sigterm); + pqsignal(SIGTERM, die); BackgroundWorkerUnblockSignals(); /* Initialise stats to a sanish value */ MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time = MyLogicalRepWorker->reply_time = GetCurrentTimestamp(); - /* Make it easy to identify our processes. 
*/ - SetConfigOption("application_name", MyBgworkerEntry->bgw_name, - PGC_USERSET, PGC_S_SESSION); - /* Load the libpq-specific functions */ load_file("libpqwalreceiver", false); @@ -1503,9 +1524,9 @@ ApplyWorkerMain(Datum main_arg) if (!MySubscription->enabled) { ereport(LOG, - (errmsg("logical replication apply worker for subscription \"%s\" will not " + (errmsg("logical replication apply worker for subscription \"%s\" will not " "start because the subscription was disabled during startup", - MySubscription->name))); + MySubscription->name))); proc_exit(0); } @@ -1518,7 +1539,7 @@ ApplyWorkerMain(Datum main_arg) if (am_tablesync_worker()) ereport(LOG, (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started", - MySubscription->name, get_rel_name(MyLogicalRepWorker->relid)))); + MySubscription->name, get_rel_name(MyLogicalRepWorker->relid)))); else ereport(LOG, (errmsg("logical replication apply worker for subscription \"%s\" has started", @@ -1556,8 +1577,8 @@ ApplyWorkerMain(Datum main_arg) /* * This shouldn't happen if the subscription is enabled, but guard - * against DDL bugs or manual catalog changes. (libpqwalreceiver - * will crash if slot is NULL. + * against DDL bugs or manual catalog changes. (libpqwalreceiver will + * crash if slot is NULL.) */ if (!myslotname) ereport(ERROR, @@ -1574,7 +1595,7 @@ ApplyWorkerMain(Datum main_arg) origin_startpos = replorigin_session_get_progress(false); CommitTransactionCommand(); - wrconn = walrcv_connect(MySubscription->conninfo, true, myslotname, + wrconn = walrcv_connect(MySubscription->conninfo, true, MySubscription->name, &err); if (wrconn == NULL) ereport(ERROR, @@ -1610,8 +1631,14 @@ ApplyWorkerMain(Datum main_arg) /* Run the main loop. */ LogicalRepApplyLoop(origin_startpos); - walrcv_disconnect(wrconn); - - /* We should only get here if we received SIGTERM */ proc_exit(0); } + +/* + * Is current process a logical replication worker? 
+ */ +bool +IsLogicalWorker(void) +{ + return MyLogicalRepWorker != NULL; +} diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 5386e86aa6..c0f7fbb2b2 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -331,8 +331,6 @@ ReplicationSlotAcquire(const char *name) Assert(MyReplicationSlot == NULL); - ReplicationSlotValidateName(name, ERROR); - /* Search for the named slot and mark it active if we find it. */ LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); for (i = 0; i < max_replication_slots; i++) diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 49cce38880..976a42f86d 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -24,12 +24,15 @@ * are treated as not a crash but approximately normal termination; * the walsender will exit quickly without sending any more XLOG records. * - * If the server is shut down, postmaster sends us SIGUSR2 after all regular - * backends have exited. This causes the walsender to switch to the "stopping" - * state. In this state, the walsender will reject any replication command - * that may generate WAL activity. The checkpointer begins the shutdown + * If the server is shut down, checkpointer sends us + * PROCSIG_WALSND_INIT_STOPPING after all regular backends have exited. If + * the backend is idle or runs an SQL query this causes the backend to + * shutdown, if logical replication is in progress all existing WAL records + * are processed followed by a shutdown. Otherwise this causes the walsender + * to switch to the "stopping" state. In this state, the walsender will reject + * any further replication commands. The checkpointer begins the shutdown * checkpoint once all walsenders are confirmed as stopping. When the shutdown - * checkpoint finishes, the postmaster sends us SIGINT. This instructs + * checkpoint finishes, the postmaster sends us SIGUSR2. 
This instructs * walsender to send any outstanding WAL, including the shutdown checkpoint * record, wait for it to be replicated to the standby, and then exit. * @@ -179,15 +182,14 @@ static bool streamingDoneReceiving; static bool WalSndCaughtUp = false; /* Flags set by signal handlers for later service in main loop */ -static volatile sig_atomic_t got_SIGHUP = false; -static volatile sig_atomic_t got_SIGINT = false; static volatile sig_atomic_t got_SIGUSR2 = false; +static volatile sig_atomic_t got_STOPPING = false; /* - * This is set while we are streaming. When not set, SIGINT signal will be - * handled like SIGTERM. When set, the main loop is responsible for checking - * got_SIGINT and terminating when it's set (after streaming any remaining - * WAL). + * This is set while we are streaming. When not set + * PROCSIG_WALSND_INIT_STOPPING signal will be handled like SIGTERM. When set, + * the main loop is responsible for checking got_STOPPING and terminating when + * it's set (after streaming any remaining WAL). 
*/ static volatile sig_atomic_t replication_active = false; @@ -215,9 +217,6 @@ static struct } LagTracker; /* Signal handlers */ -static void WalSndSigHupHandler(SIGNAL_ARGS); -static void WalSndXLogSendHandler(SIGNAL_ARGS); -static void WalSndSwitchStopping(SIGNAL_ARGS); static void WalSndLastCycleHandler(SIGNAL_ARGS); /* Prototypes for private functions */ @@ -306,14 +305,12 @@ WalSndErrorCleanup(void) ReplicationSlotCleanup(); replication_active = false; - if (got_SIGINT) + + if (got_STOPPING || got_SIGUSR2) proc_exit(0); /* Revert back to startup state */ WalSndSetState(WALSNDSTATE_STARTUP); - - if (got_SIGUSR2) - WalSndSetState(WALSNDSTATE_STOPPING); } /* @@ -686,7 +683,7 @@ StartReplication(StartReplicationCmd *cmd) WalSndLoop(XLogSendPhysical); replication_active = false; - if (got_SIGINT) + if (got_STOPPING) proc_exit(0); WalSndSetState(WALSNDSTATE_STARTUP); @@ -1064,7 +1061,7 @@ StartLogicalReplication(StartReplicationCmd *cmd) { ereport(LOG, (errmsg("terminating walsender process after promotion"))); - got_SIGINT = true; + got_STOPPING = true; } WalSndSetState(WALSNDSTATE_CATCHUP); @@ -1115,7 +1112,7 @@ StartLogicalReplication(StartReplicationCmd *cmd) ReplicationSlotRelease(); replication_active = false; - if (got_SIGINT) + if (got_STOPPING) proc_exit(0); WalSndSetState(WALSNDSTATE_STARTUP); @@ -1202,9 +1199,9 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, CHECK_FOR_INTERRUPTS(); /* Process any requests or signals received recently */ - if (got_SIGHUP) + if (ConfigReloadPending) { - got_SIGHUP = false; + ConfigReloadPending = false; ProcessConfigFile(PGC_SIGHUP); SyncRepInitConfig(); } @@ -1310,9 +1307,9 @@ WalSndWaitForWal(XLogRecPtr loc) CHECK_FOR_INTERRUPTS(); /* Process any requests or signals received recently */ - if (got_SIGHUP) + if (ConfigReloadPending) { - got_SIGHUP = false; + ConfigReloadPending = false; ProcessConfigFile(PGC_SIGHUP); SyncRepInitConfig(); } @@ -1320,6 +1317,14 @@ 
WalSndWaitForWal(XLogRecPtr loc) /* Check for input from the client */ ProcessRepliesIfAny(); + /* + * If we're shutting down, trigger pending WAL to be written out, + * otherwise we'd possibly end up waiting for WAL that never gets + * written, because walwriter has shut down already. + */ + if (got_STOPPING) + XLogBackgroundFlush(); + /* Update our idea of the currently flushed position. */ if (!RecoveryInProgress()) RecentFlushPtr = GetFlushRecPtr(); @@ -1327,14 +1332,6 @@ WalSndWaitForWal(XLogRecPtr loc) RecentFlushPtr = GetXLogReplayRecPtr(NULL); /* - * If postmaster asked us to switch to the stopping state, do so. - * Shutdown is in progress and this will allow the checkpointer to - * move on with the shutdown checkpoint. - */ - if (got_SIGUSR2) - WalSndSetState(WALSNDSTATE_STOPPING); - - /* * If postmaster asked us to stop, don't wait here anymore. This will * cause the xlogreader to return without reading a full record, which * is the fastest way to reach the mainloop which then can quit. @@ -1343,7 +1340,7 @@ WalSndWaitForWal(XLogRecPtr loc) * RecentFlushPtr, so we can send all remaining data before shutting * down. */ - if (got_SIGINT) + if (got_STOPPING) break; /* @@ -1421,7 +1418,7 @@ exec_replication_command(const char *cmd_string) * If WAL sender has been told that shutdown is getting close, switch its * status accordingly to handle the next replication commands correctly. */ - if (got_SIGUSR2) + if (got_STOPPING) WalSndSetState(WALSNDSTATE_STOPPING); /* @@ -2102,9 +2099,9 @@ WalSndLoop(WalSndSendDataCallback send_data) CHECK_FOR_INTERRUPTS(); /* Process any requests or signals received recently */ - if (got_SIGHUP) + if (ConfigReloadPending) { - got_SIGHUP = false; + ConfigReloadPending = false; ProcessConfigFile(PGC_SIGHUP); SyncRepInitConfig(); } @@ -2155,20 +2152,13 @@ WalSndLoop(WalSndSendDataCallback send_data) } /* - * At the reception of SIGUSR2, switch the WAL sender to the - * stopping state. 
- */ - if (got_SIGUSR2) - WalSndSetState(WALSNDSTATE_STOPPING); - - /* - * When SIGINT arrives, we send any outstanding logs up to the + * When SIGUSR2 arrives, we send any outstanding logs up to the * shutdown checkpoint record (i.e., the latest record), wait for * them to be replicated to the standby, and exit. This may be a * normal termination at shutdown, or a promotion, the walsender * is not sure which. */ - if (got_SIGINT) + if (got_SIGUSR2) WalSndDone(send_data); } @@ -2483,6 +2473,10 @@ XLogSendPhysical(void) XLogRecPtr endptr; Size nbytes; + /* If requested switch the WAL sender to the stopping state. */ + if (got_STOPPING) + WalSndSetState(WALSNDSTATE_STOPPING); + if (streamingDoneSending) { WalSndCaughtUp = true; @@ -2773,7 +2767,16 @@ XLogSendLogical(void) * point, then we're caught up. */ if (logical_decoding_ctx->reader->EndRecPtr >= GetFlushRecPtr()) + { WalSndCaughtUp = true; + + /* + * Have WalSndLoop() terminate the connection in an orderly + * manner, after writing out all the pending data. + */ + if (got_STOPPING) + got_SIGUSR2 = true; + } } /* Update shared memory status */ @@ -2883,51 +2886,13 @@ WalSndRqstFileReload(void) } } -/* SIGHUP: set flag to re-read config file at next convenient time */ -static void -WalSndSigHupHandler(SIGNAL_ARGS) -{ - int save_errno = errno; - - got_SIGHUP = true; - - SetLatch(MyLatch); - - errno = save_errno; -} - -/* SIGUSR1: set flag to send WAL records */ -static void -WalSndXLogSendHandler(SIGNAL_ARGS) -{ - int save_errno = errno; - - latch_sigusr1_handler(); - - errno = save_errno; -} - -/* SIGUSR2: set flag to switch to stopping state */ -static void -WalSndSwitchStopping(SIGNAL_ARGS) -{ - int save_errno = errno; - - got_SIGUSR2 = true; - SetLatch(MyLatch); - - errno = save_errno; -} - /* - * SIGINT: set flag to do a last cycle and shut down afterwards. The WAL - * sender should already have been switched to WALSNDSTATE_STOPPING at - * this point. + * Handle PROCSIG_WALSND_INIT_STOPPING signal. 
*/ -static void -WalSndLastCycleHandler(SIGNAL_ARGS) +void +HandleWalSndInitStopping(void) { - int save_errno = errno; + Assert(am_walsender); /* * If replication has not yet started, die like with SIGTERM. If @@ -2937,8 +2902,21 @@ WalSndLastCycleHandler(SIGNAL_ARGS) */ if (!replication_active) kill(MyProcPid, SIGTERM); + else + got_STOPPING = true; +} + +/* + * SIGUSR2: set flag to do a last cycle and shut down afterwards. The WAL + * sender should already have been switched to WALSNDSTATE_STOPPING at + * this point. + */ +static void +WalSndLastCycleHandler(SIGNAL_ARGS) +{ + int save_errno = errno; - got_SIGINT = true; + got_SIGUSR2 = true; SetLatch(MyLatch); errno = save_errno; @@ -2949,16 +2927,16 @@ void WalSndSignals(void) { /* Set up signal handlers */ - pqsignal(SIGHUP, WalSndSigHupHandler); /* set flag to read config + pqsignal(SIGHUP, PostgresSigHupHandler); /* set flag to read config * file */ - pqsignal(SIGINT, WalSndLastCycleHandler); /* request a last cycle and - * shutdown */ + pqsignal(SIGINT, StatementCancelHandler); /* query cancel */ pqsignal(SIGTERM, die); /* request shutdown */ pqsignal(SIGQUIT, quickdie); /* hard crash time */ InitializeTimeouts(); /* establishes SIGALRM handler */ pqsignal(SIGPIPE, SIG_IGN); - pqsignal(SIGUSR1, WalSndXLogSendHandler); /* request WAL sending */ - pqsignal(SIGUSR2, WalSndSwitchStopping); /* switch to stopping state */ + pqsignal(SIGUSR1, procsignal_sigusr1_handler); + pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and + * shutdown */ /* Reset some signals that are accepted by postmaster but not here */ pqsignal(SIGCHLD, SIG_DFL); @@ -3037,9 +3015,36 @@ WalSndWakeup(void) } /* - * Wait that all the WAL senders have reached the stopping state. This is - * used by the checkpointer to control when shutdown checkpoints can - * safely begin. + * Signal all walsenders to move to stopping state. + * + * This will trigger walsenders to move to a state where no further WAL can be + * generated. 
See this file's header for details. + */ +void +WalSndInitStopping(void) +{ + int i; + + for (i = 0; i < max_wal_senders; i++) + { + WalSnd *walsnd = &WalSndCtl->walsnds[i]; + pid_t pid; + + SpinLockAcquire(&walsnd->mutex); + pid = walsnd->pid; + SpinLockRelease(&walsnd->mutex); + + if (pid == 0) + continue; + + SendProcSignal(pid, PROCSIG_WALSND_INIT_STOPPING, InvalidBackendId); + } +} + +/* + * Wait that all the WAL senders have quit or reached the stopping state. This + * is used by the checkpointer to control when the shutdown checkpoint can + * safely be performed. */ void WalSndWaitStopping(void) diff --git a/src/backend/rewrite/rewriteHandler.c b/src/backend/rewrite/rewriteHandler.c index 510f49fcc0..c5f6a93e80 100644 --- a/src/backend/rewrite/rewriteHandler.c +++ b/src/backend/rewrite/rewriteHandler.c @@ -1987,7 +1987,8 @@ fireRIRrules(Query *parsetree, List *activeRIRs, bool forUpdatePushedDown) /* Only normal relations can have RLS policies */ if (rte->rtekind != RTE_RELATION || - rte->relkind != RELKIND_RELATION) + (rte->relkind != RELKIND_RELATION && + rte->relkind != RELKIND_PARTITIONED_TABLE)) continue; rel = heap_open(rte->relid, NoLock); @@ -2605,7 +2606,8 @@ relation_is_updatable(Oid reloid, return 0; /* If the relation is a table, it is always updatable */ - if (rel->rd_rel->relkind == RELKIND_RELATION) + if (rel->rd_rel->relkind == RELKIND_RELATION || + rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) { relation_close(rel, AccessShareLock); return ALL_EVENTS; @@ -2719,7 +2721,8 @@ relation_is_updatable(Oid reloid, base_rte = rt_fetch(rtr->rtindex, viewquery->rtable); Assert(base_rte->rtekind == RTE_RELATION); - if (base_rte->relkind != RELKIND_RELATION) + if (base_rte->relkind != RELKIND_RELATION && + base_rte->relkind != RELKIND_PARTITIONED_TABLE) { baseoid = base_rte->relid; include_cols = adjust_view_column_set(updatable_cols, diff --git a/src/backend/snowball/Makefile b/src/backend/snowball/Makefile index 518178ff39..50cbace41d 100644 --- 
a/src/backend/snowball/Makefile +++ b/src/backend/snowball/Makefile @@ -14,8 +14,7 @@ top_builddir = ../../.. include $(top_builddir)/src/Makefile.global override CPPFLAGS := -I$(top_srcdir)/src/include/snowball \ - -I$(top_srcdir)/src/include/snowball/libstemmer $(CPPFLAGS) \ - $(ICU_CFLAGS) + -I$(top_srcdir)/src/include/snowball/libstemmer $(CPPFLAGS) OBJS= $(WIN32RES) dict_snowball.o api.o utilities.o \ stem_ISO_8859_1_danish.o \ diff --git a/src/backend/storage/ipc/latch.c b/src/backend/storage/ipc/latch.c index 53e6bf2477..55959de91f 100644 --- a/src/backend/storage/ipc/latch.c +++ b/src/backend/storage/ipc/latch.c @@ -370,7 +370,7 @@ WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock, AddWaitEventToSet(set, WL_LATCH_SET, PGINVALID_SOCKET, (Latch *) latch, NULL); - if (wakeEvents & WL_POSTMASTER_DEATH) + if (wakeEvents & WL_POSTMASTER_DEATH && IsUnderPostmaster) AddWaitEventToSet(set, WL_POSTMASTER_DEATH, PGINVALID_SOCKET, NULL, NULL); diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c index 1c01dd973f..0e0bbf71f0 100644 --- a/src/backend/storage/ipc/procarray.c +++ b/src/backend/storage/ipc/procarray.c @@ -1990,14 +1990,15 @@ GetSnapshotData(Snapshot snapshot, bool latest) * Returns TRUE if successful, FALSE if source xact is no longer running. 
*/ bool -ProcArrayInstallImportedXmin(TransactionId xmin, TransactionId sourcexid) +ProcArrayInstallImportedXmin(TransactionId xmin, + VirtualTransactionId *sourcevxid) { bool result = false; ProcArrayStruct *arrayP = procArray; int index; Assert(TransactionIdIsNormal(xmin)); - if (!TransactionIdIsNormal(sourcexid)) + if (!sourcevxid) return false; /* Get lock so source xact can't end while we're doing this */ @@ -2014,8 +2015,10 @@ ProcArrayInstallImportedXmin(TransactionId xmin, TransactionId sourcexid) if (pgxact->vacuumFlags & PROC_IN_VACUUM) continue; - xid = pgxact->xid; /* fetch just once */ - if (xid != sourcexid) + /* We are only interested in the specific virtual transaction. */ + if (proc->backendId != sourcevxid->backendId) + continue; + if (proc->lxid != sourcevxid->localTransactionId) continue; /* diff --git a/src/backend/storage/ipc/procsignal.c b/src/backend/storage/ipc/procsignal.c index f4d4f25e68..55e94249db 100644 --- a/src/backend/storage/ipc/procsignal.c +++ b/src/backend/storage/ipc/procsignal.c @@ -21,6 +21,7 @@ #include "access/parallel.h" #include "commands/async.h" #include "miscadmin.h" +#include "replication/walsender.h" #include "storage/latch.h" #include "storage/ipc.h" #include "storage/proc.h" @@ -280,6 +281,9 @@ procsignal_sigusr1_handler(SIGNAL_ARGS) if (CheckProcSignal(PROCSIG_PARALLEL_MESSAGE)) HandleParallelMessageInterrupt(); + if (CheckProcSignal(PROCSIG_WALSND_INIT_STOPPING)) + HandleWalSndInitStopping(); + if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_DATABASE)) RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_DATABASE); diff --git a/src/backend/storage/ipc/shm_mq.c b/src/backend/storage/ipc/shm_mq.c index f5bf807cd6..fcd6cc7a8c 100644 --- a/src/backend/storage/ipc/shm_mq.c +++ b/src/backend/storage/ipc/shm_mq.c @@ -769,7 +769,7 @@ shm_mq_wait_for_attach(shm_mq_handle *mqh) * * The purpose of this function is to make sure that the process * with which we're communicating doesn't block forever waiting for us to - * fill 
or drain the queue once we've lost interest. Whem the sender + * fill or drain the queue once we've lost interest. When the sender * detaches, the receiver can read any messages remaining in the queue; * further reads will return SHM_MQ_DETACHED. If the receiver detaches, * further attempts to send messages will likewise return SHM_MQ_DETACHED. @@ -1167,7 +1167,7 @@ shm_mq_inc_bytes_written(volatile shm_mq *mq, Size n) } /* - * Set sender's latch, unless queue is detached. + * Set receiver's latch, unless queue is detached. */ static shm_mq_result shm_mq_notify_receiver(volatile shm_mq *mq) diff --git a/src/backend/storage/ipc/shm_toc.c b/src/backend/storage/ipc/shm_toc.c index 9110ffa4a0..50334cd797 100644 --- a/src/backend/storage/ipc/shm_toc.c +++ b/src/backend/storage/ipc/shm_toc.c @@ -6,7 +6,7 @@ * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * src/include/storage/shm_toc.c + * src/backend/storage/ipc/shm_toc.c * *------------------------------------------------------------------------- */ @@ -20,16 +20,16 @@ typedef struct shm_toc_entry { uint64 key; /* Arbitrary identifier */ - uint64 offset; /* Bytes offset */ + Size offset; /* Offset, in bytes, from TOC start */ } shm_toc_entry; struct shm_toc { - uint64 toc_magic; /* Magic number for this TOC */ + uint64 toc_magic; /* Magic number identifying this TOC */ slock_t toc_mutex; /* Spinlock for mutual exclusion */ Size toc_total_bytes; /* Bytes managed by this TOC */ Size toc_allocated_bytes; /* Bytes allocated of those managed */ - Size toc_nentry; /* Number of entries in TOC */ + uint32 toc_nentry; /* Number of entries in TOC */ shm_toc_entry toc_entry[FLEXIBLE_ARRAY_MEMBER]; }; @@ -53,7 +53,7 @@ shm_toc_create(uint64 magic, void *address, Size nbytes) /* * Attach to an existing table of contents. If the magic number found at - * the target address doesn't match our expectations, returns NULL. 
+ * the target address doesn't match our expectations, return NULL. */ extern shm_toc * shm_toc_attach(uint64 magic, void *address) @@ -64,7 +64,7 @@ shm_toc_attach(uint64 magic, void *address) return NULL; Assert(toc->toc_total_bytes >= toc->toc_allocated_bytes); - Assert(toc->toc_total_bytes >= offsetof(shm_toc, toc_entry)); + Assert(toc->toc_total_bytes > offsetof(shm_toc, toc_entry)); return toc; } @@ -76,7 +76,7 @@ shm_toc_attach(uint64 magic, void *address) * just a way of dividing a single physical shared memory segment into logical * chunks that may be used for different purposes. * - * We allocated backwards from the end of the segment, so that the TOC entries + * We allocate backwards from the end of the segment, so that the TOC entries * can grow forward from the start of the segment. */ extern void * @@ -140,7 +140,7 @@ shm_toc_freespace(shm_toc *toc) /* * Insert a TOC entry. * - * The idea here is that process setting up the shared memory segment will + * The idea here is that the process setting up the shared memory segment will * register the addresses of data structures within the segment using this * function. Each data structure will be identified using a 64-bit key, which * is assumed to be a well-known or discoverable integer. Other processes @@ -155,17 +155,17 @@ shm_toc_freespace(shm_toc *toc) * data structure here. But the real idea here is just to give someone mapping * a dynamic shared memory the ability to find the bare minimum number of * pointers that they need to bootstrap. If you're storing a lot of stuff in - * here, you're doing it wrong. + * the TOC, you're doing it wrong. */ void shm_toc_insert(shm_toc *toc, uint64 key, void *address) { volatile shm_toc *vtoc = toc; - uint64 total_bytes; - uint64 allocated_bytes; - uint64 nentry; - uint64 toc_bytes; - uint64 offset; + Size total_bytes; + Size allocated_bytes; + Size nentry; + Size toc_bytes; + Size offset; /* Relativize pointer. 
*/ Assert(address > (void *) toc); @@ -181,7 +181,8 @@ shm_toc_insert(shm_toc *toc, uint64 key, void *address) /* Check for memory exhaustion and overflow. */ if (toc_bytes + sizeof(shm_toc_entry) > total_bytes || - toc_bytes + sizeof(shm_toc_entry) < toc_bytes) + toc_bytes + sizeof(shm_toc_entry) < toc_bytes || + nentry >= PG_UINT32_MAX) { SpinLockRelease(&toc->toc_mutex); ereport(ERROR, @@ -208,6 +209,9 @@ shm_toc_insert(shm_toc *toc, uint64 key, void *address) /* * Look up a TOC entry. * + * If the key is not found, returns NULL if noError is true, otherwise + * throws elog(ERROR). + * * Unlike the other functions in this file, this operation acquires no lock; * it uses only barriers. It probably wouldn't hurt concurrency very much even * if it did get a lock, but since it's reasonably likely that a group of @@ -215,21 +219,29 @@ shm_toc_insert(shm_toc *toc, uint64 key, void *address) * right around the same time, there seems to be some value in avoiding it. */ void * -shm_toc_lookup(shm_toc *toc, uint64 key) +shm_toc_lookup(shm_toc *toc, uint64 key, bool noError) { - uint64 nentry; - uint64 i; + uint32 nentry; + uint32 i; - /* Read the number of entries before we examine any entry. */ + /* + * Read the number of entries before we examine any entry. We assume that + * reading a uint32 is atomic. + */ nentry = toc->toc_nentry; pg_read_barrier(); /* Now search for a matching entry. */ for (i = 0; i < nentry; ++i) + { if (toc->toc_entry[i].key == key) return ((char *) toc) + toc->toc_entry[i].offset; + } /* No matching entry was found. 
*/ + if (!noError) + elog(ERROR, "could not find key " UINT64_FORMAT " in shm TOC at %p", + key, toc); return NULL; } diff --git a/src/backend/storage/lmgr/condition_variable.c b/src/backend/storage/lmgr/condition_variable.c index 5afb21121b..b4b7d28dd5 100644 --- a/src/backend/storage/lmgr/condition_variable.c +++ b/src/backend/storage/lmgr/condition_variable.c @@ -68,14 +68,14 @@ ConditionVariablePrepareToSleep(ConditionVariable *cv) { cv_wait_event_set = CreateWaitEventSet(TopMemoryContext, 1); AddWaitEventToSet(cv_wait_event_set, WL_LATCH_SET, PGINVALID_SOCKET, - &MyProc->procLatch, NULL); + MyLatch, NULL); } /* * Reset my latch before adding myself to the queue and before entering * the caller's predicate loop. */ - ResetLatch(&MyProc->procLatch); + ResetLatch(MyLatch); /* Add myself to the wait queue. */ SpinLockAcquire(&cv->mutex); @@ -135,7 +135,7 @@ ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info) WaitEventSetWait(cv_wait_event_set, -1, &event, 1, wait_event_info); /* Reset latch before testing whether we can return. 
*/ - ResetLatch(&MyProc->procLatch); + ResetLatch(MyLatch); /* * If this process has been taken out of the wait list, then we know diff --git a/src/backend/storage/lmgr/predicate.c b/src/backend/storage/lmgr/predicate.c index 30aea14385..38c2e493a5 100644 --- a/src/backend/storage/lmgr/predicate.c +++ b/src/backend/storage/lmgr/predicate.c @@ -148,7 +148,7 @@ * predicate lock maintenance * GetSerializableTransactionSnapshot(Snapshot snapshot) * SetSerializableTransactionSnapshot(Snapshot snapshot, - * TransactionId sourcexid) + * VirtualTransactionId *sourcevxid) * RegisterPredicateLockingXid(void) * PredicateLockRelation(Relation relation, Snapshot snapshot) * PredicateLockPage(Relation relation, BlockNumber blkno, @@ -434,7 +434,8 @@ static uint32 predicatelock_hash(const void *key, Size keysize); static void SummarizeOldestCommittedSxact(void); static Snapshot GetSafeSnapshot(Snapshot snapshot); static Snapshot GetSerializableTransactionSnapshotInt(Snapshot snapshot, - TransactionId sourcexid); + VirtualTransactionId *sourcevxid, + int sourcepid); static bool PredicateLockExists(const PREDICATELOCKTARGETTAG *targettag); static bool GetParentPredicateLockTag(const PREDICATELOCKTARGETTAG *tag, PREDICATELOCKTARGETTAG *parent); @@ -1510,7 +1511,7 @@ GetSafeSnapshot(Snapshot origSnapshot) * one passed to it, but we avoid assuming that here. 
*/ snapshot = GetSerializableTransactionSnapshotInt(origSnapshot, - InvalidTransactionId); + NULL, InvalidPid); if (MySerializableXact == InvalidSerializableXact) return snapshot; /* no concurrent r/w xacts; it's safe */ @@ -1643,7 +1644,7 @@ GetSerializableTransactionSnapshot(Snapshot snapshot) return GetSafeSnapshot(snapshot); return GetSerializableTransactionSnapshotInt(snapshot, - InvalidTransactionId); + NULL, InvalidPid); } /* @@ -1658,7 +1659,8 @@ GetSerializableTransactionSnapshot(Snapshot snapshot) */ void SetSerializableTransactionSnapshot(Snapshot snapshot, - TransactionId sourcexid) + VirtualTransactionId *sourcevxid, + int sourcepid) { Assert(IsolationIsSerializable()); @@ -1673,7 +1675,8 @@ SetSerializableTransactionSnapshot(Snapshot snapshot, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("a snapshot-importing transaction must not be READ ONLY DEFERRABLE"))); - (void) GetSerializableTransactionSnapshotInt(snapshot, sourcexid); + (void) GetSerializableTransactionSnapshotInt(snapshot, sourcevxid, + sourcepid); } /* @@ -1687,7 +1690,8 @@ SetSerializableTransactionSnapshot(Snapshot snapshot, */ static Snapshot GetSerializableTransactionSnapshotInt(Snapshot snapshot, - TransactionId sourcexid) + VirtualTransactionId *sourcevxid, + int sourcepid) { PGPROC *proc; VirtualTransactionId vxid; @@ -1741,17 +1745,17 @@ GetSerializableTransactionSnapshotInt(Snapshot snapshot, } while (!sxact); /* Get the snapshot, or check that it's safe to use */ - if (!TransactionIdIsValid(sourcexid)) + if (!sourcevxid) snapshot = GetSnapshotData(snapshot, false); - else if (!ProcArrayInstallImportedXmin(snapshot->xmin, sourcexid)) + else if (!ProcArrayInstallImportedXmin(snapshot->xmin, sourcevxid)) { ReleasePredXact(sxact); LWLockRelease(SerializableXactHashLock); ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("could not import the requested snapshot"), - errdetail("The source transaction %u is not running anymore.", - sourcexid))); + errdetail("The 
source process with pid %d is not running anymore.", + sourcepid))); } /* @@ -2841,7 +2845,7 @@ exit: /* We shouldn't run out of memory if we're moving locks */ Assert(!outOfShmem); - /* Put the scrach entry back */ + /* Put the scratch entry back */ RestoreScratchTarget(false); } diff --git a/src/backend/storage/page/bufpage.c b/src/backend/storage/page/bufpage.c index fdf045a45b..1b53d651cd 100644 --- a/src/backend/storage/page/bufpage.c +++ b/src/backend/storage/page/bufpage.c @@ -902,8 +902,8 @@ PageIndexMultiDelete(Page page, OffsetNumber *itemnos, int nitems) offset != MAXALIGN(offset)) ereport(ERROR, (errcode(ERRCODE_DATA_CORRUPTED), - errmsg("corrupted item pointer: offset = %u, size = %u", - offset, (unsigned int) size))); + errmsg("corrupted item pointer: offset = %u, length = %u", + offset, (unsigned int) size))); if (nextitm < nitems && offnum == itemnos[nextitm]) { diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c index 3cc070a34b..8c7beaf201 100644 --- a/src/backend/tcop/postgres.c +++ b/src/backend/tcop/postgres.c @@ -55,6 +55,8 @@ #include "pg_getopt.h" #include "postmaster/autovacuum.h" #include "postmaster/postmaster.h" +#include "replication/logicallauncher.h" +#include "replication/logicalworker.h" #include "replication/slot.h" #include "replication/walsender.h" #include "rewrite/rewriteHandler.h" @@ -141,13 +143,6 @@ char *register_stack_base_ptr = NULL; #endif /* - * Flag to mark SIGHUP. Whenever the main loop comes around it - * will reread the configuration file. (Better than doing the - * reading in the signal handler, ey?) - */ -static volatile sig_atomic_t got_SIGHUP = false; - -/* * Flag to keep track of whether we have started a transaction. * For extended query protocol this has to be remembered across messages. 
*/ @@ -205,7 +200,6 @@ static bool IsTransactionExitStmt(Node *parsetree); static bool IsTransactionExitStmtList(List *pstmts); static bool IsTransactionStmtList(List *pstmts); static void drop_unnamed_stmt(void); -static void SigHupHandler(SIGNAL_ARGS); static void log_disconnections(int code, Datum arg); @@ -3098,13 +3092,19 @@ FloatExceptionHandler(SIGNAL_ARGS) "invalid operation, such as division by zero."))); } -/* SIGHUP: set flag to re-read config file at next convenient time */ -static void -SigHupHandler(SIGNAL_ARGS) +/* + * SIGHUP: set flag to re-read config file at next convenient time. + * + * Sets the ConfigReloadPending flag, which should be checked at convenient + * places inside main loops. (Better than doing the reading in the signal + * handler, ey?) + */ +void +PostgresSigHupHandler(SIGNAL_ARGS) { int save_errno = errno; - got_SIGHUP = true; + ConfigReloadPending = true; SetLatch(MyLatch); errno = save_errno; @@ -3260,6 +3260,18 @@ ProcessInterrupts(void) ereport(FATAL, (errcode(ERRCODE_ADMIN_SHUTDOWN), errmsg("terminating autovacuum process due to administrator command"))); + else if (IsLogicalWorker()) + ereport(FATAL, + (errcode(ERRCODE_ADMIN_SHUTDOWN), + errmsg("terminating logical replication worker due to administrator command"))); + else if (IsLogicalLauncher()) + { + ereport(DEBUG1, + (errmsg("logical replication launcher shutting down"))); + + /* The logical replication launcher can be stopped at any time. 
*/ + proc_exit(0); + } else if (RecoveryConflictPending && RecoveryConflictRetryable) { pgstat_report_recovery_conflict(RecoveryConflictReason); @@ -4114,8 +4126,8 @@ PostgresMain(int argc, char *argv[], WalSndSignals(); else { - pqsignal(SIGHUP, SigHupHandler); /* set flag to read config - * file */ + pqsignal(SIGHUP, PostgresSigHupHandler); /* set flag to read + * config file */ pqsignal(SIGINT, StatementCancelHandler); /* cancel current query */ pqsignal(SIGTERM, die); /* cancel current query and exit */ @@ -4620,9 +4632,9 @@ PostgresMain(int argc, char *argv[], * (6) check for any other interesting events that happened while we * slept. */ - if (got_SIGHUP) + if (ConfigReloadPending) { - got_SIGHUP = false; + ConfigReloadPending = false; ProcessConfigFile(PGC_SIGHUP); } diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c index 632d51f3ac..d3eba7bcdf 100644 --- a/src/backend/tcop/utility.c +++ b/src/backend/tcop/utility.c @@ -2019,6 +2019,7 @@ ProcessUtilitySlow(ParseState *pstate, InvalidOid, /* no predefined OID */ false, /* is_alter_table */ true, /* check_rights */ + true, /* check_not_in_use */ false, /* skip_build */ false); /* quiet */ diff --git a/src/backend/utils/adt/json.c b/src/backend/utils/adt/json.c index 47371ab7cb..0f99b613f5 100644 --- a/src/backend/utils/adt/json.c +++ b/src/backend/utils/adt/json.c @@ -2008,7 +2008,7 @@ json_object_agg_transfn(PG_FUNCTION_ARGS) if (arg_type == InvalidOid) ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("could not determine data type for argument 1"))); + errmsg("could not determine data type for argument %d", 1))); json_categorize_type(arg_type, &state->key_category, &state->key_output_func); @@ -2018,7 +2018,7 @@ json_object_agg_transfn(PG_FUNCTION_ARGS) if (arg_type == InvalidOid) ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("could not determine data type for argument 2"))); + errmsg("could not determine data type for argument %d", 2))); 
json_categorize_type(arg_type, &state->val_category, &state->val_output_func); diff --git a/src/backend/utils/adt/jsonb.c b/src/backend/utils/adt/jsonb.c index 1dabfa9fc1..c588ce00af 100644 --- a/src/backend/utils/adt/jsonb.c +++ b/src/backend/utils/adt/jsonb.c @@ -1212,7 +1212,7 @@ jsonb_build_object(PG_FUNCTION_ARGS) if (val_type == InvalidOid || val_type == UNKNOWNOID) ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("argument %d: could not determine data type", i + 1))); + errmsg("could not determine data type for argument %d", i + 1))); add_jsonb(arg, false, &result, val_type, true); @@ -1235,7 +1235,7 @@ jsonb_build_object(PG_FUNCTION_ARGS) if (val_type == InvalidOid || val_type == UNKNOWNOID) ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("argument %d: could not determine data type", i + 2))); + errmsg("could not determine data type for argument %d", i + 2))); add_jsonb(arg, PG_ARGISNULL(i + 1), &result, val_type, false); } @@ -1295,7 +1295,7 @@ jsonb_build_array(PG_FUNCTION_ARGS) if (val_type == InvalidOid || val_type == UNKNOWNOID) ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("argument %d: could not determine data type", i + 1))); + errmsg("could not determine data type for argument %d", i + 1))); add_jsonb(arg, PG_ARGISNULL(i), &result, val_type, false); } diff --git a/src/backend/utils/adt/ruleutils.c b/src/backend/utils/adt/ruleutils.c index 2820dbe465..f5631e512e 100644 --- a/src/backend/utils/adt/ruleutils.c +++ b/src/backend/utils/adt/ruleutils.c @@ -1879,7 +1879,7 @@ pg_get_constraintdef_worker(Oid constraintId, bool fullCommand, heap_close(relation, AccessShareLock); return NULL; } - elog(ERROR, "cache lookup failed for constraint %u", constraintId); + elog(ERROR, "could not find tuple for constraint %u", constraintId); } conForm = (Form_pg_constraint) GETSTRUCT(tup); diff --git a/src/backend/utils/adt/selfuncs.c b/src/backend/utils/adt/selfuncs.c index 94b2de5e2d..2967f179ad 100644 --- 
a/src/backend/utils/adt/selfuncs.c +++ b/src/backend/utils/adt/selfuncs.c @@ -154,12 +154,13 @@ get_relation_stats_hook_type get_relation_stats_hook = NULL; get_index_stats_hook_type get_index_stats_hook = NULL; +static double eqsel_internal(PG_FUNCTION_ARGS, bool negate); static double var_eq_const(VariableStatData *vardata, Oid operator, Datum constval, bool constisnull, - bool varonleft); + bool varonleft, bool negate); static double var_eq_non_const(VariableStatData *vardata, Oid operator, Node *other, - bool varonleft); + bool varonleft, bool negate); static double ineq_histogram_selectivity(PlannerInfo *root, VariableStatData *vardata, FmgrInfo *opproc, bool isgt, @@ -227,6 +228,15 @@ static List *add_predicate_to_quals(IndexOptInfo *index, List *indexQuals); Datum eqsel(PG_FUNCTION_ARGS) { + PG_RETURN_FLOAT8((float8) eqsel_internal(fcinfo, false)); +} + +/* + * Common code for eqsel() and neqsel() + */ +static double +eqsel_internal(PG_FUNCTION_ARGS, bool negate) +{ PlannerInfo *root = (PlannerInfo *) PG_GETARG_POINTER(0); Oid operator = PG_GETARG_OID(1); List *args = (List *) PG_GETARG_POINTER(2); @@ -237,12 +247,26 @@ eqsel(PG_FUNCTION_ARGS) double selec; /* + * When asked about <>, we do the estimation using the corresponding = + * operator, then convert to <> via "1.0 - eq_selectivity - nullfrac". + */ + if (negate) + { + operator = get_negator(operator); + if (!OidIsValid(operator)) + { + /* Use default selectivity (should we raise an error instead?) */ + return 1.0 - DEFAULT_EQ_SEL; + } + } + + /* * If expression is not variable = something or something = variable, then * punt and return a default estimate. */ if (!get_restriction_variable(root, args, varRelid, &vardata, &other, &varonleft)) - PG_RETURN_FLOAT8(DEFAULT_EQ_SEL); + return negate ? (1.0 - DEFAULT_EQ_SEL) : DEFAULT_EQ_SEL; /* * We can do a lot better if the something is a constant. 
(Note: the @@ -253,14 +277,14 @@ eqsel(PG_FUNCTION_ARGS) selec = var_eq_const(&vardata, operator, ((Const *) other)->constvalue, ((Const *) other)->constisnull, - varonleft); + varonleft, negate); else selec = var_eq_non_const(&vardata, operator, other, - varonleft); + varonleft, negate); ReleaseVariableStats(vardata); - PG_RETURN_FLOAT8((float8) selec); + return selec; } /* @@ -271,20 +295,33 @@ eqsel(PG_FUNCTION_ARGS) static double var_eq_const(VariableStatData *vardata, Oid operator, Datum constval, bool constisnull, - bool varonleft) + bool varonleft, bool negate) { double selec; + double nullfrac = 0.0; bool isdefault; Oid opfuncoid; /* * If the constant is NULL, assume operator is strict and return zero, ie, - * operator will never return TRUE. + * operator will never return TRUE. (It's zero even for a negator op.) */ if (constisnull) return 0.0; /* + * Grab the nullfrac for use below. Note we allow use of nullfrac + * regardless of security check. + */ + if (HeapTupleIsValid(vardata->statsTuple)) + { + Form_pg_statistic stats; + + stats = (Form_pg_statistic) GETSTRUCT(vardata->statsTuple); + nullfrac = stats->stanullfrac; + } + + /* * If we matched the var to a unique index or DISTINCT clause, assume * there is exactly one match regardless of anything else. (This is * slightly bogus, since the index or clause's equality operator might be @@ -292,19 +329,17 @@ var_eq_const(VariableStatData *vardata, Oid operator, * ignoring the information.) 
*/ if (vardata->isunique && vardata->rel && vardata->rel->tuples >= 1.0) - return 1.0 / vardata->rel->tuples; - - if (HeapTupleIsValid(vardata->statsTuple) && - statistic_proc_security_check(vardata, - (opfuncoid = get_opcode(operator)))) { - Form_pg_statistic stats; + selec = 1.0 / vardata->rel->tuples; + } + else if (HeapTupleIsValid(vardata->statsTuple) && + statistic_proc_security_check(vardata, + (opfuncoid = get_opcode(operator)))) + { AttStatsSlot sslot; bool match = false; int i; - stats = (Form_pg_statistic) GETSTRUCT(vardata->statsTuple); - /* * Is the constant "=" to any of the column's most common values? * (Although the given operator may not really be "=", we will assume @@ -363,7 +398,7 @@ var_eq_const(VariableStatData *vardata, Oid operator, for (i = 0; i < sslot.nnumbers; i++) sumcommon += sslot.numbers[i]; - selec = 1.0 - sumcommon - stats->stanullfrac; + selec = 1.0 - sumcommon - nullfrac; CLAMP_PROBABILITY(selec); /* @@ -396,6 +431,10 @@ var_eq_const(VariableStatData *vardata, Oid operator, selec = 1.0 / get_variable_numdistinct(vardata, &isdefault); } + /* now adjust if we wanted <> rather than = */ + if (negate) + selec = 1.0 - selec - nullfrac; + /* result should be in range, but make sure... */ CLAMP_PROBABILITY(selec); @@ -408,12 +447,24 @@ var_eq_const(VariableStatData *vardata, Oid operator, static double var_eq_non_const(VariableStatData *vardata, Oid operator, Node *other, - bool varonleft) + bool varonleft, bool negate) { double selec; + double nullfrac = 0.0; bool isdefault; /* + * Grab the nullfrac for use below. + */ + if (HeapTupleIsValid(vardata->statsTuple)) + { + Form_pg_statistic stats; + + stats = (Form_pg_statistic) GETSTRUCT(vardata->statsTuple); + nullfrac = stats->stanullfrac; + } + + /* * If we matched the var to a unique index or DISTINCT clause, assume * there is exactly one match regardless of anything else. 
(This is * slightly bogus, since the index or clause's equality operator might be @@ -421,16 +472,14 @@ var_eq_non_const(VariableStatData *vardata, Oid operator, * ignoring the information.) */ if (vardata->isunique && vardata->rel && vardata->rel->tuples >= 1.0) - return 1.0 / vardata->rel->tuples; - - if (HeapTupleIsValid(vardata->statsTuple)) { - Form_pg_statistic stats; + selec = 1.0 / vardata->rel->tuples; + } + else if (HeapTupleIsValid(vardata->statsTuple)) + { double ndistinct; AttStatsSlot sslot; - stats = (Form_pg_statistic) GETSTRUCT(vardata->statsTuple); - /* * Search is for a value that we do not know a priori, but we will * assume it is not NULL. Estimate the selectivity as non-null @@ -441,7 +490,7 @@ var_eq_non_const(VariableStatData *vardata, Oid operator, * values, regardless of their frequency in the table. Is that a good * idea?) */ - selec = 1.0 - stats->stanullfrac; + selec = 1.0 - nullfrac; ndistinct = get_variable_numdistinct(vardata, &isdefault); if (ndistinct > 1) selec /= ndistinct; @@ -469,6 +518,10 @@ var_eq_non_const(VariableStatData *vardata, Oid operator, selec = 1.0 / get_variable_numdistinct(vardata, &isdefault); } + /* now adjust if we wanted <> rather than = */ + if (negate) + selec = 1.0 - selec - nullfrac; + /* result should be in range, but make sure... */ CLAMP_PROBABILITY(selec); @@ -485,33 +538,7 @@ var_eq_non_const(VariableStatData *vardata, Oid operator, Datum neqsel(PG_FUNCTION_ARGS) { - PlannerInfo *root = (PlannerInfo *) PG_GETARG_POINTER(0); - Oid operator = PG_GETARG_OID(1); - List *args = (List *) PG_GETARG_POINTER(2); - int varRelid = PG_GETARG_INT32(3); - Oid eqop; - float8 result; - - /* - * We want 1 - eqsel() where the equality operator is the one associated - * with this != operator, that is, its negator. 
- */ - eqop = get_negator(operator); - if (eqop) - { - result = DatumGetFloat8(DirectFunctionCall4(eqsel, - PointerGetDatum(root), - ObjectIdGetDatum(eqop), - PointerGetDatum(args), - Int32GetDatum(varRelid))); - } - else - { - /* Use default selectivity (should we raise an error instead?) */ - result = DEFAULT_EQ_SEL; - } - result = 1.0 - result; - PG_RETURN_FLOAT8(result); + PG_RETURN_FLOAT8((float8) eqsel_internal(fcinfo, true)); } /* @@ -1114,6 +1141,7 @@ patternsel(PG_FUNCTION_ARGS, Pattern_Type ptype, bool negate) Const *patt; Const *prefix = NULL; Selectivity rest_selec = 0; + double nullfrac = 0.0; double result; /* @@ -1203,6 +1231,17 @@ patternsel(PG_FUNCTION_ARGS, Pattern_Type ptype, bool negate) } /* + * Grab the nullfrac for use below. + */ + if (HeapTupleIsValid(vardata.statsTuple)) + { + Form_pg_statistic stats; + + stats = (Form_pg_statistic) GETSTRUCT(vardata.statsTuple); + nullfrac = stats->stanullfrac; + } + + /* * Pull out any fixed prefix implied by the pattern, and estimate the * fractional selectivity of the remainder of the pattern. 
Unlike many of * the other functions in this file, we use the pattern operator's actual @@ -1252,7 +1291,7 @@ patternsel(PG_FUNCTION_ARGS, Pattern_Type ptype, bool negate) if (eqopr == InvalidOid) elog(ERROR, "no = operator for opfamily %u", opfamily); result = var_eq_const(&vardata, eqopr, prefix->constvalue, - false, true); + false, true, false); } else { @@ -1275,8 +1314,7 @@ patternsel(PG_FUNCTION_ARGS, Pattern_Type ptype, bool negate) Selectivity selec; int hist_size; FmgrInfo opproc; - double nullfrac, - mcv_selec, + double mcv_selec, sumcommon; /* Try to use the histogram entries to get selectivity */ @@ -1328,11 +1366,6 @@ patternsel(PG_FUNCTION_ARGS, Pattern_Type ptype, bool negate) mcv_selec = mcv_selectivity(&vardata, &opproc, constval, true, &sumcommon); - if (HeapTupleIsValid(vardata.statsTuple)) - nullfrac = ((Form_pg_statistic) GETSTRUCT(vardata.statsTuple))->stanullfrac; - else - nullfrac = 0.0; - /* * Now merge the results from the MCV and histogram calculations, * realizing that the histogram covers only the non-null values that @@ -1340,12 +1373,16 @@ patternsel(PG_FUNCTION_ARGS, Pattern_Type ptype, bool negate) */ selec *= 1.0 - nullfrac - sumcommon; selec += mcv_selec; - - /* result should be in range, but make sure... */ - CLAMP_PROBABILITY(selec); result = selec; } + /* now adjust if we wanted not-match rather than match */ + if (negate) + result = 1.0 - result - nullfrac; + + /* result should be in range, but make sure... */ + CLAMP_PROBABILITY(result); + if (prefix) { pfree(DatumGetPointer(prefix->constvalue)); @@ -1354,7 +1391,7 @@ patternsel(PG_FUNCTION_ARGS, Pattern_Type ptype, bool negate) ReleaseVariableStats(vardata); - return negate ? (1.0 - result) : result; + return result; } /* @@ -1451,7 +1488,7 @@ boolvarsel(PlannerInfo *root, Node *arg, int varRelid) * compute the selectivity as if that is what we have. 
*/ selec = var_eq_const(&vardata, BooleanEqualOperator, - BoolGetDatum(true), false, true); + BoolGetDatum(true), false, true, false); } else if (is_funcclause(arg)) { @@ -5793,7 +5830,7 @@ prefix_selectivity(PlannerInfo *root, VariableStatData *vardata, if (cmpopr == InvalidOid) elog(ERROR, "no = operator for opfamily %u", opfamily); eq_sel = var_eq_const(vardata, cmpopr, prefixcon->constvalue, - false, true); + false, true, false); prefixsel = Max(prefixsel, eq_sel); @@ -6639,7 +6676,7 @@ add_predicate_to_quals(IndexOptInfo *index, List *indexQuals) Node *predQual = (Node *) lfirst(lc); List *oneQual = list_make1(predQual); - if (!predicate_implied_by(oneQual, indexQuals)) + if (!predicate_implied_by(oneQual, indexQuals, false)) predExtraQuals = list_concat(predExtraQuals, oneQual); } /* list_concat avoids modifying the passed-in indexQuals list */ @@ -7524,7 +7561,7 @@ gincostestimate(PlannerInfo *root, IndexPath *path, double loop_count, Node *predQual = (Node *) lfirst(l); List *oneQual = list_make1(predQual); - if (!predicate_implied_by(oneQual, indexQuals)) + if (!predicate_implied_by(oneQual, indexQuals, false)) predExtraQuals = list_concat(predExtraQuals, oneQual); } /* list_concat avoids modifying the passed-in indexQuals list */ diff --git a/src/backend/utils/cache/attoptcache.c b/src/backend/utils/cache/attoptcache.c index f7f85b53db..4b30e6bc62 100644 --- a/src/backend/utils/cache/attoptcache.c +++ b/src/backend/utils/cache/attoptcache.c @@ -71,7 +71,7 @@ InvalidateAttoptCacheCallback(Datum arg, int cacheid, uint32 hashvalue) /* * InitializeAttoptCache - * Initialize the tablespace cache. + * Initialize the attribute options cache. 
*/ static void InitializeAttoptCache(void) diff --git a/src/backend/utils/cache/evtcache.c b/src/backend/utils/cache/evtcache.c index 54ddc55f76..6faf4ae354 100644 --- a/src/backend/utils/cache/evtcache.c +++ b/src/backend/utils/cache/evtcache.c @@ -68,7 +68,7 @@ EventCacheLookup(EventTriggerEvent event) if (EventTriggerCacheState != ETCS_VALID) BuildEventTriggerCache(); entry = hash_search(EventTriggerCache, &event, HASH_FIND, NULL); - return entry != NULL ? entry->triggerlist : NULL; + return entry != NULL ? entry->triggerlist : NIL; } /* diff --git a/src/backend/utils/cache/syscache.c b/src/backend/utils/cache/syscache.c index f18dbb31b0..e244faac0e 100644 --- a/src/backend/utils/cache/syscache.c +++ b/src/backend/utils/cache/syscache.c @@ -680,30 +680,30 @@ static const struct cachedesc cacheinfo[] = { }, 128 }, - {RangeRelationId, /* RANGETYPE */ - RangeTypidIndexId, + {PublicationRelationId, /* PUBLICATIONNAME */ + PublicationNameIndexId, 1, { - Anum_pg_range_rngtypid, + Anum_pg_publication_pubname, 0, 0, 0 }, - 4 + 8 }, - {RelationRelationId, /* RELNAMENSP */ - ClassNameNspIndexId, - 2, + {PublicationRelationId, /* PUBLICATIONOID */ + PublicationObjectIndexId, + 1, { - Anum_pg_class_relname, - Anum_pg_class_relnamespace, + ObjectIdAttributeNumber, + 0, 0, 0 }, - 128 + 8 }, - {RelationRelationId, /* RELOID */ - ClassOidIndexId, + {PublicationRelRelationId, /* PUBLICATIONREL */ + PublicationRelObjectIndexId, 1, { ObjectIdAttributeNumber, @@ -711,73 +711,73 @@ static const struct cachedesc cacheinfo[] = { 0, 0 }, - 128 + 64 }, - {ReplicationOriginRelationId, /* REPLORIGIDENT */ - ReplicationOriginIdentIndex, - 1, + {PublicationRelRelationId, /* PUBLICATIONRELMAP */ + PublicationRelPrrelidPrpubidIndexId, + 2, { - Anum_pg_replication_origin_roident, - 0, + Anum_pg_publication_rel_prrelid, + Anum_pg_publication_rel_prpubid, 0, 0 }, - 16 + 64 }, - {ReplicationOriginRelationId, /* REPLORIGNAME */ - ReplicationOriginNameIndex, + {RangeRelationId, /* RANGETYPE */ + 
RangeTypidIndexId, 1, { - Anum_pg_replication_origin_roname, + Anum_pg_range_rngtypid, 0, 0, 0 }, - 16 + 4 }, - {PublicationRelationId, /* PUBLICATIONOID */ - PublicationObjectIndexId, - 1, + {RelationRelationId, /* RELNAMENSP */ + ClassNameNspIndexId, + 2, { - ObjectIdAttributeNumber, - 0, + Anum_pg_class_relname, + Anum_pg_class_relnamespace, 0, 0 }, - 8 + 128 }, - {PublicationRelationId, /* PUBLICATIONNAME */ - PublicationNameIndexId, + {RelationRelationId, /* RELOID */ + ClassOidIndexId, 1, { - Anum_pg_publication_pubname, + ObjectIdAttributeNumber, 0, 0, 0 }, - 8 + 128 }, - {PublicationRelRelationId, /* PUBLICATIONREL */ - PublicationRelObjectIndexId, + {ReplicationOriginRelationId, /* REPLORIGIDENT */ + ReplicationOriginIdentIndex, 1, { - ObjectIdAttributeNumber, + Anum_pg_replication_origin_roident, 0, 0, 0 }, - 64 + 16 }, - {PublicationRelRelationId, /* PUBLICATIONRELMAP */ - PublicationRelPrrelidPrpubidIndexId, - 2, + {ReplicationOriginRelationId, /* REPLORIGNAME */ + ReplicationOriginNameIndex, + 1, { - Anum_pg_publication_rel_prrelid, - Anum_pg_publication_rel_prpubid, + Anum_pg_replication_origin_roname, + 0, 0, 0 }, - 64 + 16 }, {RewriteRelationId, /* RULERELNAME */ RewriteRelRulenameIndexId, @@ -834,23 +834,23 @@ static const struct cachedesc cacheinfo[] = { }, 128 }, - {SubscriptionRelationId, /* SUBSCRIPTIONOID */ - SubscriptionObjectIndexId, - 1, + {SubscriptionRelationId, /* SUBSCRIPTIONNAME */ + SubscriptionNameIndexId, + 2, { - ObjectIdAttributeNumber, - 0, + Anum_pg_subscription_subdbid, + Anum_pg_subscription_subname, 0, 0 }, 4 }, - {SubscriptionRelationId, /* SUBSCRIPTIONNAME */ - SubscriptionNameIndexId, - 2, + {SubscriptionRelationId, /* SUBSCRIPTIONOID */ + SubscriptionObjectIndexId, + 1, { - Anum_pg_subscription_subdbid, - Anum_pg_subscription_subname, + ObjectIdAttributeNumber, + 0, 0, 0 }, diff --git a/src/backend/utils/init/globals.c b/src/backend/utils/init/globals.c index b0ec4a2d27..a91faf4b71 100644 --- 
a/src/backend/utils/init/globals.c +++ b/src/backend/utils/init/globals.c @@ -32,6 +32,7 @@ volatile bool QueryCancelPending = false; volatile bool ProcDiePending = false; volatile bool ClientConnectionLost = false; volatile bool IdleInTransactionSessionTimeoutPending = false; +volatile sig_atomic_t ConfigReloadPending = false; volatile uint32 InterruptHoldoffCount = 0; volatile uint32 QueryCancelHoldoffCount = 0; volatile uint32 CritSectionCount = 0; diff --git a/src/backend/utils/misc/pg_rusage.c b/src/backend/utils/misc/pg_rusage.c index e4dccc383a..98fa7ea9a8 100644 --- a/src/backend/utils/misc/pg_rusage.c +++ b/src/backend/utils/misc/pg_rusage.c @@ -61,7 +61,7 @@ pg_rusage_show(const PGRUsage *ru0) } snprintf(result, sizeof(result), - "CPU: user: %d.%02d s, system: %d.%02d s, elapsed: %d.%02d s", + _("CPU: user: %d.%02d s, system: %d.%02d s, elapsed: %d.%02d s"), (int) (ru1.ru.ru_utime.tv_sec - ru0->ru.ru_utime.tv_sec), (int) (ru1.ru.ru_utime.tv_usec - ru0->ru.ru_utime.tv_usec) / 10000, (int) (ru1.ru.ru_stime.tv_sec - ru0->ru.ru_stime.tv_sec), diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index cd07343fd4..ee88e94a93 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -84,7 +84,7 @@ #ssl_key_file = 'server.key' #ssl_ca_file = '' #ssl_crl_file = '' -#password_encryption = md5 # md5, scram-sha-256 or plain +#password_encryption = md5 # md5, scram-sha-256, or plain #db_user_namespace = off #row_security = on @@ -162,7 +162,7 @@ #effective_io_concurrency = 1 # 1-1000; 0 disables prefetching #max_worker_processes = 8 # (change requires restart) #max_parallel_workers_per_gather = 2 # taken from max_parallel_workers -#max_parallel_workers = 8 # maximum number of max_worker_processes that +#max_parallel_workers = 8 # maximum number of max_worker_processes that # can be used in parallel queries #old_snapshot_threshold = -1 # 1min-60d; -1 disables; 
0 is immediate # (change requires restart) @@ -249,7 +249,7 @@ # These settings are ignored on a standby server. #synchronous_standby_names = '' # standby servers that provide sync rep - # method to choose sync standbys, number of sync standbys + # method to choose sync standbys, number of sync standbys, # and comma-separated list of application_name # from standby(s); '*' = all #vacuum_defer_cleanup_age = 0 # number of xacts by which cleanup is delayed diff --git a/src/backend/utils/time/snapmgr.c b/src/backend/utils/time/snapmgr.c index f89d635162..687222fc54 100644 --- a/src/backend/utils/time/snapmgr.c +++ b/src/backend/utils/time/snapmgr.c @@ -59,6 +59,7 @@ #include "storage/proc.h" #include "storage/procarray.h" #include "storage/sinval.h" +#include "storage/sinvaladt.h" #include "storage/spin.h" #include "utils/builtins.h" #include "utils/memutils.h" @@ -214,11 +215,15 @@ static Snapshot FirstXactSnapshot = NULL; /* Define pathname of exported-snapshot files */ #define SNAPSHOT_EXPORT_DIR "pg_snapshots" -#define XactExportFilePath(path, xid, num, suffix) \ - snprintf(path, sizeof(path), SNAPSHOT_EXPORT_DIR "/%08X-%d%s", \ - xid, num, suffix) -/* Current xact's exported snapshots (a list of Snapshot structs) */ +/* Structure holding info about exported snapshot. */ +typedef struct ExportedSnapshot +{ + char *snapfile; + Snapshot snapshot; +} ExportedSnapshot; + +/* Current xact's exported snapshots (a list of ExportedSnapshot structs) */ static List *exportedSnapshots = NIL; /* Prototypes for local functions */ @@ -587,8 +592,8 @@ SnapshotSetCommandId(CommandId curcid) * in GetTransactionSnapshot. 
*/ static void -SetTransactionSnapshot(Snapshot sourcesnap, TransactionId sourcexid, - PGPROC *sourceproc) +SetTransactionSnapshot(Snapshot sourcesnap, VirtualTransactionId *sourcevxid, + int sourcepid, PGPROC *sourceproc) { /* Caller should have checked this already */ Assert(!FirstSnapshotSet); @@ -646,12 +651,12 @@ SetTransactionSnapshot(Snapshot sourcesnap, TransactionId sourcexid, errmsg("could not import the requested snapshot"), errdetail("The source transaction is not running anymore."))); } - else if (!ProcArrayInstallImportedXmin(CurrentSnapshot->xmin, sourcexid)) + else if (!ProcArrayInstallImportedXmin(CurrentSnapshot->xmin, sourcevxid)) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("could not import the requested snapshot"), - errdetail("The source transaction %u is not running anymore.", - sourcexid))); + errdetail("The source process with pid %d is not running anymore.", + sourcepid))); /* * In transaction-snapshot mode, the first snapshot must live until end of @@ -661,7 +666,8 @@ SetTransactionSnapshot(Snapshot sourcesnap, TransactionId sourcexid, if (IsolationUsesXactSnapshot()) { if (IsolationIsSerializable()) - SetSerializableTransactionSnapshot(CurrentSnapshot, sourcexid); + SetSerializableTransactionSnapshot(CurrentSnapshot, sourcevxid, + sourcepid); /* Make a saved copy */ CurrentSnapshot = CopySnapshot(CurrentSnapshot); FirstXactSnapshot = CurrentSnapshot; @@ -1121,33 +1127,29 @@ AtEOXact_Snapshot(bool isCommit, bool resetXmin) */ if (exportedSnapshots != NIL) { - TransactionId myxid = GetTopTransactionId(); - int i; - char buf[MAXPGPATH]; ListCell *lc; /* * Get rid of the files. Unlink failure is only a WARNING because (1) * it's too late to abort the transaction, and (2) leaving a leaked * file around has little real consequence anyway. 
- */ - for (i = 1; i <= list_length(exportedSnapshots); i++) - { - XactExportFilePath(buf, myxid, i, ""); - if (unlink(buf)) - elog(WARNING, "could not unlink file \"%s\": %m", buf); - } - - /* - * As with the FirstXactSnapshot, we needn't spend any effort on - * cleaning up the per-snapshot data structures, but we do need to - * remove them from RegisteredSnapshots to prevent a warning below. + * + * We also need to remove the snapshots from RegisteredSnapshots + * to prevent a warning below. + * + * As with the FirstXactSnapshot, we don't need to free resources of + * the snapshot itself as it will go away with the memory context. */ foreach(lc, exportedSnapshots) { - Snapshot snap = (Snapshot) lfirst(lc); + ExportedSnapshot *esnap = (ExportedSnapshot *) lfirst(lc); - pairingheap_remove(&RegisteredSnapshots, &snap->ph_node); + if (unlink(esnap->snapfile)) + elog(WARNING, "could not unlink file \"%s\": %m", + esnap->snapfile); + + pairingheap_remove(&RegisteredSnapshots, + &esnap->snapshot->ph_node); } exportedSnapshots = NIL; @@ -1205,6 +1207,7 @@ ExportSnapshot(Snapshot snapshot) { TransactionId topXid; TransactionId *children; + ExportedSnapshot *esnap; int nchildren; int addTopXid; StringInfoData buf; @@ -1229,9 +1232,9 @@ ExportSnapshot(Snapshot snapshot) */ /* - * This will assign a transaction ID if we do not yet have one. + * Get our transaction ID if there is one, to include in the snapshot. */ - topXid = GetTopTransactionId(); + topXid = GetTopTransactionIdIfAny(); /* * We cannot export a snapshot from a subtransaction because there's no @@ -1251,6 +1254,13 @@ ExportSnapshot(Snapshot snapshot) nchildren = xactGetCommittedChildren(&children); /* + * Generate file path for the snapshot. We start numbering of snapshots + * inside the transaction from 1. 
+ */ + snprintf(path, sizeof(path), SNAPSHOT_EXPORT_DIR "/%08X-%08X-%d", + MyProc->backendId, MyProc->lxid, list_length(exportedSnapshots) + 1); + + /* * Copy the snapshot into TopTransactionContext, add it to the * exportedSnapshots list, and mark it pseudo-registered. We do this to * ensure that the snapshot's xmin is honored for the rest of the @@ -1259,7 +1269,10 @@ ExportSnapshot(Snapshot snapshot) snapshot = CopySnapshot(snapshot); oldcxt = MemoryContextSwitchTo(TopTransactionContext); - exportedSnapshots = lappend(exportedSnapshots, snapshot); + esnap = (ExportedSnapshot *) palloc(sizeof(ExportedSnapshot)); + esnap->snapfile = pstrdup(path); + esnap->snapshot = snapshot; + exportedSnapshots = lappend(exportedSnapshots, esnap); MemoryContextSwitchTo(oldcxt); snapshot->regd_count++; @@ -1272,7 +1285,8 @@ ExportSnapshot(Snapshot snapshot) */ initStringInfo(&buf); - appendStringInfo(&buf, "xid:%u\n", topXid); + appendStringInfo(&buf, "vxid:%d/%u\n", MyProc->backendId, MyProc->lxid); + appendStringInfo(&buf, "pid:%d\n", MyProcPid); appendStringInfo(&buf, "dbid:%u\n", MyDatabaseId); appendStringInfo(&buf, "iso:%d\n", XactIsoLevel); appendStringInfo(&buf, "ro:%d\n", XactReadOnly); @@ -1291,7 +1305,8 @@ ExportSnapshot(Snapshot snapshot) * xmax. (We need not make the same check for subxip[] members, see * snapshot.h.) */ - addTopXid = TransactionIdPrecedes(topXid, snapshot->xmax) ? 1 : 0; + addTopXid = (TransactionIdIsValid(topXid) && + TransactionIdPrecedes(topXid, snapshot->xmax)) ? 1 : 0; appendStringInfo(&buf, "xcnt:%d\n", snapshot->xcnt + addTopXid); for (i = 0; i < snapshot->xcnt; i++) appendStringInfo(&buf, "xip:%u\n", snapshot->xip[i]); @@ -1322,7 +1337,7 @@ ExportSnapshot(Snapshot snapshot) * ensures that no other backend can read an incomplete file * (ImportSnapshot won't allow it because of its valid-characters check). 
*/ - XactExportFilePath(pathtmp, topXid, list_length(exportedSnapshots), ".tmp"); + snprintf(pathtmp, sizeof(pathtmp), "%s.tmp", path); if (!(f = AllocateFile(pathtmp, PG_BINARY_W))) ereport(ERROR, (errcode_for_file_access(), @@ -1344,8 +1359,6 @@ ExportSnapshot(Snapshot snapshot) * Now that we have written everything into a .tmp file, rename the file * to remove the .tmp suffix. */ - XactExportFilePath(path, topXid, list_length(exportedSnapshots), ""); - if (rename(pathtmp, path) < 0) ereport(ERROR, (errcode_for_file_access(), @@ -1430,6 +1443,30 @@ parseXidFromText(const char *prefix, char **s, const char *filename) return val; } +static void +parseVxidFromText(const char *prefix, char **s, const char *filename, + VirtualTransactionId *vxid) +{ + char *ptr = *s; + int prefixlen = strlen(prefix); + + if (strncmp(ptr, prefix, prefixlen) != 0) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION), + errmsg("invalid snapshot data in file \"%s\"", filename))); + ptr += prefixlen; + if (sscanf(ptr, "%d/%u", &vxid->backendId, &vxid->localTransactionId) != 2) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION), + errmsg("invalid snapshot data in file \"%s\"", filename))); + ptr = strchr(ptr, '\n'); + if (!ptr) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION), + errmsg("invalid snapshot data in file \"%s\"", filename))); + *s = ptr + 1; +} + /* * ImportSnapshot * Import a previously exported snapshot. 
The argument should be a @@ -1445,7 +1482,8 @@ ImportSnapshot(const char *idstr) char *filebuf; int xcnt; int i; - TransactionId src_xid; + VirtualTransactionId src_vxid; + int src_pid; Oid src_dbid; int src_isolevel; bool src_readonly; @@ -1509,7 +1547,8 @@ ImportSnapshot(const char *idstr) */ memset(&snapshot, 0, sizeof(snapshot)); - src_xid = parseXidFromText("xid:", &filebuf, path); + parseVxidFromText("vxid:", &filebuf, path, &src_vxid); + src_pid = parseIntFromText("pid:", &filebuf, path); /* we abuse parseXidFromText a bit here ... */ src_dbid = parseXidFromText("dbid:", &filebuf, path); src_isolevel = parseIntFromText("iso:", &filebuf, path); @@ -1559,7 +1598,7 @@ ImportSnapshot(const char *idstr) * don't trouble to check the array elements, just the most critical * fields. */ - if (!TransactionIdIsNormal(src_xid) || + if (!VirtualTransactionIdIsValid(src_vxid) || !OidIsValid(src_dbid) || !TransactionIdIsNormal(snapshot.xmin) || !TransactionIdIsNormal(snapshot.xmax)) @@ -1600,7 +1639,7 @@ ImportSnapshot(const char *idstr) errmsg("cannot import a snapshot from a different database"))); /* OK, install the snapshot */ - SetTransactionSnapshot(&snapshot, src_xid, NULL); + SetTransactionSnapshot(&snapshot, &src_vxid, src_pid, NULL); } /* @@ -2187,5 +2226,5 @@ RestoreSnapshot(char *start_address) void RestoreTransactionSnapshot(Snapshot snapshot, void *master_pgproc) { - SetTransactionSnapshot(snapshot, InvalidTransactionId, master_pgproc); + SetTransactionSnapshot(snapshot, NULL, InvalidPid, master_pgproc); } |