Rearrange dblink's dblink_build_sql_insert() and related routines to open and
authorTom Lane <tgl@sss.pgh.pa.us>
Mon, 14 Jun 2010 20:49:51 +0000 (20:49 +0000)
committerTom Lane <tgl@sss.pgh.pa.us>
Mon, 14 Jun 2010 20:49:51 +0000 (20:49 +0000)
lock the target relation just once per SQL function call.  The original coding
obtained and released lock several times per call.  Aside from saving a
not-insignificant number of cycles, this eliminates possible race conditions
if someone tries to modify the relation's schema concurrently.  Also
centralize locking and permission-checking logic.

Problem noted while investigating a trouble report from Robert Voinea --- his
problem is still to be fixed, though.

contrib/dblink/dblink.c

index e5db663ec434f9205107ff3376b9e617d29ba91f..5c3b59105fe8db9e1a8a15c73af4915d7af93ea2 100644 (file)
@@ -8,7 +8,7 @@
  * Darko Prenosil <Darko.Prenosil@finteh.hr>
  * Shridhar Daithankar <shridhar_daithankar@persistent.co.in>
  *
- * $PostgreSQL: pgsql/contrib/dblink/dblink.c,v 1.60.2.7 2010/06/09 00:59:54 itagaki Exp $
+ * $PostgreSQL: pgsql/contrib/dblink/dblink.c,v 1.60.2.8 2010/06/14 20:49:51 tgl Exp $
  * Copyright (c) 2001-2006, PostgreSQL Global Development Group
  * ALL RIGHTS RESERVED;
  *
@@ -52,6 +52,7 @@
 #include "parser/parse_type.h"
 #include "parser/scansup.h"
 #include "tcop/tcopprot.h"
+#include "utils/acl.h"
 #include "utils/array.h"
 #include "utils/builtins.h"
 #include "utils/dynahash.h"
@@ -80,20 +81,20 @@ static remoteConn *getConnectionByName(const char *name);
 static HTAB *createConnHash(void);
 static void createNewConnection(const char *name, remoteConn * rconn);
 static void deleteConnection(const char *name);
-static char **get_pkey_attnames(Oid relid, int16 *numatts);
+static char **get_pkey_attnames(Relation rel, int16 *numatts);
 static char **get_text_array_contents(ArrayType *array, int *numitems);
-static char *get_sql_insert(Oid relid, int2vector *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals);
-static char *get_sql_delete(Oid relid, int2vector *pkattnums, int16 pknumatts, char **tgt_pkattvals);
-static char *get_sql_update(Oid relid, int2vector *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals);
+static char *get_sql_insert(Relation rel, int2vector *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals);
+static char *get_sql_delete(Relation rel, int2vector *pkattnums, int16 pknumatts, char **tgt_pkattvals);
+static char *get_sql_update(Relation rel, int2vector *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals);
 static char *quote_literal_cstr(char *rawstr);
 static char *quote_ident_cstr(char *rawstr);
 static int16 get_attnum_pk_pos(int2vector *pkattnums, int16 pknumatts, int16 key);
-static HeapTuple get_tuple_of_interest(Oid relid, int2vector *pkattnums, int16 pknumatts, char **src_pkattvals);
-static Oid get_relid_from_relname(text *relname_text);
-static char *generate_relation_name(Oid relid);
+static HeapTuple get_tuple_of_interest(Relation rel, int2vector *pkattnums, int16 pknumatts, char **src_pkattvals);
+static Relation get_rel_from_relname(text *relname_text, LOCKMODE lockmode, AclMode aclmode);
+static char *generate_relation_name(Relation rel);
 static char *connstr_strip_password(const char *connstr);
 static void dblink_security_check(PGconn *conn, remoteConn *rconn, const char *connstr);
-static int get_nondropped_natts(Oid relid);
+static int get_nondropped_natts(Relation rel);
 
 /* Global */
 static remoteConn *pconn = NULL;
@@ -1249,7 +1250,6 @@ Datum
 dblink_get_pkey(PG_FUNCTION_ARGS)
 {
    int16       numatts;
-   Oid         relid;
    char      **results;
    FuncCallContext *funcctx;
    int32       call_cntr;
@@ -1260,7 +1260,8 @@ dblink_get_pkey(PG_FUNCTION_ARGS)
    /* stuff done only on the first call of the function */
    if (SRF_IS_FIRSTCALL())
    {
-       TupleDesc   tupdesc = NULL;
+       Relation    rel;
+       TupleDesc   tupdesc;
 
        /* create a function context for cross-call persistence */
        funcctx = SRF_FIRSTCALL_INIT();
@@ -1270,13 +1271,13 @@ dblink_get_pkey(PG_FUNCTION_ARGS)
         */
        oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
 
-       /* convert relname to rel Oid */
-       relid = get_relid_from_relname(PG_GETARG_TEXT_P(0));
-       if (!OidIsValid(relid))
-           ereport(ERROR,
-                   (errcode(ERRCODE_UNDEFINED_TABLE),
-                    errmsg("relation \"%s\" does not exist",
-                           GET_STR(PG_GETARG_TEXT_P(0)))));
+       /* open target relation */
+       rel = get_rel_from_relname(PG_GETARG_TEXT_P(0), AccessShareLock, ACL_SELECT);
+
+       /* get the array of attnums */
+       results = get_pkey_attnames(rel, &numatts);
+
+       relation_close(rel, AccessShareLock);
 
        /*
         * need a tuple descriptor representing one INT and one TEXT column
@@ -1294,9 +1295,6 @@ dblink_get_pkey(PG_FUNCTION_ARGS)
        attinmeta = TupleDescGetAttInMetadata(tupdesc);
        funcctx->attinmeta = attinmeta;
 
-       /* get an array of attnums */
-       results = get_pkey_attnames(relid, &numatts);
-
        if ((results != NULL) && (numatts > 0))
        {
            funcctx->max_calls = numatts;
@@ -1383,7 +1381,7 @@ dblink_build_sql_insert(PG_FUNCTION_ARGS)
    int32       pknumatts_tmp = PG_GETARG_INT32(2);
    ArrayType  *src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
    ArrayType  *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4);
-   Oid         relid;
+   Relation    rel;
    int16       pknumatts = 0;
    char      **src_pkattvals;
    char      **tgt_pkattvals;
@@ -1393,14 +1391,9 @@ dblink_build_sql_insert(PG_FUNCTION_ARGS)
    int         nondropped_natts;
 
    /*
-    * Convert relname to rel OID.
+    * Open target relation.
     */
-   relid = get_relid_from_relname(relname_text);
-   if (!OidIsValid(relid))
-       ereport(ERROR,
-               (errcode(ERRCODE_UNDEFINED_TABLE),
-                errmsg("relation \"%s\" does not exist",
-                       GET_STR(relname_text))));
+   rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT);
 
    /*
     * There should be at least one key attribute
@@ -1422,7 +1415,7 @@ dblink_build_sql_insert(PG_FUNCTION_ARGS)
     * ensure we don't ask for more pk attributes than we have
     * non-dropped columns
     */
-   nondropped_natts = get_nondropped_natts(relid);
+   nondropped_natts = get_nondropped_natts(rel);
    if (pknumatts > nondropped_natts)
        ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR),
                errmsg("number of primary key fields exceeds number of specified relation attributes")));
@@ -1460,7 +1453,12 @@ dblink_build_sql_insert(PG_FUNCTION_ARGS)
    /*
     * Prep work is finally done. Go get the SQL string.
     */
-   sql = get_sql_insert(relid, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals);
+   sql = get_sql_insert(rel, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals);
+
+   /*
+    * Now we can close the relation.
+    */
+   relation_close(rel, AccessShareLock);
 
    /*
     * And send it
@@ -1493,21 +1491,16 @@ dblink_build_sql_delete(PG_FUNCTION_ARGS)
    int32       pknumatts_tmp = PG_GETARG_INT32(2);
    ArrayType  *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
    int         nondropped_natts;
-   Oid         relid;
+   Relation    rel;
    int16       pknumatts = 0;
    char      **tgt_pkattvals;
    int         tgt_nitems;
    char       *sql;
 
    /*
-    * Convert relname to rel OID.
+    * Open target relation.
     */
-   relid = get_relid_from_relname(relname_text);
-   if (!OidIsValid(relid))
-       ereport(ERROR,
-               (errcode(ERRCODE_UNDEFINED_TABLE),
-                errmsg("relation \"%s\" does not exist",
-                       GET_STR(relname_text))));
+   rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT);
 
    /*
     * There should be at least one key attribute
@@ -1529,7 +1522,7 @@ dblink_build_sql_delete(PG_FUNCTION_ARGS)
     * ensure we don't ask for more pk attributes than we have
     * non-dropped columns
     */
-   nondropped_natts = get_nondropped_natts(relid);
+   nondropped_natts = get_nondropped_natts(rel);
    if (pknumatts > nondropped_natts)
        ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR),
                errmsg("number of primary key fields exceeds number of specified relation attributes")));
@@ -1552,7 +1545,12 @@ dblink_build_sql_delete(PG_FUNCTION_ARGS)
    /*
     * Prep work is finally done. Go get the SQL string.
     */
-   sql = get_sql_delete(relid, pkattnums, pknumatts, tgt_pkattvals);
+   sql = get_sql_delete(rel, pkattnums, pknumatts, tgt_pkattvals);
+
+   /*
+    * Now we can close the relation.
+    */
+   relation_close(rel, AccessShareLock);
 
    /*
     * And send it
@@ -1590,7 +1588,7 @@ dblink_build_sql_update(PG_FUNCTION_ARGS)
    ArrayType  *src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
    ArrayType  *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4);
    int         nondropped_natts;
-   Oid         relid;
+   Relation    rel;
    int16       pknumatts = 0;
    char      **src_pkattvals;
    char      **tgt_pkattvals;
@@ -1599,14 +1597,9 @@ dblink_build_sql_update(PG_FUNCTION_ARGS)
    char       *sql;
 
    /*
-    * Convert relname to rel OID.
+    * Open target relation.
     */
-   relid = get_relid_from_relname(relname_text);
-   if (!OidIsValid(relid))
-       ereport(ERROR,
-               (errcode(ERRCODE_UNDEFINED_TABLE),
-                errmsg("relation \"%s\" does not exist",
-                       GET_STR(relname_text))));
+   rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT);
 
    /*
     * There should be one source array key values for each key attnum
@@ -1628,7 +1621,7 @@ dblink_build_sql_update(PG_FUNCTION_ARGS)
     * ensure we don't ask for more pk attributes than we have
     * non-dropped columns
     */
-   nondropped_natts = get_nondropped_natts(relid);
+   nondropped_natts = get_nondropped_natts(rel);
    if (pknumatts > nondropped_natts)
        ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR),
                errmsg("number of primary key fields exceeds number of specified relation attributes")));
@@ -1666,7 +1659,12 @@ dblink_build_sql_update(PG_FUNCTION_ARGS)
    /*
     * Prep work is finally done. Go get the SQL string.
     */
-   sql = get_sql_update(relid, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals);
+   sql = get_sql_update(rel, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals);
+
+   /*
+    * Now we can close the relation.
+    */
+   relation_close(rel, AccessShareLock);
 
    /*
     * And send it
@@ -1703,7 +1701,7 @@ dblink_current_query(PG_FUNCTION_ARGS)
  * Return NULL, and set numatts = 0, if no primary key exists.
  */
 static char **
-get_pkey_attnames(Oid relid, int16 *numatts)
+get_pkey_attnames(Relation rel, int16 *numatts)
 {
    Relation    indexRelation;
    ScanKeyData entry;
@@ -1711,22 +1709,19 @@ get_pkey_attnames(Oid relid, int16 *numatts)
    HeapTuple   indexTuple;
    int         i;
    char      **result = NULL;
-   Relation    rel;
    TupleDesc   tupdesc;
 
-   /* open relation using relid, get tupdesc */
-   rel = relation_open(relid, AccessShareLock);
-   tupdesc = rel->rd_att;
-
    /* initialize numatts to 0 in case no primary key exists */
    *numatts = 0;
 
+   tupdesc = rel->rd_att;
+
    /* use relid to get all related indexes */
    indexRelation = heap_open(IndexRelationId, AccessShareLock);
    ScanKeyInit(&entry,
                Anum_pg_index_indrelid,
                BTEqualStrategyNumber, F_OIDEQ,
-               ObjectIdGetDatum(relid));
+               ObjectIdGetDatum(RelationGetRelid(rel)));
    scan = heap_beginscan(indexRelation, SnapshotNow, 1, &entry);
 
    while ((indexTuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
@@ -1749,7 +1744,6 @@ get_pkey_attnames(Oid relid, int16 *numatts)
    }
    heap_endscan(scan);
    heap_close(indexRelation, AccessShareLock);
-   relation_close(rel, AccessShareLock);
 
    return result;
 }
@@ -1816,9 +1810,8 @@ get_text_array_contents(ArrayType *array, int *numitems)
 }
 
 static char *
-get_sql_insert(Oid relid, int2vector *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals)
+get_sql_insert(Relation rel, int2vector *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals)
 {
-   Relation    rel;
    char       *relname;
    HeapTuple   tuple;
    TupleDesc   tupdesc;
@@ -1832,16 +1825,12 @@ get_sql_insert(Oid relid, int2vector *pkattnums, int16 pknumatts, char **src_pka
    initStringInfo(&buf);
 
    /* get relation name including any needed schema prefix and quoting */
-   relname = generate_relation_name(relid);
+   relname = generate_relation_name(rel);
 
-   /*
-    * Open relation using relid
-    */
-   rel = relation_open(relid, AccessShareLock);
    tupdesc = rel->rd_att;
    natts = tupdesc->natts;
 
-   tuple = get_tuple_of_interest(relid, pkattnums, pknumatts, src_pkattvals);
+   tuple = get_tuple_of_interest(rel, pkattnums, pknumatts, src_pkattvals);
    if (!tuple)
        ereport(ERROR,
                (errcode(ERRCODE_CARDINALITY_VIOLATION),
@@ -1898,14 +1887,12 @@ get_sql_insert(Oid relid, int2vector *pkattnums, int16 pknumatts, char **src_pka
    }
    appendStringInfo(&buf, ")");
 
-   relation_close(rel, AccessShareLock);
    return (buf.data);
 }
 
 static char *
-get_sql_delete(Oid relid, int2vector *pkattnums, int16 pknumatts, char **tgt_pkattvals)
+get_sql_delete(Relation rel, int2vector *pkattnums, int16 pknumatts, char **tgt_pkattvals)
 {
-   Relation    rel;
    char       *relname;
    TupleDesc   tupdesc;
    int         natts;
@@ -1915,12 +1902,8 @@ get_sql_delete(Oid relid, int2vector *pkattnums, int16 pknumatts, char **tgt_pka
    initStringInfo(&buf);
 
    /* get relation name including any needed schema prefix and quoting */
-   relname = generate_relation_name(relid);
+   relname = generate_relation_name(rel);
 
-   /*
-    * Open relation using relid
-    */
-   rel = relation_open(relid, AccessShareLock);
    tupdesc = rel->rd_att;
    natts = tupdesc->natts;
 
@@ -1946,14 +1929,12 @@ get_sql_delete(Oid relid, int2vector *pkattnums, int16 pknumatts, char **tgt_pka
            appendStringInfo(&buf, " IS NULL");
    }
 
-   relation_close(rel, AccessShareLock);
    return (buf.data);
 }
 
 static char *
-get_sql_update(Oid relid, int2vector *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals)
+get_sql_update(Relation rel, int2vector *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals)
 {
-   Relation    rel;
    char       *relname;
    HeapTuple   tuple;
    TupleDesc   tupdesc;
@@ -1967,16 +1948,12 @@ get_sql_update(Oid relid, int2vector *pkattnums, int16 pknumatts, char **src_pka
    initStringInfo(&buf);
 
    /* get relation name including any needed schema prefix and quoting */
-   relname = generate_relation_name(relid);
+   relname = generate_relation_name(rel);
 
-   /*
-    * Open relation using relid
-    */
-   rel = relation_open(relid, AccessShareLock);
    tupdesc = rel->rd_att;
    natts = tupdesc->natts;
 
-   tuple = get_tuple_of_interest(relid, pkattnums, pknumatts, src_pkattvals);
+   tuple = get_tuple_of_interest(rel, pkattnums, pknumatts, src_pkattvals);
    if (!tuple)
        ereport(ERROR,
                (errcode(ERRCODE_CARDINALITY_VIOLATION),
@@ -2042,7 +2019,6 @@ get_sql_update(Oid relid, int2vector *pkattnums, int16 pknumatts, char **src_pka
            appendStringInfo(&buf, " IS NULL");
    }
 
-   relation_close(rel, AccessShareLock);
    return (buf.data);
 }
 
@@ -2098,9 +2074,8 @@ get_attnum_pk_pos(int2vector *pkattnums, int16 pknumatts, int16 key)
 }
 
 static HeapTuple
-get_tuple_of_interest(Oid relid, int2vector *pkattnums, int16 pknumatts, char **src_pkattvals)
+get_tuple_of_interest(Relation rel, int2vector *pkattnums, int16 pknumatts, char **src_pkattvals)
 {
-   Relation    rel;
    char       *relname;
    TupleDesc   tupdesc;
    StringInfoData buf;
@@ -2111,14 +2086,9 @@ get_tuple_of_interest(Oid relid, int2vector *pkattnums, int16 pknumatts, char **
    initStringInfo(&buf);
 
    /* get relation name including any needed schema prefix and quoting */
-   relname = generate_relation_name(relid);
+   relname = generate_relation_name(rel);
 
-   /*
-    * Open relation using relid
-    */
-   rel = relation_open(relid, AccessShareLock);
-   tupdesc = CreateTupleDescCopy(rel->rd_att);
-   relation_close(rel, AccessShareLock);
+   tupdesc = rel->rd_att;
 
    /*
     * Connect to SPI manager
@@ -2189,52 +2159,49 @@ get_tuple_of_interest(Oid relid, int2vector *pkattnums, int16 pknumatts, char **
    return NULL;
 }
 
-static Oid
-get_relid_from_relname(text *relname_text)
+/*
+ * Open the relation named by relname_text, acquire specified type of lock,
+ * verify we have specified permissions.
+ * Caller must close rel when done with it.
+ */
+static Relation
+get_rel_from_relname(text *relname_text, LOCKMODE lockmode, AclMode aclmode)
 {
    RangeVar   *relvar;
    Relation    rel;
-   Oid         relid;
+   AclResult   aclresult;
 
    relvar = makeRangeVarFromNameList(textToQualifiedNameList(relname_text));
-   rel = heap_openrv(relvar, AccessShareLock);
-   relid = RelationGetRelid(rel);
-   relation_close(rel, AccessShareLock);
+   rel = heap_openrv(relvar, lockmode);
 
-   return relid;
+   aclresult = pg_class_aclcheck(RelationGetRelid(rel), GetUserId(),
+                                 aclmode);
+   if (aclresult != ACLCHECK_OK)
+       aclcheck_error(aclresult, ACL_KIND_CLASS,
+                      RelationGetRelationName(rel));
+
+   return rel;
 }
 
 /*
  * generate_relation_name - copied from ruleutils.c
- *     Compute the name to display for a relation specified by OID
+ *     Compute the name to display for a relation
  *
  * The result includes all necessary quoting and schema-prefixing.
  */
 static char *
-generate_relation_name(Oid relid)
+generate_relation_name(Relation rel)
 {
-   HeapTuple   tp;
-   Form_pg_class reltup;
    char       *nspname;
    char       *result;
 
-   tp = SearchSysCache(RELOID,
-                       ObjectIdGetDatum(relid),
-                       0, 0, 0);
-   if (!HeapTupleIsValid(tp))
-       elog(ERROR, "cache lookup failed for relation %u", relid);
-
-   reltup = (Form_pg_class) GETSTRUCT(tp);
-
    /* Qualify the name if not visible in search path */
-   if (RelationIsVisible(relid))
+   if (RelationIsVisible(RelationGetRelid(rel)))
        nspname = NULL;
    else
-       nspname = get_namespace_name(reltup->relnamespace);
+       nspname = get_namespace_name(rel->rd_rel->relnamespace);
 
-   result = quote_qualified_identifier(nspname, NameStr(reltup->relname));
-
-   ReleaseSysCache(tp);
+   result = quote_qualified_identifier(nspname, RelationGetRelationName(rel));
 
    return result;
 }
@@ -2479,15 +2446,13 @@ dblink_security_check(PGconn *conn, remoteConn *rconn, const char *connstr)
 }
 
 static int
-get_nondropped_natts(Oid relid)
+get_nondropped_natts(Relation rel)
 {
    int         nondropped_natts = 0;
    TupleDesc   tupdesc;
-   Relation    rel;
    int         natts;
    int         i;
 
-   rel = relation_open(relid, AccessShareLock);
    tupdesc = rel->rd_att;
    natts = tupdesc->natts;
 
@@ -2498,7 +2463,6 @@ get_nondropped_natts(Oid relid)
        nondropped_natts++;
    }
 
-   relation_close(rel, AccessShareLock);
    return nondropped_natts;
 }