Database-level collation version tracking
authorPeter Eisentraut <peter@eisentraut.org>
Mon, 14 Feb 2022 07:09:04 +0000 (08:09 +0100)
committerPeter Eisentraut <peter@eisentraut.org>
Mon, 14 Feb 2022 07:27:26 +0000 (08:27 +0100)
This adds to database objects the same version tracking that collation
objects have.  There is a new pg_database column datcollversion that
stores the version, a new function
pg_database_collation_actual_version() to get the version from the
operating system, and a new subcommand ALTER DATABASE ... REFRESH
COLLATION VERSION.

This was not originally added together with pg_collation.collversion,
since originally version tracking was only supported for ICU, and ICU
on a database-level is not currently supported.  But we now have
version tracking for glibc (since PG13), FreeBSD (since PG14), and
Windows (since PG13), so this is useful to have now.

Reviewed-by: Julien Rouhaud <rjuju123@gmail.com>
Discussion: https://www.postgresql.org/message-id/flat/f0ff3190-29a3-5b39-a179-fa32eee57db6%40enterprisedb.com

22 files changed:
doc/src/sgml/catalogs.sgml
doc/src/sgml/func.sgml
doc/src/sgml/ref/alter_collation.sgml
doc/src/sgml/ref/alter_database.sgml
doc/src/sgml/ref/create_database.sgml
src/backend/commands/dbcommands.c
src/backend/parser/gram.y
src/backend/tcop/utility.c
src/backend/utils/init/postinit.c
src/bin/initdb/initdb.c
src/bin/pg_dump/pg_dump.c
src/bin/psql/tab-complete.c
src/include/catalog/catversion.h
src/include/catalog/pg_database.h
src/include/catalog/pg_proc.dat
src/include/commands/dbcommands.h
src/include/nodes/nodes.h
src/include/nodes/parsenodes.h
src/test/regress/expected/collate.icu.utf8.out
src/test/regress/expected/collate.linux.utf8.out
src/test/regress/sql/collate.icu.utf8.sql
src/test/regress/sql/collate.linux.utf8.sql

index 879d2dbce032433759410ba7cad27bbb58b0974a..5a1627a3941d5ef85ae35dad64f329ede8fb7b1f 100644 (file)
@@ -3043,6 +3043,17 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
       </para></entry>
      </row>
 
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>datcollversion</structfield> <type>text</type>
+      </para>
+      <para>
+       Provider-specific version of the collation.  This is recorded when the
+       database is created and then checked when it is used, to detect
+       changes in the collation definition that could lead to data corruption.
+      </para></entry>
+     </row>
+
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
        <structfield>datacl</structfield> <type>aclitem[]</type>
index 1b064b4febf131113fbf60ae5b9e52d9f03e6fa5..df3cd5987b7fbadda97c9493598e07adc66c3a75 100644 (file)
@@ -27061,6 +27061,24 @@ postgres=# SELECT * FROM pg_walfile_name_offset(pg_stop_backup());
        </para></entry>
       </row>
 
+      <row>
+       <entry role="func_table_entry"><para role="func_signature">
+        <indexterm>
+         <primary>pg_database_collation_actual_version</primary>
+        </indexterm>
+        <function>pg_database_collation_actual_version</function> ( <type>oid</type> )
+        <returnvalue>text</returnvalue>
+       </para>
+       <para>
+        Returns the actual version of the database's collation as it is currently
+        installed in the operating system.  If this is different from the
+        value in
+        <structname>pg_database</structname>.<structfield>datcollversion</structfield>,
+        then objects depending on the collation might need to be rebuilt.  See
+        also <xref linkend="sql-alterdatabase"/>.
+       </para></entry>
+      </row>
+
       <row>
        <entry role="func_table_entry"><para role="func_signature">
         <indexterm>
index 892c46656552f485df7e13bf3c70d57d1ba83bd1..a8c831d7286614a434aa9248f3244395afa1400f 100644 (file)
@@ -151,7 +151,8 @@ HINT:  Rebuild all objects affected by this collation and run ALTER COLLATION pg
    </para>
   </note>
   <para>
-   Currently, there is no version tracking for the database default collation.
+   For the database default collation, there is an analogous command
+   <literal>ALTER DATABASE ... REFRESH COLLATION VERSION</literal>.
   </para>
 
   <para>
index 81e37536a3f6695508f096ec604928460e3bc781..89ed261b4c2d5a8b1c0f9c2e47a8ce06c14bced7 100644 (file)
@@ -35,6 +35,8 @@ ALTER DATABASE <replaceable class="parameter">name</replaceable> OWNER TO { <rep
 
 ALTER DATABASE <replaceable class="parameter">name</replaceable> SET TABLESPACE <replaceable class="parameter">new_tablespace</replaceable>
 
+ALTER DATABASE <replaceable class="parameter">name</replaceable> REFRESH COLLATION VERSION
+
 ALTER DATABASE <replaceable class="parameter">name</replaceable> SET <replaceable>configuration_parameter</replaceable> { TO | = } { <replaceable>value</replaceable> | DEFAULT }
 ALTER DATABASE <replaceable class="parameter">name</replaceable> SET <replaceable>configuration_parameter</replaceable> FROM CURRENT
 ALTER DATABASE <replaceable class="parameter">name</replaceable> RESET <replaceable>configuration_parameter</replaceable>
@@ -171,6 +173,16 @@ ALTER DATABASE <replaceable class="parameter">name</replaceable> RESET ALL
     </listitem>
    </varlistentry>
 
+   <varlistentry>
+    <term><literal>REFRESH COLLATION VERSION</literal></term>
+    <listitem>
+     <para>
+      Update the database collation version.  See <xref
+      linkend="sql-altercollation-notes"/> for background.
+     </para>
+    </listitem>
+   </varlistentry>
+
      <varlistentry>
       <term><replaceable>configuration_parameter</replaceable></term>
       <term><replaceable>value</replaceable></term>
index f22e28dc81b8315867fe573c1ffdddaab56622e1..f70d0c75b4dcf93bfeb08b754d0c7088a2ff92d9 100644 (file)
@@ -28,6 +28,7 @@ CREATE DATABASE <replaceable class="parameter">name</replaceable>
            [ LOCALE [=] <replaceable class="parameter">locale</replaceable> ]
            [ LC_COLLATE [=] <replaceable class="parameter">lc_collate</replaceable> ]
            [ LC_CTYPE [=] <replaceable class="parameter">lc_ctype</replaceable> ]
+           [ COLLATION_VERSION = <replaceable>collation_version</replaceable> ]
            [ TABLESPACE [=] <replaceable class="parameter">tablespace_name</replaceable> ]
            [ ALLOW_CONNECTIONS [=] <replaceable class="parameter">allowconn</replaceable> ]
            [ CONNECTION LIMIT [=] <replaceable class="parameter">connlimit</replaceable> ]
@@ -158,6 +159,26 @@ CREATE DATABASE <replaceable class="parameter">name</replaceable>
        </para>
       </listitem>
      </varlistentry>
+
+     <varlistentry>
+      <term><replaceable>collation_version</replaceable></term>
+
+      <listitem>
+       <para>
+        Specifies the collation version string to store with the database.
+        Normally, this should be omitted, which will cause the version to be
+        computed from the actual version of the database collation as provided
+        by the operating system.  This option is intended to be used by
+        <command>pg_upgrade</command> for copying the version from an existing
+        installation.
+       </para>
+
+       <para>
+        See also <xref linkend="sql-alterdatabase"/> for how to handle
+        database collation version mismatches.
+       </para>
+     </listitem>
+    </varlistentry>
      <varlistentry>
       <term><replaceable class="parameter">tablespace_name</replaceable></term>
       <listitem>
index 700b1209652859c68a38dc42dac9a6c60bb4a4e0..c37e3c9a9a4f04e4db00c8baaa08e10fc4015efc 100644 (file)
@@ -36,6 +36,7 @@
 #include "catalog/indexing.h"
 #include "catalog/objectaccess.h"
 #include "catalog/pg_authid.h"
+#include "catalog/pg_collation.h"
 #include "catalog/pg_database.h"
 #include "catalog/pg_db_role_setting.h"
 #include "catalog/pg_subscription.h"
@@ -85,7 +86,8 @@ static bool get_db_info(const char *name, LOCKMODE lockmode,
                                                Oid *dbIdP, Oid *ownerIdP,
                                                int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP,
                                                TransactionId *dbFrozenXidP, MultiXactId *dbMinMultiP,
-                                               Oid *dbTablespace, char **dbCollate, char **dbCtype);
+                                               Oid *dbTablespace, char **dbCollate, char **dbCtype,
+                                               char **dbCollversion);
 static bool have_createdb_privilege(void);
 static void remove_dbtablespaces(Oid db_id);
 static bool check_db_file_conflict(Oid db_id);
@@ -105,6 +107,7 @@ createdb(ParseState *pstate, const CreatedbStmt *stmt)
        int                     src_encoding = -1;
        char       *src_collate = NULL;
        char       *src_ctype = NULL;
+       char       *src_collversion = NULL;
        bool            src_istemplate;
        bool            src_allowconn;
        TransactionId src_frozenxid = InvalidTransactionId;
@@ -128,6 +131,7 @@ createdb(ParseState *pstate, const CreatedbStmt *stmt)
        DefElem    *distemplate = NULL;
        DefElem    *dallowconnections = NULL;
        DefElem    *dconnlimit = NULL;
+       DefElem    *dcollversion = NULL;
        char       *dbname = stmt->dbname;
        char       *dbowner = NULL;
        const char *dbtemplate = NULL;
@@ -138,6 +142,7 @@ createdb(ParseState *pstate, const CreatedbStmt *stmt)
        bool            dbistemplate = false;
        bool            dballowconnections = true;
        int                     dbconnlimit = -1;
+       char       *dbcollversion = NULL;
        int                     notherbackends;
        int                     npreparedxacts;
        createdb_failure_params fparms;
@@ -207,6 +212,12 @@ createdb(ParseState *pstate, const CreatedbStmt *stmt)
                                errorConflictingDefElem(defel, pstate);
                        dconnlimit = defel;
                }
+               else if (strcmp(defel->defname, "collation_version") == 0)
+               {
+                       if (dcollversion)
+                               errorConflictingDefElem(defel, pstate);
+                       dcollversion = defel;
+               }
                else if (strcmp(defel->defname, "location") == 0)
                {
                        ereport(WARNING,
@@ -305,6 +316,8 @@ createdb(ParseState *pstate, const CreatedbStmt *stmt)
                                        (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                                         errmsg("invalid connection limit: %d", dbconnlimit)));
        }
+       if (dcollversion)
+               dbcollversion = defGetString(dcollversion);
 
        /* obtain OID of proposed owner */
        if (dbowner)
@@ -342,7 +355,7 @@ createdb(ParseState *pstate, const CreatedbStmt *stmt)
                                         &src_dboid, &src_owner, &src_encoding,
                                         &src_istemplate, &src_allowconn,
                                         &src_frozenxid, &src_minmxid, &src_deftablespace,
-                                        &src_collate, &src_ctype))
+                                        &src_collate, &src_ctype, &src_collversion))
                ereport(ERROR,
                                (errcode(ERRCODE_UNDEFINED_DATABASE),
                                 errmsg("template database \"%s\" does not exist",
@@ -424,6 +437,52 @@ createdb(ParseState *pstate, const CreatedbStmt *stmt)
                                         errhint("Use the same LC_CTYPE as in the template database, or use template0 as template.")));
        }
 
+       /*
+        * If we got a collation version for the template database, check that it
+        * matches the actual OS collation version.  Otherwise error; the user
+        * needs to fix the template database first.  Don't complain if a
+        * collation version was specified explicitly as a statement option; that
+        * is used by pg_upgrade to reproduce the old state exactly.
+        *
+        * (If the template database has no collation version, then either the
+        * platform/provider does not support collation versioning, or it's
+        * template0, for which we stipulate that it does not contain
+        * collation-using objects.)
+        */
+       if (src_collversion && !dcollversion)
+       {
+               char       *actual_versionstr;
+
+               actual_versionstr = get_collation_actual_version(COLLPROVIDER_LIBC, dbcollate);
+               if (!actual_versionstr)
+                       ereport(ERROR,
+                                       (errmsg("template database \"%s\" has a collation version, but no actual collation version could be determined",
+                                                       dbtemplate)));
+
+               if (strcmp(actual_versionstr, src_collversion) != 0)
+                       ereport(ERROR,
+                                       (errmsg("template database \"%s\" has a collation version mismatch",
+                                                       dbtemplate),
+                                        errdetail("The template database was created using collation version %s, "
+                                                          "but the operating system provides version %s.",
+                                                          src_collversion, actual_versionstr),
+                                        errhint("Rebuild all objects in the template database that use the default collation and run "
+                                                        "ALTER DATABASE %s REFRESH COLLATION VERSION, "
+                                                        "or build PostgreSQL with the right library version.",
+                                                        quote_identifier(dbtemplate))));
+       }
+
+       if (dbcollversion == NULL)
+               dbcollversion = src_collversion;
+
+       /*
+        * Normally, we copy the collation version from the template database.
+        * This last resort only applies if the template database does not have a
+        * collation version, which is normally only the case for template0.
+        */
+       if (dbcollversion == NULL)
+               dbcollversion = get_collation_actual_version(COLLPROVIDER_LIBC, dbcollate);
+
        /* Resolve default tablespace for new database */
        if (dtablespacename && dtablespacename->arg)
        {
@@ -578,6 +637,10 @@ createdb(ParseState *pstate, const CreatedbStmt *stmt)
        new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_deftablespace);
        new_record[Anum_pg_database_datcollate - 1] = CStringGetTextDatum(dbcollate);
        new_record[Anum_pg_database_datctype - 1] = CStringGetTextDatum(dbctype);
+       if (dbcollversion)
+               new_record[Anum_pg_database_datcollversion - 1] = CStringGetTextDatum(dbcollversion);
+       else
+               new_record_nulls[Anum_pg_database_datcollversion - 1] = true;
 
        /*
         * We deliberately set datacl to default (NULL), rather than copying it
@@ -844,7 +907,7 @@ dropdb(const char *dbname, bool missing_ok, bool force)
        pgdbrel = table_open(DatabaseRelationId, RowExclusiveLock);
 
        if (!get_db_info(dbname, AccessExclusiveLock, &db_id, NULL, NULL,
-                                        &db_istemplate, NULL, NULL, NULL, NULL, NULL, NULL))
+                                        &db_istemplate, NULL, NULL, NULL, NULL, NULL, NULL, NULL))
        {
                if (!missing_ok)
                {
@@ -1046,7 +1109,7 @@ RenameDatabase(const char *oldname, const char *newname)
        rel = table_open(DatabaseRelationId, RowExclusiveLock);
 
        if (!get_db_info(oldname, AccessExclusiveLock, &db_id, NULL, NULL,
-                                        NULL, NULL, NULL, NULL, NULL, NULL, NULL))
+                                        NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL))
                ereport(ERROR,
                                (errcode(ERRCODE_UNDEFINED_DATABASE),
                                 errmsg("database \"%s\" does not exist", oldname)));
@@ -1159,7 +1222,7 @@ movedb(const char *dbname, const char *tblspcname)
        pgdbrel = table_open(DatabaseRelationId, RowExclusiveLock);
 
        if (!get_db_info(dbname, AccessExclusiveLock, &db_id, NULL, NULL,
-                                        NULL, NULL, NULL, NULL, &src_tblspcoid, NULL, NULL))
+                                        NULL, NULL, NULL, NULL, &src_tblspcoid, NULL, NULL, NULL))
                ereport(ERROR,
                                (errcode(ERRCODE_UNDEFINED_DATABASE),
                                 errmsg("database \"%s\" does not exist", dbname)));
@@ -1651,6 +1714,88 @@ AlterDatabase(ParseState *pstate, AlterDatabaseStmt *stmt, bool isTopLevel)
 }
 
 
+/*
+ * ALTER DATABASE name REFRESH COLLATION VERSION
+ */
+ObjectAddress
+AlterDatabaseRefreshColl(AlterDatabaseRefreshCollStmt *stmt)
+{
+       Relation        rel;
+       ScanKeyData scankey;
+       SysScanDesc scan;
+       Oid                     db_id;
+       HeapTuple       tuple;
+       Form_pg_database datForm;
+       ObjectAddress address;
+       Datum           datum;
+       bool            isnull;
+       char       *oldversion;
+       char       *newversion;
+
+       rel = table_open(DatabaseRelationId, RowExclusiveLock);
+       ScanKeyInit(&scankey,
+                               Anum_pg_database_datname,
+                               BTEqualStrategyNumber, F_NAMEEQ,
+                               CStringGetDatum(stmt->dbname));
+       scan = systable_beginscan(rel, DatabaseNameIndexId, true,
+                                                         NULL, 1, &scankey);
+       tuple = systable_getnext(scan);
+       if (!HeapTupleIsValid(tuple))
+               ereport(ERROR,
+                               (errcode(ERRCODE_UNDEFINED_DATABASE),
+                                errmsg("database \"%s\" does not exist", stmt->dbname)));
+
+       datForm = (Form_pg_database) GETSTRUCT(tuple);
+       db_id = datForm->oid;
+
+       if (!pg_database_ownercheck(db_id, GetUserId()))
+               aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
+                                          stmt->dbname);
+
+       datum = heap_getattr(tuple, Anum_pg_database_datcollversion, RelationGetDescr(rel), &isnull);
+       oldversion = isnull ? NULL : TextDatumGetCString(datum);
+
+       datum = heap_getattr(tuple, Anum_pg_database_datcollate, RelationGetDescr(rel), &isnull);
+       Assert(!isnull);
+       newversion = get_collation_actual_version(COLLPROVIDER_LIBC, TextDatumGetCString(datum));
+
+       /* cannot change from NULL to non-NULL or vice versa */
+       if ((!oldversion && newversion) || (oldversion && !newversion))
+               elog(ERROR, "invalid collation version change");
+       else if (oldversion && newversion && strcmp(newversion, oldversion) != 0)
+       {
+               bool            nulls[Natts_pg_database] = {0};
+               bool            replaces[Natts_pg_database] = {0};
+               Datum           values[Natts_pg_database] = {0};
+
+               ereport(NOTICE,
+                               (errmsg("changing version from %s to %s",
+                                               oldversion, newversion)));
+
+               values[Anum_pg_database_datcollversion - 1] = CStringGetTextDatum(newversion);
+               replaces[Anum_pg_database_datcollversion - 1] = true;
+
+               tuple = heap_modify_tuple(tuple, RelationGetDescr(rel),
+                                                                 values, nulls, replaces);
+               CatalogTupleUpdate(rel, &tuple->t_self, tuple);
+               heap_freetuple(tuple);
+       }
+       else
+               ereport(NOTICE,
+                               (errmsg("version has not changed")));
+
+       InvokeObjectPostAlterHook(DatabaseRelationId, db_id, 0);
+
+       ObjectAddressSet(address, DatabaseRelationId, db_id);
+
+       systable_endscan(scan);
+
+       table_close(rel, NoLock);
+
+       return address;
+}
+
+
 /*
  * ALTER DATABASE name SET ...
  */
@@ -1793,6 +1938,34 @@ AlterDatabaseOwner(const char *dbname, Oid newOwnerId)
 }
 
 
+Datum
+pg_database_collation_actual_version(PG_FUNCTION_ARGS)
+{
+       Oid                     dbid = PG_GETARG_OID(0);
+       HeapTuple       tp;
+       Datum           datum;
+       bool            isnull;
+       char       *version;
+
+       tp = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(dbid));
+       if (!HeapTupleIsValid(tp))
+               ereport(ERROR,
+                               (errcode(ERRCODE_UNDEFINED_OBJECT),
+                                errmsg("database with OID %u does not exist", dbid)));
+
+       datum = SysCacheGetAttr(DATABASEOID, tp, Anum_pg_database_datcollate, &isnull);
+       Assert(!isnull);
+       version = get_collation_actual_version(COLLPROVIDER_LIBC, TextDatumGetCString(datum));
+
+       ReleaseSysCache(tp);
+
+       if (version)
+               PG_RETURN_TEXT_P(cstring_to_text(version));
+       else
+               PG_RETURN_NULL();
+}
+
+
 /*
  * Helper functions
  */
@@ -1808,7 +1981,8 @@ get_db_info(const char *name, LOCKMODE lockmode,
                        Oid *dbIdP, Oid *ownerIdP,
                        int *encodingP, bool *dbIsTemplateP, bool *dbAllowConnP,
                        TransactionId *dbFrozenXidP, MultiXactId *dbMinMultiP,
-                       Oid *dbTablespace, char **dbCollate, char **dbCtype)
+                       Oid *dbTablespace, char **dbCollate, char **dbCtype,
+                       char **dbCollversion)
 {
        bool            result = false;
        Relation        relation;
@@ -1913,6 +2087,14 @@ get_db_info(const char *name, LOCKMODE lockmode,
                                        Assert(!isnull);
                                        *dbCtype = TextDatumGetCString(datum);
                                }
+                               if (dbCollversion)
+                               {
+                                       datum = SysCacheGetAttr(DATABASEOID, tuple, Anum_pg_database_datcollversion, &isnull);
+                                       if (isnull)
+                                               *dbCollversion = NULL;
+                                       else
+                                               *dbCollversion = TextDatumGetCString(datum);
+                               }
                                ReleaseSysCache(tuple);
                                result = true;
                                break;
index c4f32425060a6501907de51e03a6ed9b8be8a2e8..92f93cfc72d900701d58b3dfb0d1efbfe9317c98 100644 (file)
@@ -10465,6 +10465,12 @@ AlterDatabaseStmt:
                                                                                                                (Node *)makeString($6), @6));
                                        $$ = (Node *)n;
                                 }
+                       | ALTER DATABASE name REFRESH COLLATION VERSION_P
+                                {
+                                       AlterDatabaseRefreshCollStmt *n = makeNode(AlterDatabaseRefreshCollStmt);
+                                       n->dbname = $3;
+                                       $$ = (Node *)n;
+                                }
                ;
 
 AlterDatabaseSetStmt:
index 83e4e37c78a2642dcfa47568b26fe0fe751f390b..3780c6e812ea87bd424d806640357841f424c37a 100644 (file)
@@ -136,6 +136,7 @@ ClassifyUtilityCommandAsReadOnly(Node *parsetree)
        switch (nodeTag(parsetree))
        {
                case T_AlterCollationStmt:
+               case T_AlterDatabaseRefreshCollStmt:
                case T_AlterDatabaseSetStmt:
                case T_AlterDatabaseStmt:
                case T_AlterDefaultPrivilegesStmt:
@@ -779,6 +780,11 @@ standard_ProcessUtility(PlannedStmt *pstmt,
                        AlterDatabase(pstate, (AlterDatabaseStmt *) parsetree, isTopLevel);
                        break;
 
+               case T_AlterDatabaseRefreshCollStmt:
+                       /* no event triggers for global objects */
+                       AlterDatabaseRefreshColl((AlterDatabaseRefreshCollStmt *) parsetree);
+                       break;
+
                case T_AlterDatabaseSetStmt:
                        /* no event triggers for global objects */
                        AlterDatabaseSet((AlterDatabaseSetStmt *) parsetree);
@@ -2801,9 +2807,7 @@ CreateCommandTag(Node *parsetree)
                        break;
 
                case T_AlterDatabaseStmt:
-                       tag = CMDTAG_ALTER_DATABASE;
-                       break;
-
+               case T_AlterDatabaseRefreshCollStmt:
                case T_AlterDatabaseSetStmt:
                        tag = CMDTAG_ALTER_DATABASE;
                        break;
@@ -3444,9 +3448,7 @@ GetCommandLogLevel(Node *parsetree)
                        break;
 
                case T_AlterDatabaseStmt:
-                       lev = LOGSTMT_DDL;
-                       break;
-
+               case T_AlterDatabaseRefreshCollStmt:
                case T_AlterDatabaseSetStmt:
                        lev = LOGSTMT_DDL;
                        break;
index ca53912f151d79e712a43409cd1d1c0332e77684..02207a5b6f8997b71514fba0d5c8c45766cbc5a4 100644 (file)
@@ -32,6 +32,7 @@
 #include "catalog/catalog.h"
 #include "catalog/namespace.h"
 #include "catalog/pg_authid.h"
+#include "catalog/pg_collation.h"
 #include "catalog/pg_database.h"
 #include "catalog/pg_db_role_setting.h"
 #include "catalog/pg_tablespace.h"
@@ -418,6 +419,39 @@ CheckMyDatabase(const char *name, bool am_superuser, bool override_allow_connect
                                                   " which is not recognized by setlocale().", ctype),
                                 errhint("Recreate the database with another locale or install the missing locale.")));
 
+       /*
+        * Check collation version.  See similar code in
+        * pg_newlocale_from_collation().  Note that here we warn instead of error
+        * in any case, so that we don't prevent connecting.
+        */
+       datum = SysCacheGetAttr(DATABASEOID, tup, Anum_pg_database_datcollversion,
+                                                       &isnull);
+       if (!isnull)
+       {
+               char       *actual_versionstr;
+               char       *collversionstr;
+
+               collversionstr = TextDatumGetCString(datum);
+
+               actual_versionstr = get_collation_actual_version(COLLPROVIDER_LIBC, collate);
+               if (!actual_versionstr)
+                       ereport(WARNING,
+                                       (errmsg("database \"%s\" has no actual collation version, but a version was recorded",
+                                                       name)));
+
+               if (strcmp(actual_versionstr, collversionstr) != 0)
+                       ereport(WARNING,
+                                       (errmsg("database \"%s\" has a collation version mismatch",
+                                                       name),
+                                        errdetail("The database was created using collation version %s, "
+                                                          "but the operating system provides version %s.",
+                                                          collversionstr, actual_versionstr),
+                                        errhint("Rebuild all objects in this database that use the default collation and run "
+                                                        "ALTER DATABASE %s REFRESH COLLATION VERSION, "
+                                                        "or build PostgreSQL with the right library version.",
+                                                        quote_identifier(name))));
+       }
+
        /* Make the locale settings visible as GUC variables, too */
        SetConfigOption("lc_collate", collate, PGC_INTERNAL, PGC_S_OVERRIDE);
        SetConfigOption("lc_ctype", ctype, PGC_INTERNAL, PGC_S_OVERRIDE);
index d78e8e67b8d2f95f661db8a2009d66598c252af6..97f15971e2bef6ff3ebf69ee00840bcdd9ea0a80 100644 (file)
@@ -1857,6 +1857,18 @@ make_template0(FILE *cmdfd)
                "CREATE DATABASE template0 IS_TEMPLATE = true ALLOW_CONNECTIONS = false OID = "
                CppAsString2(Template0ObjectId) ";\n\n",
 
+               /*
+                * template0 shouldn't have any collation-dependent objects, so unset
+                * the collation version.  This disables collation version checks when
+                * making a new database from it.
+                */
+               "UPDATE pg_database SET datcollversion = NULL WHERE datname = 'template0';\n\n",
+
+               /*
+                * While we are here, do set the collation version on template1.
+                */
+               "UPDATE pg_database SET datcollversion = pg_database_collation_actual_version(oid) WHERE datname = 'template1';\n\n",
+
                /*
                 * Explicitly revoke public create-schema and create-temp-table
                 * privileges in template1 and template0; else the latter would be on
index 3b4b63d89713224398011d17b1a1c5b35221a820..4485ea83b1e1179f706cdfc81522671874df1be8 100644 (file)
@@ -2761,6 +2761,7 @@ dumpDatabase(Archive *fout)
                                i_acldefault,
                                i_datistemplate,
                                i_datconnlimit,
+                               i_datcollversion,
                                i_tablespace;
        CatalogId       dbCatId;
        DumpId          dbDumpId;
@@ -2792,6 +2793,10 @@ dumpDatabase(Archive *fout)
                appendPQExpBuffer(dbQry, "datminmxid, ");
        else
                appendPQExpBuffer(dbQry, "0 AS datminmxid, ");
+       if (fout->remoteVersion >= 150000)
+               appendPQExpBuffer(dbQry, "datcollversion, ");
+       else
+               appendPQExpBuffer(dbQry, "NULL AS datcollversion, ");
        appendPQExpBuffer(dbQry,
                                          "(SELECT spcname FROM pg_tablespace t WHERE t.oid = dattablespace) AS tablespace, "
                                          "shobj_description(oid, 'pg_database') AS description "
@@ -2813,6 +2818,7 @@ dumpDatabase(Archive *fout)
        i_acldefault = PQfnumber(res, "acldefault");
        i_datistemplate = PQfnumber(res, "datistemplate");
        i_datconnlimit = PQfnumber(res, "datconnlimit");
+       i_datcollversion = PQfnumber(res, "datcollversion");
        i_tablespace = PQfnumber(res, "tablespace");
 
        dbCatId.tableoid = atooid(PQgetvalue(res, 0, i_tableoid));
@@ -2872,6 +2878,21 @@ dumpDatabase(Archive *fout)
                }
        }
 
+       /*
+        * For binary upgrade, carry over the collation version.  For normal
+        * dump/restore, omit the version, so that it is computed upon restore.
+        */
+       if (dopt->binary_upgrade)
+       {
+               if (!PQgetisnull(res, 0, i_datcollversion))
+               {
+                       appendPQExpBufferStr(creaQry, " COLLATION_VERSION = ");
+                       appendStringLiteralAH(creaQry,
+                                                                 PQgetvalue(res, 0, i_datcollversion),
+                                                                 fout);
+               }
+       }
+
        /*
         * Note: looking at dopt->outputNoTablespaces here is completely the wrong
         * thing; the decision whether to specify a tablespace should be left till
index 988822721303c4e0e7390039787ab260b7a023e8..010edb685fae51157de3d22282b4784bf82b8ee8 100644 (file)
@@ -1849,7 +1849,7 @@ psql_completion(const char *text, int start, int end)
 
        /* ALTER DATABASE <name> */
        else if (Matches("ALTER", "DATABASE", MatchAny))
-               COMPLETE_WITH("RESET", "SET", "OWNER TO", "RENAME TO",
+               COMPLETE_WITH("RESET", "SET", "OWNER TO", "REFRESH COLLATION VERSION", "RENAME TO",
                                          "IS_TEMPLATE", "ALLOW_CONNECTIONS",
                                          "CONNECTION LIMIT");
 
index 08f9b984d7f2e7bf57ecf261b23f7759279fd248..b940a0cf0cb48c85134cd98295753f1a03de9b88 100644 (file)
@@ -53,6 +53,6 @@
  */
 
 /*                                                     yyyymmddN */
-#define CATALOG_VERSION_NO     202202101
+#define CATALOG_VERSION_NO     202202141
 
 #endif
index 90b43a4ecc98ba31ee02e611af710823b873a84b..76adbd4aaded82c61fe40e76b935634a1e0c3936 100644 (file)
@@ -65,6 +65,9 @@ CATALOG(pg_database,1262,DatabaseRelationId) BKI_SHARED_RELATION BKI_ROWTYPE_OID
        /* LC_CTYPE setting */
        text            datctype BKI_FORCE_NOT_NULL;
 
+       /* provider-dependent version of collation data */
+       text            datcollversion BKI_DEFAULT(_null_);
+
        /* access permissions */
        aclitem         datacl[1];
 #endif
index 62f36daa981bbdceb9ab9a9b9d3cabf55630873a..7f1ee97f55cc7708c3f54ed5207b48597ca5aa32 100644 (file)
   proname => 'pg_collation_actual_version', procost => '100',
   provolatile => 'v', prorettype => 'text', proargtypes => 'oid',
   prosrc => 'pg_collation_actual_version' },
+{ oid => '9167',
+  descr => 'get actual version of database collation from operating system',
+  proname => 'pg_database_collation_actual_version', procost => '100',
+  provolatile => 'v', prorettype => 'text', proargtypes => 'oid',
+  prosrc => 'pg_database_collation_actual_version' },
 
 # system management/monitoring related functions
 { oid => '3353', descr => 'list files in the log directory',
index b1e8b5eb969f5b50a04828ab718f7e3fcd3ff700..c4947fa71f76bee2a169e5d318ae9d0498d6e4b6 100644 (file)
@@ -24,6 +24,7 @@ extern void dropdb(const char *dbname, bool missing_ok, bool force);
 extern void DropDatabase(ParseState *pstate, DropdbStmt *stmt);
 extern ObjectAddress RenameDatabase(const char *oldname, const char *newname);
 extern Oid     AlterDatabase(ParseState *pstate, AlterDatabaseStmt *stmt, bool isTopLevel);
+extern ObjectAddress AlterDatabaseRefreshColl(AlterDatabaseRefreshCollStmt *stmt);
 extern Oid     AlterDatabaseSet(AlterDatabaseSetStmt *stmt);
 extern ObjectAddress AlterDatabaseOwner(const char *dbname, Oid newOwnerId);
 
index da35f2c27228a1d07986dce8951209d9bf7a73fb..5d075f0c346b59d47c250d3883982bb6670628e6 100644 (file)
@@ -370,6 +370,7 @@ typedef enum NodeTag
        T_CheckPointStmt,
        T_CreateSchemaStmt,
        T_AlterDatabaseStmt,
+       T_AlterDatabaseRefreshCollStmt,
        T_AlterDatabaseSetStmt,
        T_AlterRoleSetStmt,
        T_CreateConversionStmt,
index 37fcc4c9b5a93707287dffffd449e782edec85ca..34218b718c1dd705dd5261e7e5e2449f5c953110 100644 (file)
@@ -3308,6 +3308,12 @@ typedef struct AlterDatabaseStmt
        List       *options;            /* List of DefElem nodes */
 } AlterDatabaseStmt;
 
+typedef struct AlterDatabaseRefreshCollStmt
+{
+       NodeTag         type;
+       char       *dbname;
+} AlterDatabaseRefreshCollStmt;
+
 typedef struct AlterDatabaseSetStmt
 {
        NodeTag         type;
index 70133df8042cca63f51f3e300dffbf439f8f9412..9699ca16cfc72728aab39f81b720148680bab07b 100644 (file)
@@ -1085,6 +1085,10 @@ DROP ROLE regress_test_role;
 -- ALTER
 ALTER COLLATION "en-x-icu" REFRESH VERSION;
 NOTICE:  version has not changed
+-- also test for database while we are here
+SELECT current_database() AS datname \gset
+ALTER DATABASE :"datname" REFRESH COLLATION VERSION;
+NOTICE:  version has not changed
 -- dependencies
 CREATE COLLATION test0 FROM "C";
 CREATE TABLE collate_dep_test1 (a int, b text COLLATE test0);
index f06ae543e497713787cccd057080c1ee5837cf30..f2d0eb94f2470a2baedb710bf25776a972d7406f 100644 (file)
@@ -1096,6 +1096,10 @@ DROP ROLE regress_test_role;
 -- ALTER
 ALTER COLLATION "en_US" REFRESH VERSION;
 NOTICE:  version has not changed
+-- also test for database while we are here
+SELECT current_database() AS datname \gset
+ALTER DATABASE :"datname" REFRESH COLLATION VERSION;
+NOTICE:  version has not changed
 -- dependencies
 CREATE COLLATION test0 FROM "C";
 CREATE TABLE collate_dep_test1 (a int, b text COLLATE test0);
index 9cee3d0042b1ca88bb79aa3318c414b3196fe340..242a7ce6b7ca38295211a00757d60b423426f2d0 100644 (file)
@@ -409,6 +409,10 @@ DROP ROLE regress_test_role;
 
 ALTER COLLATION "en-x-icu" REFRESH VERSION;
 
+-- also test for database while we are here
+SELECT current_database() AS datname \gset
+ALTER DATABASE :"datname" REFRESH COLLATION VERSION;
+
 
 -- dependencies
 
index cbbd2203e413b0141242224e444dd353a92a56fe..0f6dd1b02e2fe457aa8cd31d81206e6a8b9f25f1 100644 (file)
@@ -410,6 +410,10 @@ DROP ROLE regress_test_role;
 
 ALTER COLLATION "en_US" REFRESH VERSION;
 
+-- also test for database while we are here
+SELECT current_database() AS datname \gset
+ALTER DATABASE :"datname" REFRESH COLLATION VERSION;
+
 
 -- dependencies