Check for tables with sql_identifier during pg_upgrade
authorTomas Vondra <tomas.vondra@postgresql.org>
Mon, 14 Oct 2019 20:31:56 +0000 (22:31 +0200)
committerTomas Vondra <tomas.vondra@postgresql.org>
Mon, 14 Oct 2019 20:31:56 +0000 (22:31 +0200)
Commit 7c15cef86d changed sql_identifier data type to be based on name
instead of varchar.  Unfortunately, this breaks on-disk format for this
data type.  Luckily, that should be a very rare problem, as this data
type is used only in information_schema views, so this only affects user
objects (tables, materialized views and indexes).  One way to end in
such situation is to do CTAS with a query on those system views.

There are two options to deal with this - we can either abort pg_upgrade
if there are user objects with sql_identifier columns in pg_upgrade, or
we could replace the sql_identifier type with varchar.  Considering how
rare the issue is expected to be, and the complexity of replacing the
data type (e.g. in matviews), we've decided to go with the simple check.

The query is somewhat complex - the sql_identifier data type may be used
indirectly - through a domain, a composite type or both, possibly in
multiple levels.  Detecting this requires a recursive CTE.

Backpatch to 12, where the sql_identifier definition changed.

Reported-by: Hans Buschmann
Author: Tomas Vondra
Reviewed-by: Tom Lane
Backpatch-to: 12
Discussion: https://postgr.es/m/16045-673e8fa6b5ace196%40postgresql.org

src/bin/pg_upgrade/check.c
src/bin/pg_upgrade/pg_upgrade.h
src/bin/pg_upgrade/version.c

index e7bf48a7394dc0afed9cfbe8e41b932b95cea00a..87b9d328d4cc72dbc4091e1833afb7297db405b8 100644 (file)
@@ -108,6 +108,14 @@ check_and_dump_old_cluster(bool live_check)
    if (GET_MAJOR_VERSION(old_cluster.major_version) <= 1100)
        check_for_tables_with_oids(&old_cluster);
 
+   /*
+    * PG 12 changed the 'sql_identifier' type storage to be based on name,
+    * not varchar, which breaks on-disk format for existing data. So we need
+    * to prevent upgrade when used in user objects (tables, indexes, ...).
+    */
+   if (GET_MAJOR_VERSION(old_cluster.major_version) <= 1100)
+       old_11_check_for_sql_identifier_data_type_usage(&old_cluster);
+
    /*
     * Pre-PG 10 allowed tables with 'unknown' type columns and non WAL logged
     * hash indexes
index f724ecf9caddb28e08982a29636d7b8fa13332c8..729f86aa32d443a4f873d983adf77e3e4d878dfb 100644 (file)
@@ -451,6 +451,8 @@ void        old_9_6_check_for_unknown_data_type_usage(ClusterInfo *cluster);
 void       old_9_6_invalidate_hash_indexes(ClusterInfo *cluster,
                                            bool check_mode);
 
+void       old_11_check_for_sql_identifier_data_type_usage(ClusterInfo *cluster);
+
 /* parallel.c */
 void       parallel_exec_prog(const char *log_file, const char *opt_log_file,
                               const char *fmt,...) pg_attribute_printf(3, 4);
index 42f1ce77571878e5bf1ddcc67e139283f7e1a9db..f3fd8fa27378f275fcc7533e31a525ee7b646aea 100644 (file)
@@ -399,3 +399,122 @@ old_9_6_invalidate_hash_indexes(ClusterInfo *cluster, bool check_mode)
    else
        check_ok();
 }
+
+/*
+ * old_11_check_for_sql_identifier_data_type_usage()
+ * 11 -> 12
+ * In 12, the sql_identifier data type was switched from name to varchar,
+ * which does affect the storage (name is by-ref, but not varlena). This
+ * means user tables using sql_identifier for columns are broken because
+ * the on-disk format is different.
+ *
+ * We need to check all objects that might store sql_identifier on disk,
+ * i.e. tables, matviews and indexes. Also check composite types in case
+ * they are used in this context.
+ */
+void
+old_11_check_for_sql_identifier_data_type_usage(ClusterInfo *cluster)
+{
+   int         dbnum;
+   FILE       *script = NULL;
+   bool        found = false;
+   char        output_path[MAXPGPATH];
+
+   prep_status("Checking for invalid \"sql_identifier\" user columns");
+
+   snprintf(output_path, sizeof(output_path), "tables_using_sql_identifier.txt");
+
+   for (dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++)
+   {
+       PGresult   *res;
+       bool        db_used = false;
+       int         ntups;
+       int         rowno;
+       int         i_nspname,
+                   i_relname,
+                   i_attname;
+       DbInfo     *active_db = &cluster->dbarr.dbs[dbnum];
+       PGconn     *conn = connectToServer(cluster, active_db->db_name);
+
+       /*
+        * We need the recursive CTE because the sql_identifier may be wrapped
+        * either in a domain or composite type, or both (in arbitrary order).
+        */
+       res = executeQueryOrDie(conn,
+                               "WITH RECURSIVE oids AS ( "
+       /* the sql_identifier type itself */
+                               "   SELECT 'information_schema.sql_identifier'::regtype AS oid "
+                               "   UNION ALL "
+                               "   SELECT * FROM ( "
+       /* domains on the type */
+                               "       WITH x AS (SELECT oid FROM oids) "
+                               "           SELECT t.oid FROM pg_catalog.pg_type t, x WHERE typbasetype = x.oid AND typtype = 'd' "
+                               "           UNION "
+       /* composite types containing the type */
+                               "           SELECT t.oid FROM pg_catalog.pg_type t, pg_catalog.pg_class c, pg_catalog.pg_attribute a, x "
+                               "           WHERE t.typtype = 'c' AND "
+                               "                 t.oid = c.reltype AND "
+                               "                 c.oid = a.attrelid AND "
+                               "                 NOT a.attisdropped AND "
+                               "                 a.atttypid = x.oid "
+                               "   ) foo "
+                               ") "
+                               "SELECT n.nspname, c.relname, a.attname "
+                               "FROM   pg_catalog.pg_class c, "
+                               "       pg_catalog.pg_namespace n, "
+                               "       pg_catalog.pg_attribute a "
+                               "WHERE  c.oid = a.attrelid AND "
+                               "       NOT a.attisdropped AND "
+                               "       a.atttypid IN (SELECT oid FROM oids) AND "
+                               "       c.relkind IN ("
+                               CppAsString2(RELKIND_RELATION) ", "
+                               CppAsString2(RELKIND_MATVIEW) ", "
+                               CppAsString2(RELKIND_INDEX) ") AND "
+                               "       c.relnamespace = n.oid AND "
+       /* exclude possible orphaned temp tables */
+                               "       n.nspname !~ '^pg_temp_' AND "
+                               "       n.nspname !~ '^pg_toast_temp_' AND "
+                               "       n.nspname NOT IN ('pg_catalog', 'information_schema')");
+
+       ntups = PQntuples(res);
+       i_nspname = PQfnumber(res, "nspname");
+       i_relname = PQfnumber(res, "relname");
+       i_attname = PQfnumber(res, "attname");
+       for (rowno = 0; rowno < ntups; rowno++)
+       {
+           found = true;
+           if (script == NULL && (script = fopen_priv(output_path, "w")) == NULL)
+               pg_fatal("could not open file \"%s\": %s\n", output_path,
+                        strerror(errno));
+           if (!db_used)
+           {
+               fprintf(script, "Database: %s\n", active_db->db_name);
+               db_used = true;
+           }
+           fprintf(script, "  %s.%s.%s\n",
+                   PQgetvalue(res, rowno, i_nspname),
+                   PQgetvalue(res, rowno, i_relname),
+                   PQgetvalue(res, rowno, i_attname));
+       }
+
+       PQclear(res);
+
+       PQfinish(conn);
+   }
+
+   if (script)
+       fclose(script);
+
+   if (found)
+   {
+       pg_log(PG_REPORT, "fatal\n");
+       pg_fatal("Your installation contains the \"sql_identifier\" data type in user tables\n"
+                "and/or indexes.  The on-disk format for this data type has changed, so this\n"
+                "cluster cannot currently be upgraded.  You can remove the problem tables or\n"
+                "change the data type to \"name\" and restart the upgrade.\n"
+                "A list of the problem columns is in the file:\n"
+                "    %s\n\n", output_path);
+   }
+   else
+       check_ok();
+}