diff options
Diffstat (limited to 'src/backend')
43 files changed, 1813 insertions, 525 deletions
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index cb1e6893053..8e595c18cb8 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/access/heap/heapam.c,v 1.168 2004/05/27 17:12:37 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/access/heap/heapam.c,v 1.169 2004/06/18 06:13:09 tgl Exp $ * * * INTERFACE ROUTINES @@ -2630,8 +2630,8 @@ heap_undo(XLogRecPtr lsn, XLogRecord *record) static void out_target(char *buf, xl_heaptid *target) { - sprintf(buf + strlen(buf), "node %u/%u; tid %u/%u", - target->node.tblNode, target->node.relNode, + sprintf(buf + strlen(buf), "rel %u/%u/%u; tid %u/%u", + target->node.spcNode, target->node.dbNode, target->node.relNode, ItemPointerGetBlockNumber(&(target->tid)), ItemPointerGetOffsetNumber(&(target->tid))); } @@ -2673,8 +2673,9 @@ heap_desc(char *buf, uint8 xl_info, char *rec) { xl_heap_clean *xlrec = (xl_heap_clean *) rec; - sprintf(buf + strlen(buf), "clean: node %u/%u; blk %u", - xlrec->node.tblNode, xlrec->node.relNode, xlrec->block); + sprintf(buf + strlen(buf), "clean: rel %u/%u/%u; blk %u", + xlrec->node.spcNode, xlrec->node.dbNode, + xlrec->node.relNode, xlrec->block); } else strcat(buf, "UNKNOWN"); diff --git a/src/backend/access/nbtree/nbtxlog.c b/src/backend/access/nbtree/nbtxlog.c index ed398b32da5..3cb2f3836d2 100644 --- a/src/backend/access/nbtree/nbtxlog.c +++ b/src/backend/access/nbtree/nbtxlog.c @@ -8,7 +8,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/access/nbtree/nbtxlog.c,v 1.13 2004/06/02 17:28:17 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/access/nbtree/nbtxlog.c,v 1.14 2004/06/18 06:13:11 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -803,8 +803,8 @@ btree_undo(XLogRecPtr lsn, XLogRecord *record) static void out_target(char *buf, xl_btreetid *target) { - sprintf(buf + strlen(buf), "node %u/%u; tid %u/%u", - target->node.tblNode, target->node.relNode, + sprintf(buf + strlen(buf), "rel %u/%u/%u; tid %u/%u", + target->node.spcNode, target->node.dbNode, target->node.relNode, ItemPointerGetBlockNumber(&(target->tid)), ItemPointerGetOffsetNumber(&(target->tid))); } @@ -884,8 +884,9 @@ btree_desc(char *buf, uint8 xl_info, char *rec) { xl_btree_delete *xlrec = (xl_btree_delete *) rec; - sprintf(buf + strlen(buf), "delete: node %u/%u; blk %u", - xlrec->node.tblNode, xlrec->node.relNode, xlrec->block); + sprintf(buf + strlen(buf), "delete: rel %u/%u/%u; blk %u", + xlrec->node.spcNode, xlrec->node.dbNode, + xlrec->node.relNode, xlrec->block); break; } case XLOG_BTREE_DELETE_PAGE: @@ -903,8 +904,9 @@ btree_desc(char *buf, uint8 xl_info, char *rec) { xl_btree_newroot *xlrec = (xl_btree_newroot *) rec; - sprintf(buf + strlen(buf), "newroot: node %u/%u; root %u lev %u", - xlrec->node.tblNode, xlrec->node.relNode, + sprintf(buf + strlen(buf), "newroot: rel %u/%u/%u; root %u lev %u", + xlrec->node.spcNode, xlrec->node.dbNode, + xlrec->node.relNode, xlrec->rootblk, xlrec->level); break; } @@ -912,8 +914,9 @@ btree_desc(char *buf, uint8 xl_info, char *rec) { xl_btree_newmeta *xlrec = (xl_btree_newmeta *) rec; - sprintf(buf + strlen(buf), "newmeta: node %u/%u; root %u lev %u fast %u lev %u", - xlrec->node.tblNode, xlrec->node.relNode, + sprintf(buf + strlen(buf), "newmeta: rel %u/%u/%u; root %u lev %u fast %u lev %u", + xlrec->node.spcNode, xlrec->node.dbNode, + xlrec->node.relNode, xlrec->meta.root, xlrec->meta.level, xlrec->meta.fastroot, xlrec->meta.fastlevel); break; @@ -922,9 +925,9 @@ btree_desc(char *buf, uint8 xl_info, char *rec) { xl_btree_newpage *xlrec = (xl_btree_newpage *) rec; - sprintf(buf + strlen(buf), "newpage: node %u/%u; page %u", - xlrec->node.tblNode, xlrec->node.relNode, - xlrec->blkno); + sprintf(buf + strlen(buf), "newpage: rel %u/%u/%u; page %u", + xlrec->node.spcNode, xlrec->node.dbNode, + xlrec->node.relNode, xlrec->blkno); break; } default: diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c index a7c8d3bf52b..a58902e76f6 100644 --- a/src/backend/access/transam/xlogutils.c +++ b/src/backend/access/transam/xlogutils.c @@ -6,7 +6,7 @@ * Portions Copyright (c) 1996-2003, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $PostgreSQL: pgsql/src/backend/access/transam/xlogutils.c,v 1.30 2004/02/11 22:55:24 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/access/transam/xlogutils.c,v 1.31 2004/06/18 06:13:15 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -354,7 +354,7 @@ XLogOpenRelation(bool redo, RmgrId rmid, RelFileNode rnode) * though, since we are presumably running by ourselves and can't * have any lock conflicts ... */ - res->reldata.rd_lockInfo.lockRelId.dbId = rnode.tblNode; + res->reldata.rd_lockInfo.lockRelId.dbId = rnode.dbNode; res->reldata.rd_lockInfo.lockRelId.relId = rnode.relNode; hentry = (XLogRelCacheEntry *) diff --git a/src/backend/bootstrap/bootparse.y b/src/backend/bootstrap/bootparse.y index cbc8ff6e539..7b712bd555b 100644 --- a/src/backend/bootstrap/bootparse.y +++ b/src/backend/bootstrap/bootparse.y @@ -9,7 +9,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/bootstrap/bootparse.y,v 1.69 2004/06/03 02:08:02 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/bootstrap/bootparse.y,v 1.70 2004/06/18 06:13:17 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -31,6 +31,7 @@ #include "catalog/pg_attribute.h" #include "catalog/pg_class.h" #include "catalog/pg_namespace.h" +#include "catalog/pg_tablespace.h" #include "commands/defrem.h" #include "miscadmin.h" #include "nodes/makefuncs.h" @@ -181,6 +182,7 @@ Boot_CreateStmt: boot_reldesc = heap_create(LexIDStr($5), PG_CATALOG_NAMESPACE, + $3 ? GLOBALTABLESPACE_OID : 0, tupdesc, $3, true, @@ -193,6 +195,7 @@ Boot_CreateStmt: id = heap_create_with_catalog(LexIDStr($5), PG_CATALOG_NAMESPACE, + $3 ? GLOBALTABLESPACE_OID : 0, tupdesc, RELKIND_RELATION, $3, @@ -239,6 +242,7 @@ Boot_DeclareIndexStmt: DefineIndex(makeRangeVar(NULL, LexIDStr($5)), LexIDStr($3), LexIDStr($7), + NULL, $9, NULL, NIL, false, false, false, @@ -255,6 +259,7 @@ Boot_DeclareUniqueIndexStmt: DefineIndex(makeRangeVar(NULL, LexIDStr($6)), LexIDStr($4), LexIDStr($8), + NULL, $10, NULL, NIL, true, false, false, diff --git a/src/backend/catalog/Makefile b/src/backend/catalog/Makefile index 0a68cb661af..c63168a9bd5 100644 --- a/src/backend/catalog/Makefile +++ b/src/backend/catalog/Makefile @@ -2,7 +2,7 @@ # # Makefile for backend/catalog # -# $PostgreSQL: pgsql/src/backend/catalog/Makefile,v 1.50 2004/01/04 05:57:21 tgl Exp $ +# $PostgreSQL: pgsql/src/backend/catalog/Makefile,v 1.51 2004/06/18 06:13:19 tgl Exp $ # #------------------------------------------------------------------------- @@ -32,7 +32,7 @@ POSTGRES_BKI_SRCS := $(addprefix $(top_srcdir)/src/include/catalog/,\ pg_language.h pg_largeobject.h pg_aggregate.h pg_statistic.h \ pg_rewrite.h pg_trigger.h pg_listener.h pg_description.h pg_cast.h \ pg_namespace.h pg_conversion.h pg_database.h pg_shadow.h pg_group.h \ - pg_depend.h indexing.h \ + pg_tablespace.h pg_depend.h indexing.h \ ) pg_includes := $(sort -I$(top_srcdir)/src/include -I$(top_builddir)/src/include) @@ -59,5 +59,5 @@ installdirs: uninstall-data: rm -f $(addprefix $(DESTDIR)$(datadir)/, $(BKIFILES) system_views.sql information_schema.sql sql_features.txt) -clean: +clean: rm -f SUBSYS.o $(OBJS) $(BKIFILES) diff --git a/src/backend/catalog/aclchk.c b/src/backend/catalog/aclchk.c index de74a422b78..6c966b89b27 100644 --- a/src/backend/catalog/aclchk.c +++ b/src/backend/catalog/aclchk.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/catalog/aclchk.c,v 1.103 2004/06/01 21:49:22 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/catalog/aclchk.c,v 1.104 2004/06/18 06:13:19 tgl Exp $ * * NOTES * See acl.h. @@ -31,6 +31,7 @@ #include "catalog/pg_operator.h" #include "catalog/pg_proc.h" #include "catalog/pg_shadow.h" +#include "catalog/pg_tablespace.h" #include "catalog/pg_type.h" #include "miscadmin.h" #include "parser/parse_func.h" @@ -45,6 +46,7 @@ static void ExecuteGrantStmt_Database(GrantStmt *stmt); static void ExecuteGrantStmt_Function(GrantStmt *stmt); static void ExecuteGrantStmt_Language(GrantStmt *stmt); static void ExecuteGrantStmt_Namespace(GrantStmt *stmt); +static void ExecuteGrantStmt_Tablespace(GrantStmt *stmt); static const char *privilege_to_string(AclMode privilege); @@ -207,12 +209,16 @@ ExecuteGrantStmt(GrantStmt *stmt) case ACL_OBJECT_NAMESPACE: ExecuteGrantStmt_Namespace(stmt); break; + case ACL_OBJECT_TABLESPACE: + ExecuteGrantStmt_Tablespace(stmt); + break; default: elog(ERROR, "unrecognized GrantStmt.objtype: %d", (int) stmt->objtype); } } + static void ExecuteGrantStmt_Relation(GrantStmt *stmt) { @@ -1009,6 +1015,163 @@ ExecuteGrantStmt_Namespace(GrantStmt *stmt) } } +static void +ExecuteGrantStmt_Tablespace(GrantStmt *stmt) +{ + AclMode privileges; + bool all_privs; + ListCell *i; + + if (linitial_int(stmt->privileges) == ACL_ALL_RIGHTS) + { + all_privs = true; + privileges = ACL_ALL_RIGHTS_TABLESPACE; + } + else + { + all_privs = false; + privileges = ACL_NO_RIGHTS; + foreach(i, stmt->privileges) + { + AclMode priv = lfirst_int(i); + + if (priv & ~((AclMode) ACL_ALL_RIGHTS_TABLESPACE)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_GRANT_OPERATION), + errmsg("invalid privilege type %s for tablespace", + privilege_to_string(priv)))); + privileges |= priv; + } + } + + foreach(i, stmt->objects) + { + char *spcname = strVal(lfirst(i)); + Relation relation; + ScanKeyData entry[1]; + HeapScanDesc scan; + HeapTuple tuple; + Form_pg_tablespace pg_tablespace_tuple; + Datum aclDatum; + bool isNull; + AclMode my_goptions; + AclMode this_privileges; + Acl *old_acl; + Acl *new_acl; + AclId grantorId; + AclId ownerId; + HeapTuple newtuple; + Datum values[Natts_pg_tablespace]; + char nulls[Natts_pg_tablespace]; + char replaces[Natts_pg_tablespace]; + + relation = heap_openr(TableSpaceRelationName, RowExclusiveLock); + ScanKeyInit(&entry[0], + Anum_pg_tablespace_spcname, + BTEqualStrategyNumber, F_NAMEEQ, + CStringGetDatum(spcname)); + scan = heap_beginscan(relation, SnapshotNow, 1, entry); + tuple = heap_getnext(scan, ForwardScanDirection); + if (!HeapTupleIsValid(tuple)) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("tablespace \"%s\" does not exist", spcname))); + pg_tablespace_tuple = (Form_pg_tablespace) GETSTRUCT(tuple); + + ownerId = pg_tablespace_tuple->spcowner; + grantorId = select_grantor(ownerId); + + /* + * Must be owner or have some privilege on the object (per spec, + * any privilege will get you by here). The owner is always + * treated as having all grant options. + */ + if (pg_tablespace_ownercheck(HeapTupleGetOid(tuple), GetUserId())) + my_goptions = ACL_ALL_RIGHTS_TABLESPACE; + else + { + AclMode my_rights; + + my_rights = pg_tablespace_aclmask(HeapTupleGetOid(tuple), + GetUserId(), + ACL_ALL_RIGHTS_TABLESPACE | ACL_GRANT_OPTION_FOR(ACL_ALL_RIGHTS_TABLESPACE), + ACLMASK_ALL); + if (my_rights == ACL_NO_RIGHTS) + aclcheck_error(ACLCHECK_NO_PRIV, ACL_KIND_TABLESPACE, + spcname); + my_goptions = ACL_OPTION_TO_PRIVS(my_rights); + } + + /* + * Restrict the operation to what we can actually grant or revoke, + * and issue a warning if appropriate. (For REVOKE this isn't quite + * what the spec says to do: the spec seems to want a warning only + * if no privilege bits actually change in the ACL. In practice + * that behavior seems much too noisy, as well as inconsistent with + * the GRANT case.) + */ + this_privileges = privileges & my_goptions; + if (stmt->is_grant) + { + if (this_privileges == 0) + ereport(WARNING, + (errcode(ERRCODE_WARNING_PRIVILEGE_NOT_GRANTED), + errmsg("no privileges were granted"))); + else if (!all_privs && this_privileges != privileges) + ereport(WARNING, + (errcode(ERRCODE_WARNING_PRIVILEGE_NOT_GRANTED), + errmsg("not all privileges were granted"))); + } + else + { + if (this_privileges == 0) + ereport(WARNING, + (errcode(ERRCODE_WARNING_PRIVILEGE_NOT_REVOKED), + errmsg("no privileges could be revoked"))); + else if (!all_privs && this_privileges != privileges) + ereport(WARNING, + (errcode(ERRCODE_WARNING_PRIVILEGE_NOT_REVOKED), + errmsg("not all privileges could be revoked"))); + } + + /* + * If there's no ACL, substitute the proper default. + */ + aclDatum = heap_getattr(tuple, Anum_pg_tablespace_spcacl, + RelationGetDescr(relation), &isNull); + if (isNull) + old_acl = acldefault(ACL_OBJECT_TABLESPACE, ownerId); + else + /* get a detoasted copy of the ACL */ + old_acl = DatumGetAclPCopy(aclDatum); + + new_acl = merge_acl_with_grant(old_acl, stmt->is_grant, + stmt->grant_option, stmt->behavior, + stmt->grantees, this_privileges, + grantorId, ownerId); + + /* finished building new ACL value, now insert it */ + MemSet(values, 0, sizeof(values)); + MemSet(nulls, ' ', sizeof(nulls)); + MemSet(replaces, ' ', sizeof(replaces)); + + replaces[Anum_pg_tablespace_spcacl - 1] = 'r'; + values[Anum_pg_tablespace_spcacl - 1] = PointerGetDatum(new_acl); + + newtuple = heap_modifytuple(tuple, relation, values, nulls, replaces); + + simple_heap_update(relation, &newtuple->t_self, newtuple); + + /* keep the catalog indexes up to date */ + CatalogUpdateIndexes(relation, newtuple); + + pfree(new_acl); + + heap_endscan(scan); + heap_close(relation, RowExclusiveLock); + } +} + static const char * privilege_to_string(AclMode privilege) @@ -1112,7 +1275,9 @@ static const char *const no_priv_msg[MAX_ACL_KIND] = /* ACL_KIND_OPCLASS */ gettext_noop("permission denied for operator class %s"), /* ACL_KIND_CONVERSION */ - gettext_noop("permission denied for conversion %s") + gettext_noop("permission denied for conversion %s"), + /* ACL_KIND_TABLESPACE */ + gettext_noop("permission denied for tablespace %s") }; static const char *const not_owner_msg[MAX_ACL_KIND] = @@ -1134,7 +1299,9 @@ static const char *const not_owner_msg[MAX_ACL_KIND] = /* ACL_KIND_OPCLASS */ gettext_noop("must be owner of operator class %s"), /* ACL_KIND_CONVERSION */ - gettext_noop("must be owner of conversion %s") + gettext_noop("must be owner of conversion %s"), + /* ACL_KIND_TABLESPACE */ + gettext_noop("must be owner of tablespace %s") }; @@ -1545,6 +1712,80 @@ pg_namespace_aclmask(Oid nsp_oid, AclId userid, return result; } +/* + * Exported routine for examining a user's privileges for a tablespace + */ +AclMode +pg_tablespace_aclmask(Oid spc_oid, AclId userid, + AclMode mask, AclMaskHow how) +{ + AclMode result; + Relation pg_tablespace; + ScanKeyData entry[1]; + HeapScanDesc scan; + HeapTuple tuple; + Datum aclDatum; + bool isNull; + Acl *acl; + AclId ownerId; + + /* + * Only shared relations can be stored in global space; don't let + * even superusers override this + */ + if (spc_oid == GLOBALTABLESPACE_OID && !IsBootstrapProcessingMode()) + return 0; + + /* Otherwise, superusers bypass all permission checking. */ + if (superuser_arg(userid)) + return mask; + + /* + * Get the tablespace's ACL from pg_tablespace + * + * There's no syscache for pg_tablespace, so must look the hard way + */ + pg_tablespace = heap_openr(TableSpaceRelationName, AccessShareLock); + ScanKeyInit(&entry[0], + ObjectIdAttributeNumber, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(spc_oid)); + scan = heap_beginscan(pg_tablespace, SnapshotNow, 1, entry); + tuple = heap_getnext(scan, ForwardScanDirection); + if (!HeapTupleIsValid(tuple)) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("tablespace with OID %u does not exist", spc_oid))); + + ownerId = ((Form_pg_tablespace) GETSTRUCT(tuple))->spcowner; + + aclDatum = heap_getattr(tuple, Anum_pg_tablespace_spcacl, + RelationGetDescr(pg_tablespace), &isNull); + + if (isNull) + { + /* No ACL, so build default ACL */ + acl = acldefault(ACL_OBJECT_TABLESPACE, ownerId); + aclDatum = (Datum) 0; + } + else + { + /* detoast ACL if necessary */ + acl = DatumGetAclP(aclDatum); + } + + result = aclmask(acl, userid, ownerId, mask, how); + + /* if we have a detoasted copy, free it */ + if (acl && (Pointer) acl != DatumGetPointer(aclDatum)) + pfree(acl); + + heap_endscan(scan); + heap_close(pg_tablespace, AccessShareLock); + + return result; +} + /* * Exported routine for checking a user's access privileges to a table @@ -1610,6 +1851,18 @@ pg_namespace_aclcheck(Oid nsp_oid, AclId userid, AclMode mode) return ACLCHECK_NO_PRIV; } +/* + * Exported routine for checking a user's access privileges to a tablespace + */ +AclResult +pg_tablespace_aclcheck(Oid spc_oid, AclId userid, AclMode mode) +{ + if (pg_tablespace_aclmask(spc_oid, userid, mode, ACLMASK_ANY) != 0) + return ACLCHECK_OK; + else + return ACLCHECK_NO_PRIV; +} + /* * Ownership check for a relation (specified by OID). @@ -1752,6 +2005,45 @@ pg_namespace_ownercheck(Oid nsp_oid, AclId userid) } /* + * Ownership check for a tablespace (specified by OID). + */ +bool +pg_tablespace_ownercheck(Oid spc_oid, AclId userid) +{ + Relation pg_tablespace; + ScanKeyData entry[1]; + HeapScanDesc scan; + HeapTuple spctuple; + int32 spcowner; + + /* Superusers bypass all permission checking. */ + if (superuser_arg(userid)) + return true; + + /* There's no syscache for pg_tablespace, so must look the hard way */ + pg_tablespace = heap_openr(TableSpaceRelationName, AccessShareLock); + ScanKeyInit(&entry[0], + ObjectIdAttributeNumber, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(spc_oid)); + scan = heap_beginscan(pg_tablespace, SnapshotNow, 1, entry); + + spctuple = heap_getnext(scan, ForwardScanDirection); + + if (!HeapTupleIsValid(spctuple)) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("tablespace with OID %u does not exist", spc_oid))); + + spcowner = ((Form_pg_tablespace) GETSTRUCT(spctuple))->spcowner; + + heap_endscan(scan); + heap_close(pg_tablespace, AccessShareLock); + + return userid == spcowner; +} + +/* * Ownership check for an operator class (specified by OID). */ bool @@ -1780,9 +2072,8 @@ pg_opclass_ownercheck(Oid opc_oid, AclId userid) return userid == owner_id; } - /* - * Ownership check for database (specified as OID) + * Ownership check for a database (specified by OID). */ bool pg_database_ownercheck(Oid db_oid, AclId userid) diff --git a/src/backend/catalog/catalog.c b/src/backend/catalog/catalog.c index afea81a976c..32d4d33d89a 100644 --- a/src/backend/catalog/catalog.c +++ b/src/backend/catalog/catalog.c @@ -9,7 +9,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/catalog/catalog.c,v 1.51 2004/01/06 18:07:31 neilc Exp $ + * $PostgreSQL: pgsql/src/backend/catalog/catalog.c,v 1.52 2004/06/18 06:13:19 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -20,30 +20,48 @@ #include "catalog/catalog.h" #include "catalog/catname.h" #include "catalog/pg_namespace.h" +#include "catalog/pg_tablespace.h" #include "miscadmin.h" +#define OIDCHARS 10 /* max chars printed by %u */ + + /* * relpath - construct path to a relation's file * * Result is a palloc'd string. */ - char * relpath(RelFileNode rnode) { + int pathlen; char *path; - if (rnode.tblNode == (Oid) 0) /* "global tablespace" */ + if (rnode.spcNode == GLOBALTABLESPACE_OID) { /* Shared system relations live in {datadir}/global */ - path = (char *) palloc(strlen(DataDir) + 8 + sizeof(NameData) + 1); - sprintf(path, "%s/global/%u", DataDir, rnode.relNode); + Assert(rnode.dbNode == 0); + pathlen = strlen(DataDir) + 8 + OIDCHARS + 1; + path = (char *) palloc(pathlen); + snprintf(path, pathlen, "%s/global/%u", + DataDir, rnode.relNode); + } + else if (rnode.spcNode == DEFAULTTABLESPACE_OID) + { + /* The default tablespace is {datadir}/base */ + pathlen = strlen(DataDir) + 6 + OIDCHARS + 1 + OIDCHARS + 1; + path = (char *) palloc(pathlen); + snprintf(path, pathlen, "%s/base/%u/%u", + DataDir, rnode.dbNode, rnode.relNode); } else { - path = (char *) palloc(strlen(DataDir) + 6 + 2 * sizeof(NameData) + 3); - sprintf(path, "%s/base/%u/%u", DataDir, rnode.tblNode, rnode.relNode); + /* All other tablespaces are accessed via symlinks */ + pathlen = strlen(DataDir) + 16 + OIDCHARS + 1 + OIDCHARS + 1 + OIDCHARS + 1; + path = (char *) palloc(pathlen); + snprintf(path, pathlen, "%s/pg_tablespaces/%u/%u/%u", + DataDir, rnode.spcNode, rnode.dbNode, rnode.relNode); } return path; } @@ -52,23 +70,39 @@ relpath(RelFileNode rnode) * GetDatabasePath - construct path to a database dir * * Result is a palloc'd string. + * + * XXX this must agree with relpath()! */ - char * -GetDatabasePath(Oid tblNode) +GetDatabasePath(Oid dbNode, Oid spcNode) { + int pathlen; char *path; - if (tblNode == (Oid) 0) /* "global tablespace" */ + if (spcNode == GLOBALTABLESPACE_OID) { /* Shared system relations live in {datadir}/global */ - path = (char *) palloc(strlen(DataDir) + 8); - sprintf(path, "%s/global", DataDir); + Assert(dbNode == 0); + pathlen = strlen(DataDir) + 7 + 1; + path = (char *) palloc(pathlen); + snprintf(path, pathlen, "%s/global", + DataDir); + } + else if (spcNode == DEFAULTTABLESPACE_OID) + { + /* The default tablespace is {datadir}/base */ + pathlen = strlen(DataDir) + 6 + OIDCHARS + 1; + path = (char *) palloc(pathlen); + snprintf(path, pathlen, "%s/base/%u", + DataDir, dbNode); } else { - path = (char *) palloc(strlen(DataDir) + 6 + sizeof(NameData) + 1); - sprintf(path, "%s/base/%u", DataDir, tblNode); + /* All other tablespaces are accessed via symlinks */ + pathlen = strlen(DataDir) + 16 + OIDCHARS + 1 + OIDCHARS + 1; + path = (char *) palloc(pathlen); + snprintf(path, pathlen, "%s/pg_tablespaces/%u/%u", + DataDir, spcNode, dbNode); } return path; } diff --git a/src/backend/catalog/heap.c b/src/backend/catalog/heap.c index a4cc33d4c9d..68ea6f45f69 100644 --- a/src/backend/catalog/heap.c +++ b/src/backend/catalog/heap.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/catalog/heap.c,v 1.270 2004/06/10 17:55:53 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/catalog/heap.c,v 1.271 2004/06/18 06:13:19 tgl Exp $ * * * INTERFACE ROUTINES @@ -43,6 +43,7 @@ #include "catalog/pg_statistic.h" #include "catalog/pg_type.h" #include "commands/tablecmds.h" +#include "commands/tablespace.h" #include "commands/trigger.h" #include "miscadmin.h" #include "nodes/makefuncs.h" @@ -203,15 +204,14 @@ SystemAttributeByName(const char *attname, bool relhasoids) Relation heap_create(const char *relname, Oid relnamespace, + Oid reltablespace, TupleDesc tupDesc, bool shared_relation, bool storage_create, bool allow_system_table_mods) { Oid relid; - Oid dbid = shared_relation ? InvalidOid : MyDatabaseId; bool nailme = false; - RelFileNode rnode; Relation rel; /* @@ -260,6 +260,8 @@ heap_create(const char *relname, relid = RelOid_pg_group; else if (strcmp(DatabaseRelationName, relname) == 0) relid = RelOid_pg_database; + else if (strcmp(TableSpaceRelationName, relname) == 0) + relid = RelOid_pg_tablespace; else relid = newoid(); } @@ -267,20 +269,14 @@ heap_create(const char *relname, relid = newoid(); /* - * For now, the physical identifier of the relation is the same as the - * logical identifier. - */ - rnode.tblNode = dbid; - rnode.relNode = relid; - - /* * build the relcache entry. */ rel = RelationBuildLocalRelation(relname, relnamespace, tupDesc, - relid, dbid, - rnode, + relid, + reltablespace, + shared_relation, nailme); /* @@ -296,6 +292,16 @@ heap_create(const char *relname, void heap_storage_create(Relation rel) { + /* + * We may be using the target table space for the first time in this + * database, so create a per-database subdirectory if needed. + * + * XXX it might be better to do this right in smgrcreate... + */ + TablespaceCreateDbspace(rel->rd_node.spcNode, rel->rd_node.dbNode); + /* + * Now we can make the file. + */ Assert(rel->rd_smgr == NULL); rel->rd_smgr = smgropen(rel->rd_node); smgrcreate(rel->rd_smgr, rel->rd_istemp, false); @@ -692,6 +698,7 @@ AddNewRelationType(const char *typeName, Oid heap_create_with_catalog(const char *relname, Oid relnamespace, + Oid reltablespace, TupleDesc tupdesc, char relkind, bool shared_relation, @@ -726,6 +733,7 @@ heap_create_with_catalog(const char *relname, */ new_rel_desc = heap_create(relname, relnamespace, + reltablespace, tupdesc, shared_relation, (relkind != RELKIND_VIEW && diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c index 8ada0915bd2..581799fc5f5 100644 --- a/src/backend/catalog/index.c +++ b/src/backend/catalog/index.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/catalog/index.c,v 1.233 2004/05/31 19:24:05 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/catalog/index.c,v 1.234 2004/06/18 06:13:19 tgl Exp $ * * * INTERFACE ROUTINES @@ -467,6 +467,7 @@ index_create(Oid heapRelationId, const char *indexRelationName, IndexInfo *indexInfo, Oid accessMethodObjectId, + Oid tableSpaceId, Oid *classObjectId, bool primary, bool isconstraint, @@ -539,6 +540,7 @@ index_create(Oid heapRelationId, */ indexRelation = heap_create(indexRelationName, namespaceId, + tableSpaceId, indexTupDesc, shared_relation, true, diff --git a/src/backend/catalog/namespace.c b/src/backend/catalog/namespace.c index c7535508a73..b412023fe28 100644 --- a/src/backend/catalog/namespace.c +++ b/src/backend/catalog/namespace.c @@ -13,7 +13,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/catalog/namespace.c,v 1.66 2004/05/28 16:17:14 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/catalog/namespace.c,v 1.67 2004/06/18 06:13:19 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -1668,7 +1668,7 @@ InitTempTableNamespace(void) * that access the temp namespace for my own backend skip * permissions checks on it. */ - namespaceId = NamespaceCreate(namespaceName, BOOTSTRAP_USESYSID); + namespaceId = NamespaceCreate(namespaceName, BOOTSTRAP_USESYSID, 0); /* Advance command counter to make namespace visible */ CommandCounterIncrement(); } diff --git a/src/backend/catalog/pg_namespace.c b/src/backend/catalog/pg_namespace.c index c3546f9068f..c600ac2a84d 100644 --- a/src/backend/catalog/pg_namespace.c +++ b/src/backend/catalog/pg_namespace.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/catalog/pg_namespace.c,v 1.8 2003/11/29 19:51:46 pgsql Exp $ + * $PostgreSQL: pgsql/src/backend/catalog/pg_namespace.c,v 1.9 2004/06/18 06:13:19 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -27,7 +27,7 @@ * --------------- */ Oid -NamespaceCreate(const char *nspName, int32 ownerSysId) +NamespaceCreate(const char *nspName, int32 ownerSysId, Oid nspTablespace) { Relation nspdesc; HeapTuple tup; @@ -59,6 +59,7 @@ NamespaceCreate(const char *nspName, int32 ownerSysId) namestrcpy(&nname, nspName); values[Anum_pg_namespace_nspname - 1] = NameGetDatum(&nname); values[Anum_pg_namespace_nspowner - 1] = Int32GetDatum(ownerSysId); + values[Anum_pg_namespace_nsptablespace - 1] = Int32GetDatum(nspTablespace); nulls[Anum_pg_namespace_nspacl - 1] = 'n'; nspdesc = heap_openr(NamespaceRelationName, RowExclusiveLock); diff --git a/src/backend/commands/Makefile b/src/backend/commands/Makefile index 0c450beac3c..644fd1d655d 100644 --- a/src/backend/commands/Makefile +++ b/src/backend/commands/Makefile @@ -4,7 +4,7 @@ # Makefile for backend/commands # # IDENTIFICATION -# $PostgreSQL: pgsql/src/backend/commands/Makefile,v 1.33 2003/11/29 19:51:47 pgsql Exp $ +# $PostgreSQL: pgsql/src/backend/commands/Makefile,v 1.34 2004/06/18 06:13:22 tgl Exp $ # #------------------------------------------------------------------------- @@ -17,8 +17,8 @@ OBJS = aggregatecmds.o alter.o analyze.o async.o cluster.o comment.o \ dbcommands.o define.o explain.o functioncmds.o \ indexcmds.o lockcmds.o operatorcmds.o opclasscmds.o \ portalcmds.o prepare.o proclang.o \ - schemacmds.o sequence.o tablecmds.o trigger.o typecmds.o user.o \ - vacuum.o vacuumlazy.o variable.o view.o + schemacmds.o sequence.o tablecmds.o tablespace.o trigger.o \ + typecmds.o user.o vacuum.o vacuumlazy.o variable.o view.o all: SUBSYS.o diff --git a/src/backend/commands/cluster.c b/src/backend/commands/cluster.c index 0483eaf2d3c..d81771c7a04 100644 --- a/src/backend/commands/cluster.c +++ b/src/backend/commands/cluster.c @@ -11,7 +11,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/commands/cluster.c,v 1.125 2004/05/31 19:24:05 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/commands/cluster.c,v 1.126 2004/06/18 06:13:22 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -568,6 +568,7 @@ make_new_heap(Oid OIDOldHeap, const char *NewName) OIDNewHeap = heap_create_with_catalog(NewName, RelationGetNamespace(OldHeap), + OldHeap->rd_rel->reltablespace, tupdesc, OldHeap->rd_rel->relkind, OldHeap->rd_rel->relisshared, diff --git a/src/backend/commands/dbcommands.c b/src/backend/commands/dbcommands.c index 337ec5395fb..8fbebecd874 100644 --- a/src/backend/commands/dbcommands.c +++ b/src/backend/commands/dbcommands.c @@ -9,13 +9,12 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/commands/dbcommands.c,v 1.135 2004/06/10 22:26:18 momjian Exp $ + * $PostgreSQL: pgsql/src/backend/commands/dbcommands.c,v 1.136 2004/06/18 06:13:22 tgl Exp $ * *------------------------------------------------------------------------- */ #include "postgres.h" -#include <errno.h> #include <fcntl.h> #include <unistd.h> #include <sys/stat.h> @@ -26,9 +25,12 @@ #include "catalog/catalog.h" #include "catalog/pg_database.h" #include "catalog/pg_shadow.h" +#include "catalog/pg_tablespace.h" #include "catalog/indexing.h" #include "commands/comment.h" #include "commands/dbcommands.h" +#include "commands/tablespace.h" +#include "mb/pg_wchar.h" #include "miscadmin.h" #include "storage/fd.h" #include "storage/freespace.h" @@ -41,32 +43,24 @@ #include "utils/lsyscache.h" #include "utils/syscache.h" -#include "mb/pg_wchar.h" /* encoding check */ - /* non-export function prototypes */ static bool get_db_info(const char *name, Oid *dbIdP, int4 *ownerIdP, int *encodingP, bool *dbIsTemplateP, Oid *dbLastSysOidP, TransactionId *dbVacuumXidP, TransactionId *dbFrozenXidP, - char *dbpath); + Oid *dbTablespace); static bool have_createdb_privilege(void); -static char *resolve_alt_dbpath(const char *dbpath, Oid dboid); -static bool remove_dbdirs(const char *real_loc, const char *altloc); +static void remove_dbtablespaces(Oid db_id); + /* * CREATE DATABASE */ - void createdb(const CreatedbStmt *stmt) { - char *nominal_loc; - char *alt_loc; - char *target_dir; - char src_loc[MAXPGPATH]; -#ifndef WIN32 - char buf[2 * MAXPGPATH + 100]; -#endif + HeapScanDesc scan; + Relation rel; Oid src_dboid; AclId src_owner; int src_encoding; @@ -74,7 +68,8 @@ createdb(const CreatedbStmt *stmt) Oid src_lastsysoid; TransactionId src_vacuumxid; TransactionId src_frozenxid; - char src_dbpath[MAXPGPATH]; + Oid src_deftablespace; + Oid dst_deftablespace; Relation pg_database_rel; HeapTuple tuple; TupleDesc pg_database_dsc; @@ -83,36 +78,41 @@ createdb(const CreatedbStmt *stmt) Oid dboid; AclId datdba; ListCell *option; + DefElem *dtablespacename = NULL; DefElem *downer = NULL; - DefElem *dpath = NULL; DefElem *dtemplate = NULL; DefElem *dencoding = NULL; char *dbname = stmt->dbname; char *dbowner = NULL; - char *dbpath = NULL; char *dbtemplate = NULL; int encoding = -1; +#ifndef WIN32 + char buf[2 * MAXPGPATH + 100]; +#endif + + /* don't call this in a transaction block */ + PreventTransactionChain((void *) stmt, "CREATE DATABASE"); /* Extract options from the statement node tree */ foreach(option, stmt->options) { DefElem *defel = (DefElem *) lfirst(option); - if (strcmp(defel->defname, "owner") == 0) + if (strcmp(defel->defname, "tablespace") == 0) { - if (downer) + if (dtablespacename) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"))); - downer = defel; + dtablespacename = defel; } - else if (strcmp(defel->defname, "location") == 0) + else if (strcmp(defel->defname, "owner") == 0) { - if (dpath) + if (downer) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"))); - dpath = defel; + downer = defel; } else if (strcmp(defel->defname, "template") == 0) { @@ -130,6 +130,13 @@ createdb(const CreatedbStmt *stmt) errmsg("conflicting or redundant options"))); dencoding = defel; } + else if (strcmp(defel->defname, "location") == 0) + { + ereport(WARNING, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("LOCATION is not supported anymore"), + errhint("Consider using tablespaces instead."))); + } else elog(ERROR, "option \"%s\" not recognized", defel->defname); @@ -137,8 +144,6 @@ createdb(const CreatedbStmt *stmt) if (downer && downer->arg) dbowner = strVal(downer->arg); - if (dpath && dpath->arg) - dbpath = strVal(dpath->arg); if (dtemplate && dtemplate->arg) dbtemplate = strVal(dtemplate->arg); if (dencoding && dencoding->arg) @@ -195,17 +200,6 @@ createdb(const CreatedbStmt *stmt) errmsg("must be superuser to create database for another user"))); } - /* don't call this in a transaction block */ - PreventTransactionChain((void *) stmt, "CREATE DATABASE"); - - /* alternate location requires symlinks */ -#ifndef HAVE_SYMLINK - if (dbpath != NULL) - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot use an alternative location on this platform"))); -#endif - /* * Check for db name conflict. There is a race condition here, since * another backend could create the same DB name before we commit. @@ -227,8 +221,7 @@ createdb(const CreatedbStmt *stmt) if (!get_db_info(dbtemplate, &src_dboid, &src_owner, &src_encoding, &src_istemplate, &src_lastsysoid, - &src_vacuumxid, &src_frozenxid, - src_dbpath)) + &src_vacuumxid, &src_frozenxid, &src_deftablespace)) ereport(ERROR, (errcode(ERRCODE_UNDEFINED_DATABASE), errmsg("template database \"%s\" does not exist", dbtemplate))); @@ -247,14 +240,6 @@ createdb(const CreatedbStmt *stmt) } /* - * Determine physical path of source database - */ - alt_loc = resolve_alt_dbpath(src_dbpath, src_dboid); - if (!alt_loc) - alt_loc = GetDatabasePath(src_dboid); - strcpy(src_loc, alt_loc); - - /* * The source DB can't have any active backends, except this one * (exception is to allow CREATE DB while connected to template1). * Otherwise we might copy inconsistent data. This check is not @@ -276,6 +261,33 @@ createdb(const CreatedbStmt *stmt) (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("invalid server encoding %d", encoding))); + /* Resolve default tablespace for new database */ + if (dtablespacename && dtablespacename->arg) + { + char *tablespacename; + AclResult aclresult; + + tablespacename = strVal(dtablespacename->arg); + dst_deftablespace = get_tablespace_oid(tablespacename); + if (!OidIsValid(dst_deftablespace)) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("tablespace \"%s\" does not exist", + tablespacename))); + /* check permissions */ + aclresult = pg_tablespace_aclcheck(dst_deftablespace, GetUserId(), + ACL_CREATE); + if (aclresult != ACLCHECK_OK) + aclcheck_error(aclresult, ACL_KIND_TABLESPACE, + tablespacename); + } + else + { + /* Use template database's default tablespace */ + dst_deftablespace = src_deftablespace; + /* Note there is no additional permission check in this path */ + } + /* * Preassign OID for pg_database tuple, so that we can compute db * path. @@ -283,39 +295,6 @@ createdb(const CreatedbStmt *stmt) dboid = newoid(); /* - * Compute nominal location (where we will try to access the - * database), and resolve alternate physical location if one is - * specified. - * - * If an alternate location is specified but is the same as the normal - * path, just drop the alternate-location spec (this seems friendlier - * than erroring out). We must test this case to avoid creating a - * circular symlink below. - */ - nominal_loc = GetDatabasePath(dboid); - alt_loc = resolve_alt_dbpath(dbpath, dboid); - - if (alt_loc && strcmp(alt_loc, nominal_loc) == 0) - { - alt_loc = NULL; - dbpath = NULL; - } - - if (strchr(nominal_loc, '\'')) - ereport(ERROR, - (errcode(ERRCODE_INVALID_NAME), - errmsg("database path may not contain single quotes"))); - if (alt_loc && strchr(alt_loc, '\'')) - ereport(ERROR, - (errcode(ERRCODE_INVALID_NAME), - errmsg("database path may not contain single quotes"))); - if (strchr(src_loc, '\'')) - ereport(ERROR, - (errcode(ERRCODE_INVALID_NAME), - errmsg("database path may not contain single quotes"))); - /* ... otherwise we'd be open to shell exploits below */ - - /* * Force dirty buffers out to disk, to ensure source database is * up-to-date for the copy. (We really only need to flush buffers for * the source database...) @@ -324,74 +303,89 @@ createdb(const CreatedbStmt *stmt) /* * Close virtual file descriptors so the kernel has more available for - * the mkdir() and system() calls below. + * the system() calls below. */ closeAllVfds(); /* - * Check we can create the target directory --- but then remove it - * because we rely on cp(1) to create it for real. + * Iterate through all tablespaces of the template database, and + * copy each one to the new database. + * + * If we are trying to change the default tablespace of the template, + * we require that the template not have any files in the new default + * tablespace. This avoids the need to merge two subdirectories. + * This could probably be improved later. */ - target_dir = alt_loc ? alt_loc : nominal_loc; + rel = heap_openr(TableSpaceRelationName, AccessShareLock); + scan = heap_beginscan(rel, SnapshotNow, 0, NULL); + while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL) + { + Oid srctablespace = HeapTupleGetOid(tuple); + Oid dsttablespace; + char *srcpath; + char *dstpath; + struct stat st; - if (mkdir(target_dir, S_IRWXU) != 0) - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not create database directory \"%s\": %m", - target_dir))); - if (rmdir(target_dir) != 0) - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not remove temporary directory \"%s\": %m", - target_dir))); + /* No need to copy global tablespace */ + if (srctablespace == GLOBALTABLESPACE_OID) + continue; - /* Make the symlink, if needed */ - if (alt_loc) - { -#ifdef HAVE_SYMLINK /* already throws error above */ - if (symlink(alt_loc, nominal_loc) != 0) -#endif + srcpath = GetDatabasePath(src_dboid, srctablespace); + + if (stat(srcpath, &st) < 0 || !S_ISDIR(st.st_mode)) + { + /* Assume we can ignore it */ + pfree(srcpath); + continue; + } + + if (srctablespace == src_deftablespace) + dsttablespace = dst_deftablespace; + else + dsttablespace = srctablespace; + + dstpath = GetDatabasePath(dboid, dsttablespace); + + if (stat(dstpath, &st) == 0 || errno != ENOENT) + { + remove_dbtablespaces(dboid); ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not link file \"%s\" to \"%s\": %m", - nominal_loc, alt_loc))); - } + (errmsg("could not initialize database directory"), + errdetail("Directory \"%s\" already exists.", dstpath))); + } - /* - * Copy the template database to the new location - * - * XXX use of cp really makes this code pretty grotty, particularly - * with respect to lack of ability to report errors well. Someday - * rewrite to do it for ourselves. - */ #ifndef WIN32 - /* We might need to use cp -R one day for portability */ - snprintf(buf, sizeof(buf), "cp -r '%s' '%s'", src_loc, target_dir); - if (system(buf) != 0) - { - if (remove_dbdirs(nominal_loc, alt_loc)) + /* + * Copy this subdirectory to the new location + * + * XXX use of cp really makes this code pretty grotty, particularly + * with respect to lack of ability to report errors well. Someday + * rewrite to do it for ourselves. + */ + + /* We might need to use cp -R one day for portability */ + snprintf(buf, sizeof(buf), "cp -r '%s' '%s'", + srcpath, dstpath); + if (system(buf) != 0) + { + remove_dbtablespaces(dboid); ereport(ERROR, (errmsg("could not initialize database directory"), errdetail("Failing system command was: %s", buf), errhint("Look in the postmaster's stderr log for more information."))); - else - ereport(ERROR, - (errmsg("could not initialize database directory; delete failed as well"), - errdetail("Failing system command was: %s", buf), - errhint("Look in the postmaster's stderr log for more information."))); - } + } #else /* WIN32 */ - if (copydir(src_loc, target_dir) != 0) - { - /* copydir should already have given details of its troubles */ - if (remove_dbdirs(nominal_loc, alt_loc)) + if (copydir(srcpath, dstpath) != 0) + { + /* copydir should already have given details of its troubles */ + remove_dbtablespaces(dboid); ereport(ERROR, (errmsg("could not initialize database directory"))); - else - ereport(ERROR, - (errmsg("could not initialize database directory; delete failed as well"))); - } + } #endif /* WIN32 */ + } + heap_endscan(scan); + heap_close(rel, AccessShareLock); /* * Now OK to grab exclusive lock on pg_database. @@ -403,7 +397,7 @@ createdb(const CreatedbStmt *stmt) { /* Don't hold lock while doing recursive remove */ heap_close(pg_database_rel, AccessExclusiveLock); - remove_dbdirs(nominal_loc, alt_loc); + remove_dbtablespaces(dboid); ereport(ERROR, (errcode(ERRCODE_DUPLICATE_DATABASE), errmsg("database \"%s\" already exists", dbname))); @@ -427,9 +421,7 @@ createdb(const CreatedbStmt *stmt) new_record[Anum_pg_database_datlastsysoid - 1] = ObjectIdGetDatum(src_lastsysoid); new_record[Anum_pg_database_datvacuumxid - 1] = TransactionIdGetDatum(src_vacuumxid); new_record[Anum_pg_database_datfrozenxid - 1] = TransactionIdGetDatum(src_frozenxid); - /* do not set datpath to null, GetRawDatabaseInfo won't cope */ - new_record[Anum_pg_database_datpath - 1] = - DirectFunctionCall1(textin, CStringGetDatum(dbpath ? dbpath : "")); + new_record[Anum_pg_database_dattablespace - 1] = ObjectIdGetDatum(dst_deftablespace); /* * We deliberately set datconfig and datacl to defaults (NULL), rather @@ -471,14 +463,13 @@ dropdb(const char *dbname) int4 db_owner; bool db_istemplate; Oid db_id; - char *alt_loc; - char *nominal_loc; - char dbpath[MAXPGPATH]; Relation pgdbrel; SysScanDesc pgdbscan; ScanKeyData key; HeapTuple tup; + PreventTransactionChain((void *) dbname, "DROP DATABASE"); + AssertArg(dbname); if (strcmp(dbname, get_database_name(MyDatabaseId)) == 0) @@ -486,8 +477,6 @@ dropdb(const char *dbname) (errcode(ERRCODE_OBJECT_IN_USE), errmsg("cannot drop the currently open database"))); - PreventTransactionChain((void *) dbname, "DROP DATABASE"); - /* * Obtain exclusive lock on pg_database. We need this to ensure that * no new backend starts up in the target database while we are @@ -500,7 +489,7 @@ dropdb(const char *dbname) pgdbrel = heap_openr(DatabaseRelationName, AccessExclusiveLock); if (!get_db_info(dbname, &db_id, &db_owner, NULL, - &db_istemplate, NULL, NULL, NULL, dbpath)) + &db_istemplate, NULL, NULL, NULL, NULL)) ereport(ERROR, (errcode(ERRCODE_UNDEFINED_DATABASE), errmsg("database \"%s\" does not exist", dbname))); @@ -519,9 +508,6 @@ dropdb(const char *dbname) (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("cannot drop a template database"))); - nominal_loc = GetDatabasePath(db_id); - alt_loc = resolve_alt_dbpath(dbpath, db_id); - /* * Check for active backends in the target database. */ @@ -585,9 +571,9 @@ dropdb(const char *dbname) FreeSpaceMapForgetDatabase(db_id); /* - * Remove the database's subdirectory and everything in it. + * Remove all tablespace subdirs belonging to the database. */ - remove_dbdirs(nominal_loc, alt_loc); + remove_dbtablespaces(db_id); /* * Force dirty buffers out to disk, so that newly-connecting backends @@ -831,7 +817,7 @@ static bool get_db_info(const char *name, Oid *dbIdP, int4 *ownerIdP, int *encodingP, bool *dbIsTemplateP, Oid *dbLastSysOidP, TransactionId *dbVacuumXidP, TransactionId *dbFrozenXidP, - char *dbpath) + Oid *dbTablespace) { Relation relation; ScanKeyData scanKey; @@ -880,28 +866,9 @@ get_db_info(const char *name, Oid *dbIdP, int4 *ownerIdP, /* limit of frozen XIDs */ if (dbFrozenXidP) *dbFrozenXidP = dbform->datfrozenxid; - /* database path (as registered in pg_database) */ - if (dbpath) - { - Datum datum; - bool isnull; - - datum = heap_getattr(tuple, - Anum_pg_database_datpath, - RelationGetDescr(relation), - &isnull); - if (!isnull) - { - text *pathtext = DatumGetTextP(datum); - int pathlen = VARSIZE(pathtext) - VARHDRSZ; - - Assert(pathlen >= 0 && pathlen < MAXPGPATH); - strncpy(dbpath, VARDATA(pathtext), pathlen); - *(dbpath + pathlen) = '\0'; - } - else - strcpy(dbpath, ""); - } + /* default tablespace for this database */ + if (dbTablespace) + *dbTablespace = dbform->dattablespace; } systable_endscan(scan); @@ -930,105 +897,60 @@ have_createdb_privilege(void) return retval; } - -static char * -resolve_alt_dbpath(const char *dbpath, Oid dboid) +/* + * Remove tablespace directories + * + * We don't know what tablespaces db_id is using, so iterate through all + * tablespaces removing <tablespace>/db_id + */ +static void +remove_dbtablespaces(Oid db_id) { - const char *prefix; - char *ret; - size_t len; - - if (dbpath == NULL || dbpath[0] == '\0') - return NULL; - - if (first_dir_separator(dbpath)) - { - if (!is_absolute_path(dbpath)) - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("relative paths are not allowed as database locations"))); -#ifndef ALLOW_ABSOLUTE_DBPATHS - ereport(ERROR, - (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), - errmsg("absolute paths are not allowed as database locations"))); -#endif - prefix = dbpath; - } - else + Relation rel; + HeapScanDesc scan; + HeapTuple tuple; + char buf[MAXPGPATH + 100]; + + rel = heap_openr(TableSpaceRelationName, AccessShareLock); + scan = heap_beginscan(rel, SnapshotNow, 0, NULL); + while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL) { - /* must be environment variable */ - char *var = getenv(dbpath); - - if (!var) - ereport(ERROR, - (errcode(ERRCODE_UNDEFINED_OBJECT), - errmsg("postmaster environment variable \"%s\" not found", - dbpath))); - if (!is_absolute_path(var)) - ereport(ERROR, - (errcode(ERRCODE_INVALID_NAME), - errmsg("postmaster environment variable \"%s\" must be absolute path", - dbpath))); - prefix = var; - } + Oid dsttablespace = HeapTupleGetOid(tuple); + char *dstpath; + struct stat st; - len = strlen(prefix) + 6 + sizeof(Oid) * 8 + 1; - if (len >= MAXPGPATH - 100) - ereport(ERROR, - (errcode(ERRCODE_INVALID_NAME), - errmsg("alternative path is too long"))); + /* Don't mess with the global tablespace */ + if (dsttablespace == GLOBALTABLESPACE_OID) + continue; - ret = palloc(len); - snprintf(ret, len, "%s/base/%u", prefix, dboid); + dstpath = GetDatabasePath(db_id, dsttablespace); - return ret; -} - - -static bool -remove_dbdirs(const char *nominal_loc, const char *alt_loc) -{ - const char *target_dir; - char buf[MAXPGPATH + 100]; - bool success = true; - - target_dir = alt_loc ? alt_loc : nominal_loc; - - /* - * Close virtual file descriptors so the kernel has more available for - * the system() call below. - */ - closeAllVfds(); - - if (alt_loc) - { - /* remove symlink */ - if (unlink(nominal_loc) != 0) + if (stat(dstpath, &st) < 0 || !S_ISDIR(st.st_mode)) { - ereport(WARNING, - (errcode_for_file_access(), - errmsg("could not remove file \"%s\": %m", nominal_loc))); - success = false; + /* Assume we can ignore it */ + pfree(dstpath); + continue; } - } #ifndef WIN32 - snprintf(buf, sizeof(buf), "rm -rf '%s'", target_dir); + snprintf(buf, sizeof(buf), "rm -rf '%s'", dstpath); #else - snprintf(buf, sizeof(buf), "rmdir /s /q \"%s\"", target_dir); + snprintf(buf, sizeof(buf), "rmdir /s /q \"%s\"", dstpath); #endif - - if (system(buf) != 0) - { - ereport(WARNING, + if (system(buf) != 0) + { + ereport(WARNING, (errmsg("could not remove database directory \"%s\"", - target_dir), + dstpath), errdetail("Failing system command was: %s", buf), errhint("Look in the postmaster's stderr log for more information."))); - success = false; + } + + pfree(dstpath); } - return success; + heap_endscan(scan); + heap_close(rel, AccessShareLock); } @@ -1075,7 +997,7 @@ get_database_oid(const char *dbname) /* * get_database_name - given a database OID, look up the name * - * Returns InvalidOid if database name not found. + * Returns a palloc'd string, or NULL if no such database. * * This is not actually used in this file, but is exported for use elsewhere. */ diff --git a/src/backend/commands/indexcmds.c b/src/backend/commands/indexcmds.c index d141c83c1bf..43d9aabd046 100644 --- a/src/backend/commands/indexcmds.c +++ b/src/backend/commands/indexcmds.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/commands/indexcmds.c,v 1.121 2004/06/10 17:55:56 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/commands/indexcmds.c,v 1.122 2004/06/18 06:13:23 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -27,6 +27,7 @@ #include "commands/dbcommands.h" #include "commands/defrem.h" #include "commands/tablecmds.h" +#include "commands/tablespace.h" #include "executor/executor.h" #include "mb/pg_wchar.h" #include "miscadmin.h" @@ -64,6 +65,8 @@ static bool relationHasPrimaryKey(Relation rel); * 'indexRelationName': the name for the new index, or NULL to indicate * that a nonconflicting default name should be picked. * 'accessMethodName': name of the AM to use. + * 'tableSpaceName': name of the tablespace to create the index in. + * NULL specifies using the same tablespace as the parent relation. * 'attributeList': a list of IndexElem specifying columns and expressions * to index on. * 'predicate': the partial-index condition, or NULL if none. @@ -83,6 +86,7 @@ void DefineIndex(RangeVar *heapRelation, char *indexRelationName, char *accessMethodName, + char *tableSpaceName, List *attributeList, Expr *predicate, List *rangetable, @@ -98,6 +102,7 @@ DefineIndex(RangeVar *heapRelation, Oid accessMethodId; Oid relationId; Oid namespaceId; + Oid tablespaceId; Relation rel; HeapTuple tuple; Form_pg_am accessMethodForm; @@ -151,6 +156,29 @@ DefineIndex(RangeVar *heapRelation, get_namespace_name(namespaceId)); } + /* Determine tablespace to use */ + if (tableSpaceName) + { + AclResult aclresult; + + tablespaceId = get_tablespace_oid(tableSpaceName); + if (!OidIsValid(tablespaceId)) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("tablespace \"%s\" does not exist", + tableSpaceName))); + /* check permissions */ + aclresult = pg_tablespace_aclcheck(tablespaceId, GetUserId(), + ACL_CREATE); + if (aclresult != ACLCHECK_OK) + aclcheck_error(aclresult, ACL_KIND_TABLESPACE, + tableSpaceName); + } else { + /* Use the parent rel's tablespace */ + tablespaceId = get_rel_tablespace(relationId); + /* Note there is no additional permission check in this path */ + } + /* * Select name for index if caller didn't specify */ @@ -335,7 +363,7 @@ DefineIndex(RangeVar *heapRelation, indexRelationName, RelationGetRelationName(rel)))); index_create(relationId, indexRelationName, - indexInfo, accessMethodId, classObjectId, + indexInfo, accessMethodId, tablespaceId, classObjectId, primary, isconstraint, allowSystemTableMods, skip_build); diff --git a/src/backend/commands/schemacmds.c b/src/backend/commands/schemacmds.c index 18a212271ea..8366ea634a0 100644 --- a/src/backend/commands/schemacmds.c +++ b/src/backend/commands/schemacmds.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/commands/schemacmds.c,v 1.18 2004/05/26 04:41:11 neilc Exp $ + * $PostgreSQL: pgsql/src/backend/commands/schemacmds.c,v 1.19 2004/06/18 06:13:23 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -23,6 +23,7 @@ #include "catalog/pg_namespace.h" #include "commands/dbcommands.h" #include "commands/schemacmds.h" +#include "commands/tablespace.h" #include "miscadmin.h" #include "parser/analyze.h" #include "tcop/utility.h" @@ -41,6 +42,7 @@ CreateSchemaCommand(CreateSchemaStmt *stmt) const char *schemaName = stmt->schemaname; const char *authId = stmt->authid; Oid namespaceId; + Oid tablespaceId; List *parsetree_list; ListCell *parsetree_item; const char *owner_name; @@ -100,8 +102,33 @@ CreateSchemaCommand(CreateSchemaStmt *stmt) errmsg("unacceptable schema name \"%s\"", schemaName), errdetail("The prefix \"pg_\" is reserved for system schemas."))); + /* + * Select default tablespace for schema. If not given, use zero + * which implies the database's default tablespace. + */ + if (stmt->tablespacename) + { + AclResult aclresult; + + tablespaceId = get_tablespace_oid(stmt->tablespacename); + if (!OidIsValid(tablespaceId)) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("tablespace \"%s\" does not exist", + stmt->tablespacename))); + /* check permissions */ + aclresult = pg_tablespace_aclcheck(tablespaceId, GetUserId(), + ACL_CREATE); + if (aclresult != ACLCHECK_OK) + aclcheck_error(aclresult, ACL_KIND_TABLESPACE, + stmt->tablespacename); + } else { + tablespaceId = InvalidOid; + /* note there is no permission check in this path */ + } + /* Create the schema's namespace */ - namespaceId = NamespaceCreate(schemaName, owner_userid); + namespaceId = NamespaceCreate(schemaName, owner_userid, tablespaceId); /* Advance cmd counter to make the namespace visible */ CommandCounterIncrement(); diff --git a/src/backend/commands/sequence.c b/src/backend/commands/sequence.c index 351813b5dec..1b6538b539c 100644 --- a/src/backend/commands/sequence.c +++ b/src/backend/commands/sequence.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/commands/sequence.c,v 1.111 2004/05/26 04:41:11 neilc Exp $ + * $PostgreSQL: pgsql/src/backend/commands/sequence.c,v 1.112 2004/06/18 06:13:23 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -180,6 +180,7 @@ DefineSequence(CreateSeqStmt *seq) stmt->constraints = NIL; stmt->hasoids = MUST_NOT_HAVE_OIDS; stmt->oncommit = ONCOMMIT_NOOP; + stmt->tablespacename = seq->tablespacename; seqoid = DefineRelation(stmt, RELKIND_SEQUENCE); @@ -1071,8 +1072,8 @@ seq_redo(XLogRecPtr lsn, XLogRecord *record) buffer = XLogReadBuffer(true, reln, 0); if (!BufferIsValid(buffer)) - elog(PANIC, "seq_redo: can't read block of %u/%u", - xlrec->node.tblNode, xlrec->node.relNode); + elog(PANIC, "seq_redo: can't read block 0 of rel %u/%u/%u", + xlrec->node.spcNode, xlrec->node.dbNode, xlrec->node.relNode); page = (Page) BufferGetPage(buffer); @@ -1114,6 +1115,6 @@ seq_desc(char *buf, uint8 xl_info, char *rec) return; } - sprintf(buf + strlen(buf), "node %u/%u", - xlrec->node.tblNode, xlrec->node.relNode); + sprintf(buf + strlen(buf), "rel %u/%u/%u", + xlrec->node.spcNode, xlrec->node.dbNode, xlrec->node.relNode); } diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index 3979335313b..8fd07e396ae 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/commands/tablecmds.c,v 1.115 2004/06/10 18:34:45 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/commands/tablecmds.c,v 1.116 2004/06/18 06:13:23 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -33,6 +33,7 @@ #include "commands/cluster.h" #include "commands/defrem.h" #include "commands/tablecmds.h" +#include "commands/tablespace.h" #include "commands/trigger.h" #include "executor/executor.h" #include "lib/stringinfo.h" @@ -258,6 +259,7 @@ DefineRelation(CreateStmt *stmt, char relkind) Oid namespaceId; List *schema = stmt->tableElts; Oid relationId; + Oid tablespaceId; Relation rel; TupleDesc descriptor; List *inheritOids; @@ -302,6 +304,31 @@ DefineRelation(CreateStmt *stmt, char relkind) } /* + * Select tablespace to use. If not specified, use containing schema's + * default tablespace (which may in turn default to database's default). + */ + if (stmt->tablespacename) + { + AclResult aclresult; + + tablespaceId = get_tablespace_oid(stmt->tablespacename); + if (!OidIsValid(tablespaceId)) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("tablespace \"%s\" does not exist", + stmt->tablespacename))); + /* check permissions */ + aclresult = pg_tablespace_aclcheck(tablespaceId, GetUserId(), + ACL_CREATE); + if (aclresult != ACLCHECK_OK) + aclcheck_error(aclresult, ACL_KIND_TABLESPACE, + stmt->tablespacename); + } else { + tablespaceId = get_namespace_tablespace(namespaceId); + /* note no permission check on tablespace in this case */ + } + + /* * Look up inheritance ancestors and generate relation schema, * including inherited attributes. */ @@ -379,6 +406,7 @@ DefineRelation(CreateStmt *stmt, char relkind) relationId = heap_create_with_catalog(relname, namespaceId, + tablespaceId, descriptor, relkind, false, @@ -1770,7 +1798,7 @@ ATController(Relation rel, List *cmds, bool recurse) /* * ATPrepCmd * - * Traffic cop for ALTER TABLE Phase 1 operations, including simple + * Traffic cop for ALTER TABLE Phase 1 operations, including simple * recursion and permission checks. * * Caller must have acquired AccessExclusiveLock on relation already. @@ -2679,7 +2707,7 @@ find_composite_type_dependencies(Oid typeOid, const char *origTblName) } -/* +/* * ALTER TABLE ADD COLUMN * * Adds an additional attribute to a relation making the assumption that @@ -3521,6 +3549,7 @@ ATExecAddIndex(AlteredTableInfo *tab, Relation rel, DefineIndex(stmt->relation, /* relation */ stmt->idxname, /* index name */ stmt->accessMethod, /* am name */ + stmt->tableSpace, stmt->indexParams, /* parameters */ (Expr *) stmt->whereClause, stmt->rangetable, @@ -3566,7 +3595,7 @@ ATExecAddConstraint(AlteredTableInfo *tab, Relation rel, Node *newConstraint) list_make1(constr)); /* Add each constraint to Phase 3's queue */ foreach(lcon, newcons) - { + { CookedConstraint *ccon = (CookedConstraint *) lfirst(lcon); NewConstraint *newcon; @@ -3643,7 +3672,7 @@ ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel, int16 fkattnum[INDEX_MAX_KEYS]; Oid pktypoid[INDEX_MAX_KEYS]; Oid fktypoid[INDEX_MAX_KEYS]; - Oid opclasses[INDEX_MAX_KEYS]; + Oid opclasses[INDEX_MAX_KEYS]; int i; int numfks, numpks; @@ -3791,7 +3820,7 @@ ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel, * will be incurred to check FK validity. */ if (!op_in_opclass(oprid(o), opclasses[i])) - ereport(WARNING, + ereport(WARNING, (errmsg("foreign key constraint \"%s\" " "will require costly sequential scans", fkconstraint->constr_name), @@ -4565,7 +4594,7 @@ ATPrepAlterColumnType(List **wqueue, /* * Add a work queue item to make ATRewriteTable update the column * contents. - */ + */ newval = (NewColumnValue *) palloc0(sizeof(NewColumnValue)); newval->attnum = attnum; newval->expr = (Expr *) transform; @@ -5272,6 +5301,7 @@ AlterTableCreateToastTable(Oid relOid, bool silent) */ toast_relid = heap_create_with_catalog(toast_relname, PG_TOAST_NAMESPACE, + rel->rd_rel->reltablespace, tupdesc, RELKIND_TOASTVALUE, shared_relation, @@ -5309,7 +5339,9 @@ AlterTableCreateToastTable(Oid relOid, bool silent) classObjectId[1] = INT4_BTREE_OPS_OID; toast_idxid = index_create(toast_relid, toast_idxname, indexInfo, - BTREE_AM_OID, classObjectId, + BTREE_AM_OID, + rel->rd_rel->reltablespace, + classObjectId, true, false, true, false); /* diff --git a/src/backend/commands/tablespace.c b/src/backend/commands/tablespace.c new file mode 100644 index 00000000000..d412389bb86 --- /dev/null +++ b/src/backend/commands/tablespace.c @@ -0,0 +1,660 @@ +/*------------------------------------------------------------------------- + * + * tablespace.c + * Commands to manipulate table spaces + * + * + * Tablespaces in PostgreSQL are designed to allow users to determine + * where the data file(s) for a given database object reside on the file + * system. + * + * A tablespace represents a directory on the file system. At tablespace + * creation time, the directory must be empty. To simplify things and + * remove the possibility of having file name conflicts, we isolate + * files within a tablespace into database-specific subdirectories. + * + * To support file access via the information given in RelFileNode, we + * maintain a symbolic-link map in $PGDATA/pg_tablespaces. The symlinks are + * named by tablespace OIDs and point to the actual tablespace directories. + * Thus the full path to an arbitrary file is + * $PGDATA/pg_tablespaces/spcoid/dboid/relfilenode + * + * There are two tablespaces created at initdb time: global (for shared + * tables) and default (for everything else). For backwards compatibility + * and to remain functional on platforms without symlinks, these tablespaces + * are accessed specially: they are respectively + * $PGDATA/global/relfilenode + * $PGDATA/base/dboid/relfilenode + * + * The implementation is designed to be backwards compatible. For this reason + * (and also as a feature unto itself) when a user creates an object without + * specifying a tablespace, we look at the object's parent and place + * the object in the parent's tablespace. The hierarchy is as follows: + * database > schema > table > index + * + * To allow CREATE DATABASE to give a new database a default tablespace + * that's different from the template database's default, we make the + * provision that a zero in pg_class.reltablespace means the database's + * default tablespace. Without this, CREATE DATABASE would have to go in + * and munge the system catalogs of the new database. This special meaning + * of zero also applies in pg_namespace.nsptablespace. + * + * + * Portions Copyright (c) 1996-2004, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * + * IDENTIFICATION + * $PostgreSQL: pgsql/src/backend/commands/tablespace.c,v 1.1 2004/06/18 06:13:23 tgl Exp $ + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include <unistd.h> +#include <dirent.h> +#include <sys/types.h> +#include <sys/stat.h> + +#include "access/heapam.h" +#include "catalog/catalog.h" +#include "catalog/catname.h" +#include "catalog/indexing.h" +#include "catalog/pg_namespace.h" +#include "catalog/pg_tablespace.h" +#include "commands/tablespace.h" +#include "miscadmin.h" +#include "storage/fd.h" +#include "storage/smgr.h" +#include "utils/acl.h" +#include "utils/builtins.h" +#include "utils/fmgroids.h" +#include "utils/lsyscache.h" +#include "utils/syscache.h" + + +static void set_short_version(const char *path); +static bool directory_is_empty(const char *path); + + +/* + * Each database using a table space is isolated into its own name space + * by a subdirectory named for the database OID. On first creation of an + * object in the tablespace, create the subdirectory. If the subdirectory + * already exists, just fall through quietly. + * + * If tablespaces are not supported, this is just a no-op; CREATE DATABASE + * is expected to create the default subdirectory for the database. + */ +void +TablespaceCreateDbspace(Oid spcNode, Oid dbNode) +{ +#ifdef HAVE_SYMLINK + struct stat st; + char *dir; + + /* + * The global tablespace doesn't have per-database subdirectories, + * so nothing to do for it. + */ + if (spcNode == GLOBALTABLESPACE_OID) + return; + + Assert(OidIsValid(spcNode)); + Assert(OidIsValid(dbNode)); + + dir = GetDatabasePath(dbNode, spcNode); + + if (stat(dir, &st) < 0) + { + if (errno == ENOENT) + { + /* + * Acquire ExclusiveLock on pg_tablespace to ensure that no + * DROP TABLESPACE or TablespaceCreateDbspace is running + * concurrently. Simple reads from pg_tablespace are OK. + */ + Relation rel; + + rel = heap_openr(TableSpaceRelationName, ExclusiveLock); + + /* + * Recheck to see if someone created the directory while + * we were waiting for lock. + */ + if (stat(dir, &st) == 0 && S_ISDIR(st.st_mode)) + { + /* need not do anything */ + } + else + { + /* OK, go for it */ + if (mkdir(dir, S_IRWXU) < 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not create directory \"%s\": %m", + dir))); + } + + /* OK to drop the exclusive lock */ + heap_close(rel, ExclusiveLock); + } + else + { + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not stat directory \"%s\": %m", dir))); + } + } + else + { + /* be paranoid */ + if (!S_ISDIR(st.st_mode)) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("\"%s\" exists but is not a directory", + dir))); + } + + pfree(dir); +#endif /* HAVE_SYMLINK */ +} + +/* + * Create a table space + * + * Only superusers can create a tablespace. This seems a reasonable restriction + * since we're determining the system layout and, anyway, we probably have + * root if we're doing this kind of activity + */ +void +CreateTableSpace(CreateTableSpaceStmt *stmt) +{ +#ifdef HAVE_SYMLINK + Relation rel; + Datum values[Natts_pg_tablespace]; + char nulls[Natts_pg_tablespace]; + HeapTuple tuple; + Oid tablespaceoid; + char *location; + char *linkloc; + AclId ownerid; + + /* validate */ + + /* don't call this in a transaction block */ + PreventTransactionChain((void *) stmt, "CREATE TABLESPACE"); + + /* Must be super user */ + if (!superuser()) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("permission denied to create tablespace \"%s\"", + stmt->tablespacename), + errhint("Must be superuser to create a tablespace."))); + + /* However, the eventual owner of the tablespace need not be */ + if (stmt->owner) + { + /* No need to check result, get_usesysid() does that */ + ownerid = get_usesysid(stmt->owner); + } + else + ownerid = GetUserId(); + + /* Unix-ify the offered path, and strip any trailing slashes */ + location = pstrdup(stmt->location); + canonicalize_path(location); + + /* disallow quotes, else CREATE DATABASE would be at risk */ + if (strchr(location, '\'')) + ereport(ERROR, + (errcode(ERRCODE_INVALID_NAME), + errmsg("tablespace location may not contain single quotes"))); + + /* + * Allowing relative paths seems risky + * + * this also helps us ensure that location is not empty or whitespace + */ + if (!is_absolute_path(location)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), + errmsg("tablespace location must be an absolute path"))); + + /* + * Check that location isn't too long. Remember that we're going to append + * '/<dboid>/<relid>.<nnn>' (XXX but do we ever form the whole path + * explicitly? This may be overly conservative.) + */ + if (strlen(location) >= (MAXPGPATH - 1 - 10 - 1 - 10 - 1 - 10)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), + errmsg("tablespace location \"%s\" is too long", + location))); + + /* + * Check that there is no other tablespace by this name. (The + * unique index would catch this anyway, but might as well give + * a friendlier message.) + */ + if (OidIsValid(get_tablespace_oid(stmt->tablespacename))) + ereport(ERROR, + (errcode(ERRCODE_DUPLICATE_OBJECT), + errmsg("tablespace \"%s\" already exists", + stmt->tablespacename))); + + /* + * Insert tuple into pg_tablespace. The purpose of doing this first + * is to lock the proposed tablename against other would-be creators. + * The insertion will roll back if we find problems below. + */ + rel = heap_openr(TableSpaceRelationName, RowExclusiveLock); + + MemSet(nulls, ' ', Natts_pg_tablespace); + + values[Anum_pg_tablespace_spcname - 1] = + DirectFunctionCall1(namein, CStringGetDatum(stmt->tablespacename)); + values[Anum_pg_tablespace_spcowner - 1] = + Int32GetDatum(ownerid); + values[Anum_pg_tablespace_spclocation - 1] = + DirectFunctionCall1(textin, CStringGetDatum(location)); + nulls[Anum_pg_tablespace_spcacl - 1] = 'n'; + + tuple = heap_formtuple(rel->rd_att, values, nulls); + + tablespaceoid = newoid(); + + HeapTupleSetOid(tuple, tablespaceoid); + + simple_heap_insert(rel, tuple); + + CatalogUpdateIndexes(rel, tuple); + + heap_freetuple(tuple); + + /* + * Attempt to coerce target directory to safe permissions. If this + * fails, it doesn't exist or has the wrong owner. + */ + if (chmod(location, 0700) != 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not set permissions on directory \"%s\": %m", + location))); + + /* + * Check the target directory is empty. + */ + if (!directory_is_empty(location)) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("directory \"%s\" is not empty", + location))); + + /* + * Create the PG_VERSION file in the target directory. This has several + * purposes: to make sure we can write in the directory, to prevent + * someone from creating another tablespace pointing at the same + * directory (the emptiness check above will fail), and to label + * tablespace directories by PG version. + */ + set_short_version(location); + + /* + * All seems well, create the symlink + */ + linkloc = (char *) palloc(strlen(DataDir) + 16 + 10 + 1); + sprintf(linkloc, "%s/pg_tablespaces/%u", DataDir, tablespaceoid); + + if (symlink(location, linkloc) < 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not create symbolic link \"%s\": %m", + linkloc))); + + pfree(linkloc); + pfree(location); + + heap_close(rel, RowExclusiveLock); + +#else /* !HAVE_SYMLINK */ + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("tablespaces are not supported on this platform"))); +#endif /* HAVE_SYMLINK */ +} + +/* + * Drop a table space + * + * Be careful to check that the tablespace is empty. + */ +void +DropTableSpace(DropTableSpaceStmt *stmt) +{ +#ifdef HAVE_SYMLINK + char *tablespacename = stmt->tablespacename; + HeapScanDesc scandesc; + Relation rel; + HeapTuple tuple; + ScanKeyData entry[1]; + char *location; + Oid tablespaceoid; + DIR *dirdesc; + struct dirent *de; + char *subfile; + + /* don't call this in a transaction block */ + PreventTransactionChain((void *) stmt, "DROP TABLESPACE"); + + /* + * Acquire ExclusiveLock on pg_tablespace to ensure that no one else + * is trying to do DROP TABLESPACE or TablespaceCreateDbspace concurrently. + */ + rel = heap_openr(TableSpaceRelationName, ExclusiveLock); + + /* + * Find the target tuple + */ + ScanKeyInit(&entry[0], + Anum_pg_tablespace_spcname, + BTEqualStrategyNumber, F_NAMEEQ, + CStringGetDatum(tablespacename)); + scandesc = heap_beginscan(rel, SnapshotNow, 1, entry); + tuple = heap_getnext(scandesc, ForwardScanDirection); + + if (!HeapTupleIsValid(tuple)) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("tablespace \"%s\" does not exist", + tablespacename))); + + tablespaceoid = HeapTupleGetOid(tuple); + + /* Must be superuser or owner */ + if (GetUserId() != ((Form_pg_tablespace) GETSTRUCT(tuple))->spcowner && + !superuser()) + aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_TABLESPACE, + tablespacename); + + /* Disallow drop of the standard tablespaces, even by superuser */ + if (tablespaceoid == GLOBALTABLESPACE_OID || + tablespaceoid == DEFAULTTABLESPACE_OID) + aclcheck_error(ACLCHECK_NO_PRIV, ACL_KIND_TABLESPACE, + tablespacename); + + location = (char *) palloc(strlen(DataDir) + 16 + 10 + 1); + sprintf(location, "%s/pg_tablespaces/%u", DataDir, tablespaceoid); + + /* + * Check if the tablespace still contains any files. We try to rmdir + * each per-database directory we find in it. rmdir failure implies + * there are still files in that subdirectory, so give up. (We do not + * have to worry about undoing any already completed rmdirs, since + * the next attempt to use the tablespace from that database will simply + * recreate the subdirectory via TablespaceCreateDbspace.) + * + * Since we hold exclusive lock, no one else should be creating any + * fresh subdirectories in parallel. It is possible that new files + * are being created within subdirectories, though, so the rmdir + * call could fail. Worst consequence is a less friendly error message. + */ + dirdesc = AllocateDir(location); + if (dirdesc == NULL) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not open directory \"%s\": %m", + location))); + + errno = 0; + while ((de = readdir(dirdesc)) != NULL) + { + /* Note we ignore PG_VERSION for the nonce */ + if (strcmp(de->d_name, ".") == 0 || + strcmp(de->d_name, "..") == 0 || + strcmp(de->d_name, "PG_VERSION") == 0) + { + errno = 0; + continue; + } + + subfile = palloc(strlen(location) + 1 + strlen(de->d_name) + 1); + sprintf(subfile, "%s/%s", location, de->d_name); + + /* This check is just to deliver a friendlier error message */ + if (!directory_is_empty(subfile)) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("tablespace \"%s\" is not empty", + tablespacename))); + + /* Do the real deed */ + if (rmdir(subfile) < 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not delete directory \"%s\": %m", + subfile))); + + pfree(subfile); + } +#ifdef WIN32 + /* This fix is in mingw cvs (runtime/mingwex/dirent.c rev 1.4), but + not in released version */ + if (GetLastError() == ERROR_NO_MORE_FILES) + errno = 0; +#endif + if (errno) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not read directory \"%s\": %m", + location))); + FreeDir(dirdesc); + + /* + * Okay, try to unlink PG_VERSION and then remove the symlink. + */ + subfile = palloc(strlen(location) + 11 + 1); + sprintf(subfile, "%s/PG_VERSION", location); + + if (unlink(subfile) < 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not unlink file \"%s\": %m", + subfile))); + + if (unlink(location) < 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not unlink symbolic link \"%s\": %m", + location))); + + pfree(subfile); + pfree(location); + + /* + * We have successfully destroyed the infrastructure ... there is + * now no way to roll back the DROP ... so proceed to remove the + * pg_tablespace tuple. + */ + simple_heap_delete(rel, &tuple->t_self); + + heap_endscan(scandesc); + + heap_close(rel, ExclusiveLock); + +#else /* !HAVE_SYMLINK */ + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("tablespaces are not supported on this platform"))); +#endif /* HAVE_SYMLINK */ +} + + +/* + * write out the PG_VERSION file in the specified directory + */ +static void +set_short_version(const char *path) +{ + char *short_version; + bool gotdot = false; + int end; + char *fullname; + FILE *version_file; + + /* Construct short version string (should match initdb.c) */ + short_version = pstrdup(PG_VERSION); + + for (end = 0; short_version[end] != '\0'; end++) + { + if (short_version[end] == '.') + { + Assert(end != 0); + if (gotdot) + break; + else + gotdot = true; + } + else if (short_version[end] < '0' || short_version[end] > '9') + { + /* gone past digits and dots */ + break; + } + } + Assert(end > 0 && short_version[end - 1] != '.' && gotdot); + short_version[end] = '\0'; + + /* Now write the file */ + fullname = palloc(strlen(path) + 11 + 1); + sprintf(fullname, "%s/PG_VERSION", path); + version_file = AllocateFile(fullname, PG_BINARY_W); + if (version_file == NULL) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not write to file \"%s\": %m", + fullname))); + fprintf(version_file, "%s\n", short_version); + if (FreeFile(version_file)) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not write to file \"%s\": %m", + fullname))); + + pfree(fullname); + pfree(short_version); +} + +/* + * Check if a directory is empty. + */ +static bool +directory_is_empty(const char *path) +{ + DIR *dirdesc; + struct dirent *de; + + dirdesc = AllocateDir(path); + if (dirdesc == NULL) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not open directory \"%s\": %m", + path))); + + errno = 0; + while ((de = readdir(dirdesc)) != NULL) + { + if (strcmp(de->d_name, ".") == 0 || + strcmp(de->d_name, "..") == 0) + { + errno = 0; + continue; + } + FreeDir(dirdesc); + return false; + } +#ifdef WIN32 + /* This fix is in mingw cvs (runtime/mingwex/dirent.c rev 1.4), but + not in released version */ + if (GetLastError() == ERROR_NO_MORE_FILES) + errno = 0; +#endif + if (errno) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not read directory \"%s\": %m", + path))); + FreeDir(dirdesc); + return true; +} + +/* + * get_tablespace_oid - given a tablespace name, look up the OID + * + * Returns InvalidOid if tablespace name not found. + */ +Oid +get_tablespace_oid(const char *tablespacename) +{ + Oid result; + Relation rel; + HeapScanDesc scandesc; + HeapTuple tuple; + ScanKeyData entry[1]; + + /* Search pg_tablespace */ + rel = heap_openr(TableSpaceRelationName, AccessShareLock); + + ScanKeyInit(&entry[0], + Anum_pg_tablespace_spcname, + BTEqualStrategyNumber, F_NAMEEQ, + CStringGetDatum(tablespacename)); + scandesc = heap_beginscan(rel, SnapshotNow, 1, entry); + tuple = heap_getnext(scandesc, ForwardScanDirection); + + if (HeapTupleIsValid(tuple)) + result = HeapTupleGetOid(tuple); + else + result = InvalidOid; + + heap_endscan(scandesc); + heap_close(rel, AccessShareLock); + + return result; +} + +/* + * get_tablespace_name - given a tablespace OID, look up the name + * + * Returns a palloc'd string, or NULL if no such tablespace. + */ +char * +get_tablespace_name(Oid spc_oid) +{ + char *result; + Relation rel; + HeapScanDesc scandesc; + HeapTuple tuple; + ScanKeyData entry[1]; + + /* Search pg_tablespace */ + rel = heap_openr(TableSpaceRelationName, AccessShareLock); + + ScanKeyInit(&entry[0], + ObjectIdAttributeNumber, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(spc_oid)); + scandesc = heap_beginscan(rel, SnapshotNow, 1, entry); + tuple = heap_getnext(scandesc, ForwardScanDirection); + + /* We assume that there can be at most one matching tuple */ + if (HeapTupleIsValid(tuple)) + result = pstrdup(NameStr(((Form_pg_tablespace) GETSTRUCT(tuple))->spcname)); + else + result = NULL; + + heap_endscan(scandesc); + heap_close(rel, AccessShareLock); + + return result; +} diff --git a/src/backend/commands/typecmds.c b/src/backend/commands/typecmds.c index a64617d08d4..d087ad8895c 100644 --- a/src/backend/commands/typecmds.c +++ b/src/backend/commands/typecmds.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/commands/typecmds.c,v 1.59 2004/06/10 17:55:56 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/commands/typecmds.c,v 1.60 2004/06/18 06:13:23 tgl Exp $ * * DESCRIPTION * The "DefineFoo" routines take the parse tree and pick out the @@ -1110,8 +1110,8 @@ DefineCompositeType(const RangeVar *typevar, List *coldeflist) errmsg("composite type must have at least one attribute"))); /* - * now create the parameters for keys/inheritance etc. All of them are - * nil... + * now set the parameters for keys/inheritance etc. All of these + * are uninteresting for composite types... */ createStmt->relation = (RangeVar *) typevar; createStmt->tableElts = coldeflist; @@ -1119,6 +1119,7 @@ DefineCompositeType(const RangeVar *typevar, List *coldeflist) createStmt->constraints = NIL; createStmt->hasoids = MUST_NOT_HAVE_OIDS; createStmt->oncommit = ONCOMMIT_NOOP; + createStmt->tablespacename = NULL; /* * finally create the relation... diff --git a/src/backend/commands/view.c b/src/backend/commands/view.c index 4c27bd67199..67971219699 100644 --- a/src/backend/commands/view.c +++ b/src/backend/commands/view.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/commands/view.c,v 1.82 2004/05/26 04:41:13 neilc Exp $ + * $PostgreSQL: pgsql/src/backend/commands/view.c,v 1.83 2004/06/18 06:13:23 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -134,8 +134,8 @@ DefineVirtualRelation(const RangeVar *relation, List *tlist, bool replace) else { /* - * now create the parameters for keys/inheritance etc. All of them - * are nil... + * now set the parameters for keys/inheritance etc. All of these + * are uninteresting for views... */ createStmt->relation = (RangeVar *) relation; createStmt->tableElts = attrList; @@ -143,6 +143,7 @@ DefineVirtualRelation(const RangeVar *relation, List *tlist, bool replace) createStmt->constraints = NIL; createStmt->hasoids = MUST_NOT_HAVE_OIDS; createStmt->oncommit = ONCOMMIT_NOOP; + createStmt->tablespacename = NULL; /* * finally create the relation (this will error out if there's an diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c index 4099ef0a791..6f0f22a3221 100644 --- a/src/backend/executor/execMain.c +++ b/src/backend/executor/execMain.c @@ -26,7 +26,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/executor/execMain.c,v 1.233 2004/05/30 23:40:26 neilc Exp $ + * $PostgreSQL: pgsql/src/backend/executor/execMain.c,v 1.234 2004/06/18 06:13:26 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -779,6 +779,7 @@ InitPlan(QueryDesc *queryDesc, bool explainOnly) intoRelationId = heap_create_with_catalog(intoName, namespaceId, + InvalidOid, tupdesc, RELKIND_RELATION, false, diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c index bfcb82447d8..90983e6db0b 100644 --- a/src/backend/nodes/copyfuncs.c +++ b/src/backend/nodes/copyfuncs.c @@ -15,7 +15,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/nodes/copyfuncs.c,v 1.285 2004/06/09 19:08:15 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/nodes/copyfuncs.c,v 1.286 2004/06/18 06:13:28 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -1753,6 +1753,7 @@ _copyCreateStmt(CreateStmt *from) COPY_NODE_FIELD(constraints); COPY_SCALAR_FIELD(hasoids); COPY_SCALAR_FIELD(oncommit); + COPY_STRING_FIELD(tablespacename); return newnode; } @@ -1836,6 +1837,7 @@ _copyIndexStmt(IndexStmt *from) COPY_STRING_FIELD(idxname); COPY_NODE_FIELD(relation); COPY_STRING_FIELD(accessMethod); + COPY_STRING_FIELD(tableSpace); COPY_NODE_FIELD(indexParams); COPY_NODE_FIELD(whereClause); COPY_NODE_FIELD(rangetable); @@ -2146,6 +2148,7 @@ _copyCreateSeqStmt(CreateSeqStmt *from) COPY_NODE_FIELD(sequence); COPY_NODE_FIELD(options); + COPY_STRING_FIELD(tablespacename); return newnode; } @@ -2193,6 +2196,28 @@ _copyVariableResetStmt(VariableResetStmt *from) return newnode; } +static CreateTableSpaceStmt * +_copyCreateTableSpaceStmt(CreateTableSpaceStmt *from) +{ + CreateTableSpaceStmt *newnode = makeNode(CreateTableSpaceStmt); + + COPY_STRING_FIELD(tablespacename); + COPY_STRING_FIELD(owner); + COPY_STRING_FIELD(location); + + return newnode; +} + +static DropTableSpaceStmt * +_copyDropTableSpaceStmt(DropTableSpaceStmt *from) +{ + DropTableSpaceStmt *newnode = makeNode(DropTableSpaceStmt); + + COPY_STRING_FIELD(tablespacename); + + return newnode; +} + static CreateTrigStmt * _copyCreateTrigStmt(CreateTrigStmt *from) { @@ -2371,6 +2396,7 @@ _copyCreateSchemaStmt(CreateSchemaStmt *from) COPY_STRING_FIELD(schemaname); COPY_STRING_FIELD(authid); + COPY_STRING_FIELD(tablespacename); COPY_NODE_FIELD(schemaElts); return newnode; @@ -2914,6 +2940,12 @@ copyObject(void *from) case T_VariableResetStmt: retval = _copyVariableResetStmt(from); break; + case T_CreateTableSpaceStmt: + retval = _copyCreateTableSpaceStmt(from); + break; + case T_DropTableSpaceStmt: + retval = _copyDropTableSpaceStmt(from); + break; case T_CreateTrigStmt: retval = _copyCreateTrigStmt(from); break; diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c index fec550836b3..47ec3157727 100644 --- a/src/backend/nodes/equalfuncs.c +++ b/src/backend/nodes/equalfuncs.c @@ -18,7 +18,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/nodes/equalfuncs.c,v 1.224 2004/06/09 19:08:15 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/nodes/equalfuncs.c,v 1.225 2004/06/18 06:13:28 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -835,6 +835,7 @@ _equalCreateStmt(CreateStmt *a, CreateStmt *b) COMPARE_NODE_FIELD(constraints); COMPARE_SCALAR_FIELD(hasoids); COMPARE_SCALAR_FIELD(oncommit); + COMPARE_STRING_FIELD(tablespacename); return true; } @@ -904,6 +905,7 @@ _equalIndexStmt(IndexStmt *a, IndexStmt *b) COMPARE_STRING_FIELD(idxname); COMPARE_NODE_FIELD(relation); COMPARE_STRING_FIELD(accessMethod); + COMPARE_STRING_FIELD(tableSpace); COMPARE_NODE_FIELD(indexParams); COMPARE_NODE_FIELD(whereClause); COMPARE_NODE_FIELD(rangetable); @@ -1164,6 +1166,7 @@ _equalCreateSeqStmt(CreateSeqStmt *a, CreateSeqStmt *b) { COMPARE_NODE_FIELD(sequence); COMPARE_NODE_FIELD(options); + COMPARE_STRING_FIELD(tablespacename); return true; } @@ -1204,6 +1207,24 @@ _equalVariableResetStmt(VariableResetStmt *a, VariableResetStmt *b) } static bool +_equalCreateTableSpaceStmt(CreateTableSpaceStmt *a, CreateTableSpaceStmt *b) +{ + COMPARE_STRING_FIELD(tablespacename); + COMPARE_STRING_FIELD(owner); + COMPARE_STRING_FIELD(location); + + return true; +} + +static bool +_equalDropTableSpaceStmt(DropTableSpaceStmt *a, DropTableSpaceStmt *b) +{ + COMPARE_STRING_FIELD(tablespacename); + + return true; +} + +static bool _equalCreateTrigStmt(CreateTrigStmt *a, CreateTrigStmt *b) { COMPARE_STRING_FIELD(trigname); @@ -1352,6 +1373,7 @@ _equalCreateSchemaStmt(CreateSchemaStmt *a, CreateSchemaStmt *b) { COMPARE_STRING_FIELD(schemaname); COMPARE_STRING_FIELD(authid); + COMPARE_STRING_FIELD(tablespacename); COMPARE_NODE_FIELD(schemaElts); return true; @@ -2052,6 +2074,12 @@ equal(void *a, void *b) case T_VariableResetStmt: retval = _equalVariableResetStmt(a, b); break; + case T_CreateTableSpaceStmt: + retval = _equalCreateTableSpaceStmt(a, b); + break; + case T_DropTableSpaceStmt: + retval = _equalDropTableSpaceStmt(a, b); + break; case T_CreateTrigStmt: retval = _equalCreateTrigStmt(a, b); break; diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c index 1984ced756f..087ebb39a15 100644 --- a/src/backend/nodes/outfuncs.c +++ b/src/backend/nodes/outfuncs.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/nodes/outfuncs.c,v 1.239 2004/06/09 19:08:15 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/nodes/outfuncs.c,v 1.240 2004/06/18 06:13:28 tgl Exp $ * * NOTES * Every node type that can appear in stored rules' parsetrees *must* @@ -1128,7 +1128,7 @@ _outInClauseInfo(StringInfo str, InClauseInfo *node) static void _outCreateStmt(StringInfo str, CreateStmt *node) { - WRITE_NODE_TYPE("CREATE"); + WRITE_NODE_TYPE("CREATESTMT"); WRITE_NODE_FIELD(relation); WRITE_NODE_FIELD(tableElts); @@ -1136,16 +1136,18 @@ _outCreateStmt(StringInfo str, CreateStmt *node) WRITE_NODE_FIELD(constraints); WRITE_ENUM_FIELD(hasoids, ContainsOids); WRITE_ENUM_FIELD(oncommit, OnCommitAction); + WRITE_STRING_FIELD(tablespacename); } static void _outIndexStmt(StringInfo str, IndexStmt *node) { - WRITE_NODE_TYPE("INDEX"); + WRITE_NODE_TYPE("INDEXSTMT"); WRITE_STRING_FIELD(idxname); WRITE_NODE_FIELD(relation); WRITE_STRING_FIELD(accessMethod); + WRITE_STRING_FIELD(tableSpace); WRITE_NODE_FIELD(indexParams); WRITE_NODE_FIELD(whereClause); WRITE_NODE_FIELD(rangetable); diff --git a/src/backend/parser/analyze.c b/src/backend/parser/analyze.c index e5537e26ae9..54501031562 100644 --- a/src/backend/parser/analyze.c +++ b/src/backend/parser/analyze.c @@ -6,7 +6,7 @@ * Portions Copyright (c) 1996-2003, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $PostgreSQL: pgsql/src/backend/parser/analyze.c,v 1.305 2004/06/10 17:55:58 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/parser/analyze.c,v 1.306 2004/06/18 06:13:31 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -874,6 +874,7 @@ transformColumnDefinition(ParseState *pstate, CreateStmtContext *cxt, seqstmt = makeNode(CreateSeqStmt); seqstmt->sequence = makeRangeVar(snamespace, sname); seqstmt->options = NIL; + seqstmt->tablespacename = NULL; cxt->blist = lappend(cxt->blist, seqstmt); @@ -1199,6 +1200,7 @@ transformIndexConstraints(ParseState *pstate, CreateStmtContext *cxt) index->relation = cxt->relation; index->accessMethod = DEFAULT_INDEX_TYPE; + index->tableSpace = NULL; index->indexParams = NIL; index->whereClause = NULL; diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index 07a882f3db5..9dc53604c61 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -11,7 +11,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/parser/gram.y,v 2.461 2004/06/09 19:08:17 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/parser/gram.y,v 2.462 2004/06/18 06:13:31 tgl Exp $ * * HISTORY * AUTHOR DATE MAJOR EVENT @@ -136,12 +136,12 @@ static void doNegateFloat(Value *v); AnalyzeStmt ClosePortalStmt ClusterStmt CommentStmt ConstraintsSetStmt CopyStmt CreateAsStmt CreateCastStmt CreateDomainStmt CreateGroupStmt CreateOpClassStmt CreatePLangStmt - CreateSchemaStmt CreateSeqStmt CreateStmt + CreateSchemaStmt CreateSeqStmt CreateStmt CreateTableSpaceStmt CreateAssertStmt CreateTrigStmt CreateUserStmt CreatedbStmt DeclareCursorStmt DefineStmt DeleteStmt DropGroupStmt DropOpClassStmt DropPLangStmt DropStmt DropAssertStmt DropTrigStmt DropRuleStmt DropCastStmt - DropUserStmt DropdbStmt ExplainStmt FetchStmt + DropUserStmt DropdbStmt DropTableSpaceStmt ExplainStmt FetchStmt GrantStmt IndexStmt InsertStmt ListenStmt LoadStmt LockStmt NotifyStmt ExplainableStmt PreparableStmt CreateFunctionStmt ReindexStmt RemoveAggrStmt @@ -324,6 +324,7 @@ static void doNegateFloat(Value *v); %type <list> constraints_set_list %type <boolean> constraints_set_mode +%type <str> OptTableSpace OptTableSpaceOwner /* @@ -384,7 +385,7 @@ static void doNegateFloat(Value *v); ORDER OUT_P OUTER_P OVERLAPS OVERLAY OWNER PARTIAL PASSWORD PATH_P PENDANT PLACING POSITION - PRECISION PRESERVE PREPARE PRIMARY + PRECISION PRESERVE PREPARE PRIMARY PRIOR PRIVILEGES PROCEDURAL PROCEDURE QUOTE @@ -398,7 +399,7 @@ static void doNegateFloat(Value *v); SHOW SIMILAR SIMPLE SMALLINT SOME STABLE START STATEMENT STATISTICS STDIN STDOUT STORAGE STRICT_P SUBSTRING SYSID - TABLE TEMP TEMPLATE TEMPORARY THEN TIME TIMESTAMP + TABLE TABLESPACE TEMP TEMPLATE TEMPORARY THEN TIME TIMESTAMP TO TOAST TRAILING TRANSACTION TREAT TRIGGER TRIM TRUE_P TRUNCATE TRUSTED TYPE_P @@ -513,6 +514,7 @@ stmt : | CreateSchemaStmt | CreateSeqStmt | CreateStmt + | CreateTableSpaceStmt | CreateTrigStmt | CreateUserStmt | CreatedbStmt @@ -527,6 +529,7 @@ stmt : | DropPLangStmt | DropRuleStmt | DropStmt + | DropTableSpaceStmt | DropTrigStmt | DropUserStmt | DropdbStmt @@ -781,7 +784,7 @@ DropGroupStmt: *****************************************************************************/ CreateSchemaStmt: - CREATE SCHEMA OptSchemaName AUTHORIZATION UserId OptSchemaEltList + CREATE SCHEMA OptSchemaName AUTHORIZATION UserId OptTableSpace OptSchemaEltList { CreateSchemaStmt *n = makeNode(CreateSchemaStmt); /* One can omit the schema name or the authorization id. */ @@ -790,16 +793,18 @@ CreateSchemaStmt: else n->schemaname = $5; n->authid = $5; - n->schemaElts = $6; + n->tablespacename = $6; + n->schemaElts = $7; $$ = (Node *)n; } - | CREATE SCHEMA ColId OptSchemaEltList + | CREATE SCHEMA ColId OptTableSpace OptSchemaEltList { CreateSchemaStmt *n = makeNode(CreateSchemaStmt); /* ...but not both */ n->schemaname = $3; n->authid = NULL; - n->schemaElts = $4; + n->tablespacename = $4; + n->schemaElts = $5; $$ = (Node *)n; } ; @@ -1277,7 +1282,7 @@ alter_table_cmd: n->name = $3; $$ = (Node *)n; } - /* ALTER TABLE <name> SET WITHOUT CLUSTER */ + /* ALTER TABLE <name> SET WITHOUT CLUSTER */ | SET WITHOUT CLUSTER { AlterTableCmd *n = makeNode(AlterTableCmd); @@ -1464,7 +1469,7 @@ opt_using: *****************************************************************************/ CreateStmt: CREATE OptTemp TABLE qualified_name '(' OptTableElementList ')' - OptInherit OptWithOids OnCommitOption + OptInherit OptWithOids OnCommitOption OptTableSpace { CreateStmt *n = makeNode(CreateStmt); $4->istemp = $2; @@ -1474,10 +1479,11 @@ CreateStmt: CREATE OptTemp TABLE qualified_name '(' OptTableElementList ')' n->constraints = NIL; n->hasoids = $9; n->oncommit = $10; + n->tablespacename = $11; $$ = (Node *)n; } | CREATE OptTemp TABLE qualified_name OF qualified_name - '(' OptTableElementList ')' OptWithOids OnCommitOption + '(' OptTableElementList ')' OptWithOids OnCommitOption OptTableSpace { /* SQL99 CREATE TABLE OF <UDT> (cols) seems to be satisfied * by our inheritance capabilities. Let's try it... @@ -1490,6 +1496,7 @@ CreateStmt: CREATE OptTemp TABLE qualified_name '(' OptTableElementList ')' n->constraints = NIL; n->hasoids = $10; n->oncommit = $11; + n->tablespacename = $12; $$ = (Node *)n; } ; @@ -1901,6 +1908,10 @@ OnCommitOption: ON COMMIT DROP { $$ = ONCOMMIT_DROP; } | /*EMPTY*/ { $$ = ONCOMMIT_NOOP; } ; +OptTableSpace: TABLESPACE name { $$ = $2; } + | /*EMPTY*/ { $$ = NULL; } + ; + /* * Note: CREATE TABLE ... AS SELECT ... is just another spelling for @@ -1979,12 +1990,13 @@ CreateAsElement: *****************************************************************************/ CreateSeqStmt: - CREATE OptTemp SEQUENCE qualified_name OptSeqList + CREATE OptTemp SEQUENCE qualified_name OptSeqList OptTableSpace { CreateSeqStmt *n = makeNode(CreateSeqStmt); $4->istemp = $2; n->sequence = $4; n->options = $5; + n->tablespacename = $6; $$ = (Node *)n; } ; @@ -2136,6 +2148,45 @@ opt_procedural: /***************************************************************************** * + * QUERY: + * CREATE TABLESPACE tablespace LOCATION '/path/to/tablespace/' + * + *****************************************************************************/ + +CreateTableSpaceStmt: CREATE TABLESPACE name OptTableSpaceOwner LOCATION Sconst + { + CreateTableSpaceStmt *n = makeNode(CreateTableSpaceStmt); + n->tablespacename = $3; + n->owner = $4; + n->location = $6; + $$ = (Node *) n; + } + ; + +OptTableSpaceOwner: OWNER name { $$ = $2; } + | /*EMPTY */ { $$ = NULL; } + ; + +/***************************************************************************** + * + * QUERY : + * DROP TABLESPACE <tablespace> + * + * No need for drop behaviour as we cannot implement dependencies for + * objects in other databases; we can only support RESTRICT. + * + ****************************************************************************/ + +DropTableSpaceStmt: DROP TABLESPACE name + { + DropTableSpaceStmt *n = makeNode(DropTableSpaceStmt); + n->tablespacename = $3; + $$ = (Node *) n; + } + ; + +/***************************************************************************** + * * QUERIES : * CREATE TRIGGER ... * DROP TRIGGER ... @@ -2735,7 +2786,7 @@ CommentStmt: n->objargs = NIL; n->comment = $7; $$ = (Node *) n; - } + } ; comment_type: @@ -3026,6 +3077,13 @@ privilege_target: n->objs = $2; $$ = n; } + | TABLESPACE name_list + { + PrivTarget *n = makeNode(PrivTarget); + n->objtype = ACL_OBJECT_TABLESPACE; + n->objs = $2; + $$ = n; + } ; @@ -3092,12 +3150,14 @@ function_with_argtypes: * QUERY: * create index <indexname> on <relname> * [ using <access> ] "(" ( <col> [ using <opclass> ] )+ ")" - * [ where <predicate> ] + * [ tablespace <tablespacename> ] [ where <predicate> ] * + * Note: we cannot put TABLESPACE clause after WHERE clause unless we are + * willing to make TABLESPACE a fully reserved word. *****************************************************************************/ IndexStmt: CREATE index_opt_unique INDEX index_name ON qualified_name - access_method_clause '(' index_params ')' where_clause + access_method_clause '(' index_params ')' OptTableSpace where_clause { IndexStmt *n = makeNode(IndexStmt); n->unique = $2; @@ -3105,7 +3165,8 @@ IndexStmt: CREATE index_opt_unique INDEX index_name ON qualified_name n->relation = $6; n->accessMethod = $7; n->indexParams = $9; - n->whereClause = $11; + n->tableSpace = $11; + n->whereClause = $12; $$ = (Node *)n; } ; @@ -3896,7 +3957,15 @@ createdb_opt_list: ; createdb_opt_item: - LOCATION opt_equal Sconst + TABLESPACE opt_equal name + { + $$ = makeDefElem("tablespace", (Node *)makeString($3)); + } + | TABLESPACE opt_equal DEFAULT + { + $$ = makeDefElem("tablespace", NULL); + } + | LOCATION opt_equal Sconst { $$ = makeDefElem("location", (Node *)makeString($3)); } @@ -6801,7 +6870,7 @@ subquery_Op: { $$ = list_make1(makeString("!~~*")); } /* cannot put SIMILAR TO here, because SIMILAR TO is a hack. * the regular expression is preprocessed by a function (similar_escape), - * and the ~ operator for posix regular expressions is used. + * and the ~ operator for posix regular expressions is used. * x SIMILAR TO y -> x ~ similar_escape(y) * this transformation is made on the fly by the parser upwards. * however the SubLink structure which handles any/some/all stuff @@ -6978,7 +7047,7 @@ in_expr: select_with_parens * COALESCE(a,b,...) * same as CASE WHEN a IS NOT NULL THEN a WHEN b IS NOT NULL THEN b ... END * - thomas 1998-11-09 - * + * * NULLIF and COALESCE have become first class nodes to * prevent double evaluation of arguments. * - Kris Jurka 2003-02-11 @@ -7565,6 +7634,7 @@ unreserved_keyword: | STORAGE | SYSID | STRICT_P + | TABLESPACE | TEMP | TEMPLATE | TEMPORARY diff --git a/src/backend/parser/keywords.c b/src/backend/parser/keywords.c index a3c765a1952..a89f8da7d4e 100644 --- a/src/backend/parser/keywords.c +++ b/src/backend/parser/keywords.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/parser/keywords.c,v 1.149 2004/04/21 00:34:18 momjian Exp $ + * $PostgreSQL: pgsql/src/backend/parser/keywords.c,v 1.150 2004/06/18 06:13:31 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -297,6 +297,7 @@ static const ScanKeyword ScanKeywords[] = { {"substring", SUBSTRING}, {"sysid", SYSID}, {"table", TABLE}, + {"tablespace", TABLESPACE}, {"temp", TEMP}, {"template", TEMPLATE}, {"temporary", TEMPORARY}, diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index da1998a0d50..725b79cad38 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/storage/buffer/bufmgr.c,v 1.170 2004/06/11 16:43:23 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/storage/buffer/bufmgr.c,v 1.171 2004/06/18 06:13:33 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -654,7 +654,7 @@ BufferSync(int percent, int maxpages) */ dirty_buffers = (BufferDesc **) palloc(NBuffers * sizeof(BufferDesc *)); buftags = (BufferTag *) palloc(NBuffers * sizeof(BufferTag)); - + LWLockAcquire(BufMgrLock, LW_EXCLUSIVE); num_buffer_dirty = StrategyDirtyBufferList(dirty_buffers, buftags, NBuffers); @@ -832,9 +832,10 @@ AtEOXact_Buffers(bool isCommit) if (isCommit) elog(WARNING, "buffer refcount leak: [%03d] " - "(rel=%u/%u, blockNum=%u, flags=0x%x, refcount=%u %d)", + "(rel=%u/%u/%u, blockNum=%u, flags=0x%x, refcount=%u %d)", i, - buf->tag.rnode.tblNode, buf->tag.rnode.relNode, + buf->tag.rnode.spcNode, buf->tag.rnode.dbNode, + buf->tag.rnode.relNode, buf->tag.blockNum, buf->flags, buf->refcount, PrivateRefCount[i]); @@ -1137,9 +1138,10 @@ recheck: { /* the sole pin should be ours */ if (bufHdr->refcount != 1 || PrivateRefCount[i - 1] == 0) - elog(FATAL, "block %u of %u/%u is still referenced (private %d, global %u)", + elog(FATAL, "block %u of %u/%u/%u is still referenced (private %d, global %u)", bufHdr->tag.blockNum, - bufHdr->tag.rnode.tblNode, + bufHdr->tag.rnode.spcNode, + bufHdr->tag.rnode.dbNode, bufHdr->tag.rnode.relNode, PrivateRefCount[i - 1], bufHdr->refcount); /* Make sure it will be released */ @@ -1180,13 +1182,7 @@ DropBuffers(Oid dbid) { bufHdr = &BufferDescriptors[i - 1]; recheck: - - /* - * We know that currently database OID is tblNode but this - * probably will be changed in future and this func will be used - * to drop tablespace buffers. - */ - if (bufHdr->tag.rnode.tblNode == dbid) + if (bufHdr->tag.rnode.dbNode == dbid) { /* * If there is I/O in progress, better wait till it's done; @@ -1243,10 +1239,11 @@ PrintBufferDescs(void) for (i = 0; i < NBuffers; ++i, ++buf) { elog(LOG, - "[%02d] (freeNext=%d, freePrev=%d, rel=%u/%u, " + "[%02d] (freeNext=%d, freePrev=%d, rel=%u/%u/%u, " "blockNum=%u, flags=0x%x, refcount=%u %d)", i, buf->freeNext, buf->freePrev, - buf->tag.rnode.tblNode, buf->tag.rnode.relNode, + buf->tag.rnode.spcNode, buf->tag.rnode.dbNode, + buf->tag.rnode.relNode, buf->tag.blockNum, buf->flags, buf->refcount, PrivateRefCount[i]); } @@ -1257,9 +1254,9 @@ PrintBufferDescs(void) /* interactive backend */ for (i = 0; i < NBuffers; ++i, ++buf) { - printf("[%-2d] (%u/%u, %u) flags=0x%x, refcount=%u %d)\n", - i, buf->tag.rnode.tblNode, buf->tag.rnode.relNode, - buf->tag.blockNum, + printf("[%-2d] (%u/%u/%u, %u) flags=0x%x, refcount=%u %d)\n", + i, buf->tag.rnode.spcNode, buf->tag.rnode.dbNode, + buf->tag.rnode.relNode, buf->tag.blockNum, buf->flags, buf->refcount, PrivateRefCount[i]); } } @@ -1278,10 +1275,11 @@ PrintPinnedBufs(void) { if (PrivateRefCount[i] > 0) elog(WARNING, - "[%02d] (freeNext=%d, freePrev=%d, rel=%u/%u, " + "[%02d] (freeNext=%d, freePrev=%d, rel=%u/%u/%u, " "blockNum=%u, flags=0x%x, refcount=%u %d)", i, buf->freeNext, buf->freePrev, - buf->tag.rnode.tblNode, buf->tag.rnode.relNode, + buf->tag.rnode.spcNode, buf->tag.rnode.dbNode, + buf->tag.rnode.relNode, buf->tag.blockNum, buf->flags, buf->refcount, PrivateRefCount[i]); } @@ -1464,11 +1462,11 @@ IncrBufferRefCount_Debug(char *file, int line, Buffer buffer) BufferDesc *buf = &BufferDescriptors[buffer - 1]; fprintf(stderr, - "PIN(Incr) %d rel = %u/%u, blockNum = %u, " + "PIN(Incr) %d rel = %u/%u/%u, blockNum = %u, " "refcount = %d, file: %s, line: %d\n", buffer, - buf->tag.rnode.tblNode, buf->tag.rnode.relNode, - buf->tag.blockNum, + buf->tag.rnode.spcNode, buf->tag.rnode.dbNode, + buf->tag.rnode.relNode, buf->tag.blockNum, PrivateRefCount[buffer - 1], file, line); } } @@ -1484,11 +1482,11 @@ ReleaseBuffer_Debug(char *file, int line, Buffer buffer) BufferDesc *buf = &BufferDescriptors[buffer - 1]; fprintf(stderr, - "UNPIN(Rel) %d rel = %u/%u, blockNum = %u, " + "UNPIN(Rel) %d rel = %u/%u/%u, blockNum = %u, " "refcount = %d, file: %s, line: %d\n", buffer, - buf->tag.rnode.tblNode, buf->tag.rnode.relNode, - buf->tag.blockNum, + buf->tag.rnode.spcNode, buf->tag.rnode.dbNode, + buf->tag.rnode.relNode, buf->tag.blockNum, PrivateRefCount[buffer - 1], file, line); } } @@ -1513,11 +1511,11 @@ ReleaseAndReadBuffer_Debug(char *file, BufferDesc *buf = &BufferDescriptors[buffer - 1]; fprintf(stderr, - "UNPIN(Rel&Rd) %d rel = %u/%u, blockNum = %u, " + "UNPIN(Rel&Rd) %d rel = %u/%u/%u, blockNum = %u, " "refcount = %d, file: %s, line: %d\n", buffer, - buf->tag.rnode.tblNode, buf->tag.rnode.relNode, - buf->tag.blockNum, + buf->tag.rnode.spcNode, buf->tag.rnode.dbNode, + buf->tag.rnode.relNode, buf->tag.blockNum, PrivateRefCount[buffer - 1], file, line); } if (ShowPinTrace && BufferIsLocal(buffer) && is_userbuffer(buffer)) @@ -1525,11 +1523,11 @@ ReleaseAndReadBuffer_Debug(char *file, BufferDesc *buf = &BufferDescriptors[b - 1]; fprintf(stderr, - "PIN(Rel&Rd) %d rel = %u/%u, blockNum = %u, " + "PIN(Rel&Rd) %d rel = %u/%u/%u, blockNum = %u, " "refcount = %d, file: %s, line: %d\n", b, - buf->tag.rnode.tblNode, buf->tag.rnode.relNode, - buf->tag.blockNum, + buf->tag.rnode.spcNode, buf->tag.rnode.dbNode, + buf->tag.rnode.relNode, buf->tag.blockNum, PrivateRefCount[b - 1], file, line); } return b; @@ -1890,9 +1888,10 @@ AbortBufferIO(void) { ereport(WARNING, (errcode(ERRCODE_IO_ERROR), - errmsg("could not write block %u of %u/%u", + errmsg("could not write block %u of %u/%u/%u", buf->tag.blockNum, - buf->tag.rnode.tblNode, + buf->tag.rnode.spcNode, + buf->tag.rnode.dbNode, buf->tag.rnode.relNode), errdetail("Multiple failures --- write error may be permanent."))); } @@ -1912,7 +1911,9 @@ buffer_write_error_callback(void *arg) BufferDesc *bufHdr = (BufferDesc *) arg; if (bufHdr != NULL) - errcontext("writing block %u of relation %u/%u", + errcontext("writing block %u of relation %u/%u/%u", bufHdr->tag.blockNum, - bufHdr->tag.rnode.tblNode, bufHdr->tag.rnode.relNode); + bufHdr->tag.rnode.spcNode, + bufHdr->tag.rnode.dbNode, + bufHdr->tag.rnode.relNode); } diff --git a/src/backend/storage/buffer/localbuf.c b/src/backend/storage/buffer/localbuf.c index 95e86e955d1..f4d1163f16a 100644 --- a/src/backend/storage/buffer/localbuf.c +++ b/src/backend/storage/buffer/localbuf.c @@ -9,7 +9,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/storage/buffer/localbuf.c,v 1.55 2004/05/31 20:31:33 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/storage/buffer/localbuf.c,v 1.56 2004/06/18 06:13:33 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -236,10 +236,10 @@ AtEOXact_LocalBuffers(bool isCommit) if (isCommit) elog(WARNING, - "local buffer leak: [%03d] (rel=%u/%u, blockNum=%u, flags=0x%x, refcount=%u %d)", + "local buffer leak: [%03d] (rel=%u/%u/%u, blockNum=%u, flags=0x%x, refcount=%u %d)", i, - buf->tag.rnode.tblNode, buf->tag.rnode.relNode, - buf->tag.blockNum, buf->flags, + buf->tag.rnode.spcNode, buf->tag.rnode.dbNode, + buf->tag.rnode.relNode, buf->tag.blockNum, buf->flags, buf->refcount, LocalRefCount[i]); LocalRefCount[i] = 0; diff --git a/src/backend/storage/freespace/freespace.c b/src/backend/storage/freespace/freespace.c index 527ad8496ab..28815e6d77f 100644 --- a/src/backend/storage/freespace/freespace.c +++ b/src/backend/storage/freespace/freespace.c @@ -8,7 +8,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/storage/freespace/freespace.c,v 1.31 2004/06/05 19:48:08 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/storage/freespace/freespace.c,v 1.32 2004/06/18 06:13:34 tgl Exp $ * * * NOTES: @@ -658,9 +658,6 @@ FreeSpaceMapForgetRel(RelFileNode *rel) * * This is called during DROP DATABASE. As above, might as well reclaim * map space sooner instead of later. - * - * XXX when we implement tablespaces, target Oid will need to be tablespace - * ID not database ID. */ void FreeSpaceMapForgetDatabase(Oid dbid) @@ -672,7 +669,7 @@ FreeSpaceMapForgetDatabase(Oid dbid) for (fsmrel = FreeSpaceMap->usageList; fsmrel; fsmrel = nextrel) { nextrel = fsmrel->nextUsage; /* in case we delete it */ - if (fsmrel->key.tblNode == dbid) + if (fsmrel->key.dbNode == dbid) delete_fsm_rel(fsmrel); } LWLockRelease(FreeSpaceLock); @@ -1847,8 +1844,9 @@ DumpFreeSpace(void) for (fsmrel = FreeSpaceMap->usageList; fsmrel; fsmrel = fsmrel->nextUsage) { relNum++; - fprintf(stderr, "Map %d: rel %u/%u isIndex %d avgRequest %u lastPageCount %d nextPage %d\nMap= ", - relNum, fsmrel->key.tblNode, fsmrel->key.relNode, + fprintf(stderr, "Map %d: rel %u/%u/%u isIndex %d avgRequest %u lastPageCount %d nextPage %d\nMap= ", + relNum, + fsmrel->key.spcNode, fsmrel->key.dbNode, fsmrel->key.relNode, (int) fsmrel->isIndex, fsmrel->avgRequest, fsmrel->lastPageCount, fsmrel->nextPage); if (fsmrel->isIndex) diff --git a/src/backend/storage/smgr/md.c b/src/backend/storage/smgr/md.c index 4f0d241215d..b0d667b8369 100644 --- a/src/backend/storage/smgr/md.c +++ b/src/backend/storage/smgr/md.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/storage/smgr/md.c,v 1.107 2004/06/02 17:28:18 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/storage/smgr/md.c,v 1.108 2004/06/18 06:13:37 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -363,8 +363,9 @@ mdopen(SMgrRelation reln, bool allowNotFound) return NULL; ereport(ERROR, (errcode_for_file_access(), - errmsg("could not open relation %u/%u: %m", - reln->smgr_rnode.tblNode, + errmsg("could not open relation %u/%u/%u: %m", + reln->smgr_rnode.spcNode, + reln->smgr_rnode.dbNode, reln->smgr_rnode.relNode))); } } @@ -765,9 +766,10 @@ mdsync(void) { ereport(LOG, (errcode_for_file_access(), - errmsg("could not fsync segment %u of relation %u/%u: %m", + errmsg("could not fsync segment %u of relation %u/%u/%u: %m", entry->segno, - entry->rnode.tblNode, + entry->rnode.spcNode, + entry->rnode.dbNode, entry->rnode.relNode))); return false; } @@ -945,9 +947,10 @@ _mdfd_getseg(SMgrRelation reln, BlockNumber blkno, bool allowNotFound) return NULL; ereport(ERROR, (errcode_for_file_access(), - errmsg("could not open segment %u of relation %u/%u (target block %u): %m", + errmsg("could not open segment %u of relation %u/%u/%u (target block %u): %m", nextsegno, - reln->smgr_rnode.tblNode, + reln->smgr_rnode.spcNode, + reln->smgr_rnode.dbNode, reln->smgr_rnode.relNode, blkno))); } diff --git a/src/backend/storage/smgr/smgr.c b/src/backend/storage/smgr/smgr.c index 8977f026e4f..c7783d878f2 100644 --- a/src/backend/storage/smgr/smgr.c +++ b/src/backend/storage/smgr/smgr.c @@ -11,7 +11,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/storage/smgr/smgr.c,v 1.73 2004/06/02 17:28:18 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/storage/smgr/smgr.c,v 1.74 2004/06/18 06:13:37 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -227,8 +227,9 @@ smgrclose(SMgrRelation reln) if (! (*(smgrsw[reln->smgr_which].smgr_close)) (reln)) ereport(ERROR, (errcode_for_file_access(), - errmsg("could not close relation %u/%u: %m", - reln->smgr_rnode.tblNode, + errmsg("could not close relation %u/%u/%u: %m", + reln->smgr_rnode.spcNode, + reln->smgr_rnode.dbNode, reln->smgr_rnode.relNode))); if (hash_search(SMgrRelationHash, @@ -308,8 +309,9 @@ smgrcreate(SMgrRelation reln, bool isTemp, bool isRedo) if (! (*(smgrsw[reln->smgr_which].smgr_create)) (reln, isRedo)) ereport(ERROR, (errcode_for_file_access(), - errmsg("could not create relation %u/%u: %m", - reln->smgr_rnode.tblNode, + errmsg("could not create relation %u/%u/%u: %m", + reln->smgr_rnode.spcNode, + reln->smgr_rnode.dbNode, reln->smgr_rnode.relNode))); if (isRedo) @@ -427,8 +429,9 @@ smgr_internal_unlink(RelFileNode rnode, int which, bool isTemp, bool isRedo) if (! (*(smgrsw[which].smgr_unlink)) (rnode, isRedo)) ereport(WARNING, (errcode_for_file_access(), - errmsg("could not unlink relation %u/%u: %m", - rnode.tblNode, + errmsg("could not unlink relation %u/%u/%u: %m", + rnode.spcNode, + rnode.dbNode, rnode.relNode))); } @@ -447,8 +450,9 @@ smgrextend(SMgrRelation reln, BlockNumber blocknum, char *buffer, bool isTemp) isTemp)) ereport(ERROR, (errcode_for_file_access(), - errmsg("could not extend relation %u/%u: %m", - reln->smgr_rnode.tblNode, + errmsg("could not extend relation %u/%u/%u: %m", + reln->smgr_rnode.spcNode, + reln->smgr_rnode.dbNode, reln->smgr_rnode.relNode), errhint("Check free disk space."))); } @@ -467,9 +471,10 @@ smgrread(SMgrRelation reln, BlockNumber blocknum, char *buffer) if (! (*(smgrsw[reln->smgr_which].smgr_read)) (reln, blocknum, buffer)) ereport(ERROR, (errcode_for_file_access(), - errmsg("could not read block %u of relation %u/%u: %m", + errmsg("could not read block %u of relation %u/%u/%u: %m", blocknum, - reln->smgr_rnode.tblNode, + reln->smgr_rnode.spcNode, + reln->smgr_rnode.dbNode, reln->smgr_rnode.relNode))); } @@ -491,9 +496,10 @@ smgrwrite(SMgrRelation reln, BlockNumber blocknum, char *buffer, bool isTemp) isTemp)) ereport(ERROR, (errcode_for_file_access(), - errmsg("could not write block %u of relation %u/%u: %m", + errmsg("could not write block %u of relation %u/%u/%u: %m", blocknum, - reln->smgr_rnode.tblNode, + reln->smgr_rnode.spcNode, + reln->smgr_rnode.dbNode, reln->smgr_rnode.relNode))); } @@ -520,8 +526,9 @@ smgrnblocks(SMgrRelation reln) if (nblocks == InvalidBlockNumber) ereport(ERROR, (errcode_for_file_access(), - errmsg("could not count blocks of relation %u/%u: %m", - reln->smgr_rnode.tblNode, + errmsg("could not count blocks of relation %u/%u/%u: %m", + reln->smgr_rnode.spcNode, + reln->smgr_rnode.dbNode, reln->smgr_rnode.relNode))); return nblocks; @@ -552,8 +559,9 @@ smgrtruncate(SMgrRelation reln, BlockNumber nblocks, bool isTemp) if (newblks == InvalidBlockNumber) ereport(ERROR, (errcode_for_file_access(), - errmsg("could not truncate relation %u/%u to %u blocks: %m", - reln->smgr_rnode.tblNode, + errmsg("could not truncate relation %u/%u/%u to %u blocks: %m", + reln->smgr_rnode.spcNode, + reln->smgr_rnode.dbNode, reln->smgr_rnode.relNode, nblocks))); @@ -607,8 +615,9 @@ smgrimmedsync(SMgrRelation reln) if (! (*(smgrsw[reln->smgr_which].smgr_immedsync)) (reln)) ereport(ERROR, (errcode_for_file_access(), - errmsg("could not sync relation %u/%u: %m", - reln->smgr_rnode.tblNode, + errmsg("could not sync relation %u/%u/%u: %m", + reln->smgr_rnode.spcNode, + reln->smgr_rnode.dbNode, reln->smgr_rnode.relNode))); } @@ -775,8 +784,9 @@ smgr_redo(XLogRecPtr lsn, XLogRecord *record) if (newblks == InvalidBlockNumber) ereport(WARNING, (errcode_for_file_access(), - errmsg("could not truncate relation %u/%u to %u blocks: %m", - reln->smgr_rnode.tblNode, + errmsg("could not truncate relation %u/%u/%u to %u blocks: %m", + reln->smgr_rnode.spcNode, + reln->smgr_rnode.dbNode, reln->smgr_rnode.relNode, xlrec->blkno))); } @@ -800,16 +810,17 @@ smgr_desc(char *buf, uint8 xl_info, char *rec) { xl_smgr_create *xlrec = (xl_smgr_create *) rec; - sprintf(buf + strlen(buf), "file create: %u/%u", - xlrec->rnode.tblNode, xlrec->rnode.relNode); + sprintf(buf + strlen(buf), "file create: %u/%u/%u", + xlrec->rnode.spcNode, xlrec->rnode.dbNode, + xlrec->rnode.relNode); } else if (info == XLOG_SMGR_TRUNCATE) { xl_smgr_truncate *xlrec = (xl_smgr_truncate *) rec; - sprintf(buf + strlen(buf), "file truncate: %u/%u to %u blocks", - xlrec->rnode.tblNode, xlrec->rnode.relNode, - xlrec->blkno); + sprintf(buf + strlen(buf), "file truncate: %u/%u/%u to %u blocks", + xlrec->rnode.spcNode, xlrec->rnode.dbNode, + xlrec->rnode.relNode, xlrec->blkno); } else strcat(buf, "UNKNOWN"); diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c index 4d6c246cea1..d12cf0d750f 100644 --- a/src/backend/tcop/utility.c +++ b/src/backend/tcop/utility.c @@ -10,7 +10,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/tcop/utility.c,v 1.218 2004/05/29 22:48:20 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/tcop/utility.c,v 1.219 2004/06/18 06:13:38 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -36,6 +36,7 @@ #include "commands/schemacmds.h" #include "commands/sequence.h" #include "commands/tablecmds.h" +#include "commands/tablespace.h" #include "commands/trigger.h" #include "commands/typecmds.h" #include "commands/user.h" @@ -258,6 +259,7 @@ check_xact_readonly(Node *parsetree) case T_CreateSchemaStmt: case T_CreateSeqStmt: case T_CreateStmt: + case T_CreateTableSpaceStmt: case T_CreateTrigStmt: case T_CompositeTypeStmt: case T_CreateUserStmt: @@ -266,6 +268,7 @@ check_xact_readonly(Node *parsetree) case T_DropCastStmt: case T_DropStmt: case T_DropdbStmt: + case T_DropTableSpaceStmt: case T_RemoveFuncStmt: case T_DropGroupStmt: case T_DropPLangStmt: @@ -404,6 +407,14 @@ ProcessUtility(Node *parsetree, } break; + case T_CreateTableSpaceStmt: + CreateTableSpace((CreateTableSpaceStmt *) parsetree); + break; + + case T_DropTableSpaceStmt: + DropTableSpace((DropTableSpaceStmt *) parsetree); + break; + case T_DropStmt: { DropStmt *stmt = (DropStmt *) parsetree; @@ -636,6 +647,7 @@ ProcessUtility(Node *parsetree, DefineIndex(stmt->relation, /* relation */ stmt->idxname, /* index name */ stmt->accessMethod, /* am name */ + stmt->tableSpace, stmt->indexParams, /* parameters */ (Expr *) stmt->whereClause, stmt->rangetable, @@ -1153,6 +1165,14 @@ CreateCommandTag(Node *parsetree) tag = "CREATE TABLE"; break; + case T_CreateTableSpaceStmt: + tag = "CREATE TABLESPACE"; + break; + + case T_DropTableSpaceStmt: + tag = "DROP TABLESPACE"; + break; + case T_DropStmt: switch (((DropStmt *) parsetree)->removeType) { @@ -1224,6 +1244,9 @@ CreateCommandTag(Node *parsetree) case OBJECT_SCHEMA: tag = "ALTER SCHEMA"; break; + case OBJECT_TABLESPACE: + tag = "ALTER TABLESPACE"; + break; case OBJECT_TRIGGER: tag = "ALTER TRIGGER"; break; diff --git a/src/backend/utils/adt/acl.c b/src/backend/utils/adt/acl.c index d02683245ac..4b13e318be6 100644 --- a/src/backend/utils/adt/acl.c +++ b/src/backend/utils/adt/acl.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/utils/adt/acl.c,v 1.105 2004/06/01 21:49:22 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/utils/adt/acl.c,v 1.106 2004/06/18 06:13:49 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -539,6 +539,10 @@ acldefault(GrantObjectType objtype, AclId ownerid) world_default = ACL_NO_RIGHTS; owner_default = ACL_ALL_RIGHTS_NAMESPACE; break; + case ACL_OBJECT_TABLESPACE: + world_default = ACL_NO_RIGHTS; + owner_default = ACL_ALL_RIGHTS_TABLESPACE; + break; default: elog(ERROR, "unrecognized objtype: %d", (int) objtype); world_default = ACL_NO_RIGHTS; /* keep compiler quiet */ diff --git a/src/backend/utils/adt/ruleutils.c b/src/backend/utils/adt/ruleutils.c index 4caaabd62cd..1a2fe54d7ac 100644 --- a/src/backend/utils/adt/ruleutils.c +++ b/src/backend/utils/adt/ruleutils.c @@ -3,7 +3,7 @@ * back to source text * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/utils/adt/ruleutils.c,v 1.172 2004/06/16 01:26:47 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/utils/adt/ruleutils.c,v 1.173 2004/06/18 06:13:49 tgl Exp $ * * This software is copyrighted by Jan Wieck - Hamburg. * @@ -53,6 +53,7 @@ #include "catalog/pg_operator.h" #include "catalog/pg_shadow.h" #include "catalog/pg_trigger.h" +#include "commands/tablespace.h" #include "executor/spi.h" #include "lib/stringinfo.h" #include "nodes/makefuncs.h" @@ -768,6 +769,23 @@ pg_get_indexdef_worker(Oid indexrelid, int colno, int prettyFlags) appendStringInfoChar(&buf, ')'); /* + * If the index is in a different tablespace from its parent, + * tell about that + */ + if (OidIsValid(idxrelrec->reltablespace) && + idxrelrec->reltablespace != get_rel_tablespace(indrelid)) + { + char *spcname = get_tablespace_name(idxrelrec->reltablespace); + + if (spcname) /* just paranoia... */ + { + appendStringInfo(&buf, " TABLESPACE %s", + quote_identifier(spcname)); + pfree(spcname); + } + } + + /* * If it's a partial index, decompile and append the predicate */ if (!heap_attisnull(ht_idx, Anum_pg_index_indpred)) diff --git a/src/backend/utils/cache/inval.c b/src/backend/utils/cache/inval.c index 7b00f0531e2..ea958a27b46 100644 --- a/src/backend/utils/cache/inval.c +++ b/src/backend/utils/cache/inval.c @@ -74,7 +74,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/utils/cache/inval.c,v 1.61 2004/05/06 16:10:57 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/utils/cache/inval.c,v 1.62 2004/06/18 06:13:52 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -308,7 +308,7 @@ AddRelcacheInvalidationMessage(InvalidationListHeader *hdr, /* We assume dbId need not be checked because it will never change */ /* relfilenode fields must be checked to support reassignment */ ProcessMessageList(hdr->rclist, - if (msg->rc.relId == relId && + if (msg->rc.relId == relId && RelFileNodeEquals(msg->rc.physId, physId)) return); /* OK, add the item */ @@ -555,14 +555,18 @@ PrepareForTupleInvalidation(Relation relation, HeapTuple tuple, databaseId = InvalidOid; else databaseId = MyDatabaseId; - rnode.tblNode = databaseId; /* XXX change for tablespaces */ + if (classtup->reltablespace) + rnode.spcNode = classtup->reltablespace; + else + rnode.spcNode = MyDatabaseTableSpace; + rnode.dbNode = databaseId; rnode.relNode = classtup->relfilenode; /* * Note: during a pg_class row update that assigns a new relfilenode - * value, we will be called on both the old and new tuples, and thus - * will broadcast invalidation messages showing both the old and new - * relfilenode values. This ensures that other backends will close - * smgr references to the old relfilenode file. + * or reltablespace value, we will be called on both the old and new + * tuples, and thus will broadcast invalidation messages showing both + * the old and new RelFileNode values. This ensures that other + * backends will close smgr references to the old file. */ } else if (tupleRelId == RelOid_pg_attribute) @@ -580,7 +584,8 @@ PrepareForTupleInvalidation(Relation relation, HeapTuple tuple, */ databaseId = MyDatabaseId; /* We assume no smgr cache flush is needed, either */ - rnode.tblNode = InvalidOid; + rnode.spcNode = InvalidOid; + rnode.dbNode = InvalidOid; rnode.relNode = InvalidOid; } else @@ -760,7 +765,11 @@ CacheInvalidateRelcacheByTuple(HeapTuple classTuple) databaseId = InvalidOid; else databaseId = MyDatabaseId; - rnode.tblNode = databaseId; /* XXX change for tablespaces */ + if (classtup->reltablespace) + rnode.spcNode = classtup->reltablespace; + else + rnode.spcNode = MyDatabaseTableSpace; + rnode.dbNode = databaseId; rnode.relNode = classtup->relfilenode; RegisterRelcacheInvalidation(databaseId, relationId, rnode); diff --git a/src/backend/utils/cache/lsyscache.c b/src/backend/utils/cache/lsyscache.c index d51d1c18925..1621982502a 100644 --- a/src/backend/utils/cache/lsyscache.c +++ b/src/backend/utils/cache/lsyscache.c @@ -7,7 +7,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/utils/cache/lsyscache.c,v 1.113 2004/06/06 00:41:27 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/utils/cache/lsyscache.c,v 1.114 2004/06/18 06:13:52 tgl Exp $ * * NOTES * Eventually, the index information should go through here, too. @@ -986,6 +986,34 @@ get_rel_namespace(Oid relid) } /* + * get_rel_tablespace + * Returns the pg_tablespace OID associated with a given relation. + * + * Note: failure return is InvalidOid, which cannot be distinguished from + * "default tablespace for this database", but that seems OK. + */ +Oid +get_rel_tablespace(Oid relid) +{ + HeapTuple tp; + + tp = SearchSysCache(RELOID, + ObjectIdGetDatum(relid), + 0, 0, 0); + if (HeapTupleIsValid(tp)) + { + Form_pg_class reltup = (Form_pg_class) GETSTRUCT(tp); + Oid result; + + result = reltup->reltablespace; + ReleaseSysCache(tp); + return result; + } + else + return InvalidOid; +} + +/* * get_rel_type_id * * Returns the pg_type OID associated with a given relation. @@ -1980,6 +2008,34 @@ get_namespace_name(Oid nspid) return NULL; } +/* + * get_namespace_tablespace + * Returns the default tablespace of a given namespace + * + * Note: failure return is InvalidOid, which cannot be distinguished from + * "default tablespace for this database", but that seems OK. + */ +Oid +get_namespace_tablespace(Oid nspid) +{ + HeapTuple tp; + + tp = SearchSysCache(NAMESPACEOID, + ObjectIdGetDatum(nspid), + 0, 0, 0); + if (HeapTupleIsValid(tp)) + { + Form_pg_namespace nsptup = (Form_pg_namespace) GETSTRUCT(tp); + Oid result; + + result = nsptup->nsptablespace; + ReleaseSysCache(tp); + return result; + } + else + return InvalidOid; +} + /* ---------- PG_SHADOW CACHE ---------- */ /* diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c index 4221976f0b1..ee8b46407e1 100644 --- a/src/backend/utils/cache/relcache.c +++ b/src/backend/utils/cache/relcache.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/utils/cache/relcache.c,v 1.204 2004/05/30 23:40:37 neilc Exp $ + * $PostgreSQL: pgsql/src/backend/utils/cache/relcache.c,v 1.205 2004/06/18 06:13:52 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -260,6 +260,7 @@ static void RelationBuildTupleDesc(RelationBuildDescInfo buildinfo, Relation relation); static Relation RelationBuildDesc(RelationBuildDescInfo buildinfo, Relation oldrelation); +static void RelationInitPhysicalAddr(Relation relation); static void AttrDefaultFetch(Relation relation); static void CheckConstraintFetch(Relation relation); static List *insert_ordered_oid(List *list, Oid datum); @@ -873,11 +874,10 @@ RelationBuildDesc(RelationBuildDescInfo buildinfo, */ RelationInitLockInfo(relation); /* see lmgr.c */ - if (relation->rd_rel->relisshared) - relation->rd_node.tblNode = InvalidOid; - else - relation->rd_node.tblNode = MyDatabaseId; - relation->rd_node.relNode = relation->rd_rel->relfilenode; + /* + * initialize physical addressing information for the relation + */ + RelationInitPhysicalAddr(relation); /* make sure relation is marked as having no open file yet */ relation->rd_smgr = NULL; @@ -893,6 +893,23 @@ RelationBuildDesc(RelationBuildDescInfo buildinfo, } /* + * Initialize the physical addressing info (RelFileNode) for a relcache entry + */ +static void +RelationInitPhysicalAddr(Relation relation) +{ + if (relation->rd_rel->reltablespace) + relation->rd_node.spcNode = relation->rd_rel->reltablespace; + else + relation->rd_node.spcNode = MyDatabaseTableSpace; + if (relation->rd_rel->relisshared) + relation->rd_node.dbNode = InvalidOid; + else + relation->rd_node.dbNode = MyDatabaseId; + relation->rd_node.relNode = relation->rd_rel->relfilenode; +} + +/* * Initialize index-access-method support data for an index relation */ void @@ -1343,18 +1360,17 @@ formrdesc(const char *relationName, * initialize relation id from info in att array (my, this is ugly) */ RelationGetRelid(relation) = relation->rd_att->attrs[0]->attrelid; + relation->rd_rel->relfilenode = RelationGetRelid(relation); /* - * initialize the relation's lock manager and RelFileNode information + * initialize the relation lock manager information */ RelationInitLockInfo(relation); /* see lmgr.c */ - if (relation->rd_rel->relisshared) - relation->rd_node.tblNode = InvalidOid; - else - relation->rd_node.tblNode = MyDatabaseId; - relation->rd_node.relNode = - relation->rd_rel->relfilenode = RelationGetRelid(relation); + /* + * initialize physical addressing information for the relation + */ + RelationInitPhysicalAddr(relation); /* * initialize the rel-has-index flag, using hardwired knowledge @@ -1570,7 +1586,8 @@ RelationReloadClassinfo(Relation relation) relation->rd_id); relp = (Form_pg_class) GETSTRUCT(pg_class_tuple); memcpy((char *) relation->rd_rel, (char *) relp, CLASS_TUPLE_SIZE); - relation->rd_node.relNode = relp->relfilenode; + /* Now we can recalculate physical address */ + RelationInitPhysicalAddr(relation); heap_freetuple(pg_class_tuple); relation->rd_targblock = InvalidBlockNumber; /* Okay, now it's valid again */ @@ -2040,8 +2057,9 @@ Relation RelationBuildLocalRelation(const char *relname, Oid relnamespace, TupleDesc tupDesc, - Oid relid, Oid dbid, - RelFileNode rnode, + Oid relid, + Oid reltablespace, + bool shared_relation, bool nailit) { Relation rel; @@ -2125,20 +2143,23 @@ RelationBuildLocalRelation(const char *relname, /* * Insert relation physical and logical identifiers (OIDs) into the - * right places. + * right places. Note that the physical ID (relfilenode) is initially + * the same as the logical ID (OID). */ - rel->rd_rel->relisshared = (dbid == InvalidOid); + rel->rd_rel->relisshared = shared_relation; RelationGetRelid(rel) = relid; for (i = 0; i < natts; i++) rel->rd_att->attrs[i]->attrelid = relid; - rel->rd_node = rnode; - rel->rd_rel->relfilenode = rnode.relNode; + rel->rd_rel->relfilenode = relid; + rel->rd_rel->reltablespace = reltablespace; RelationInitLockInfo(rel); /* see lmgr.c */ + RelationInitPhysicalAddr(rel); + /* * Okay to insert into the relcache hash tables. */ @@ -3053,16 +3074,12 @@ load_relcache_init_file(void) MemSet(&rel->pgstat_info, 0, sizeof(rel->pgstat_info)); /* - * Make sure database ID is correct. This is needed in case the - * pg_internal.init file was copied from some other database by - * CREATE DATABASE. + * Recompute lock and physical addressing info. This is needed in + * case the pg_internal.init file was copied from some other database + * by CREATE DATABASE. */ - if (rel->rd_rel->relisshared) - rel->rd_node.tblNode = InvalidOid; - else - rel->rd_node.tblNode = MyDatabaseId; - RelationInitLockInfo(rel); + RelationInitPhysicalAddr(rel); } /* diff --git a/src/backend/utils/init/globals.c b/src/backend/utils/init/globals.c index 3a5d33724a4..4f0f8b67c24 100644 --- a/src/backend/utils/init/globals.c +++ b/src/backend/utils/init/globals.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/utils/init/globals.c,v 1.90 2004/05/30 17:58:12 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/utils/init/globals.c,v 1.91 2004/06/18 06:13:54 tgl Exp $ * * NOTES * Globals used all over the place should be declared here and not @@ -58,6 +58,8 @@ BackendId MyBackendId = InvalidBackendId; char *DatabasePath = NULL; Oid MyDatabaseId = InvalidOid; +Oid MyDatabaseTableSpace = InvalidOid; + pid_t PostmasterPid = 0; /* diff --git a/src/backend/utils/init/miscinit.c b/src/backend/utils/init/miscinit.c index 6f9ff5aef4d..b0c5ff82e38 100644 --- a/src/backend/utils/init/miscinit.c +++ b/src/backend/utils/init/miscinit.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/utils/init/miscinit.c,v 1.126 2004/05/30 23:40:38 neilc Exp $ + * $PostgreSQL: pgsql/src/backend/utils/init/miscinit.c,v 1.127 2004/06/18 06:13:54 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -157,7 +157,6 @@ void SetDataDir(const char *dir) { char *new; - int newlen; AssertArg(dir); @@ -212,13 +211,7 @@ SetDataDir(const char *dir) * Strip any trailing slash. Not strictly necessary, but avoids * generating funny-looking paths to individual files. */ - newlen = strlen(new); - if (newlen > 1 && (new[newlen - 1] == '/' -#ifdef WIN32 - || new[newlen - 1] == '\\' -#endif - )) - new[newlen - 1] = '\0'; + canonicalize_path(new); if (DataDir) free(DataDir); diff --git a/src/backend/utils/init/postinit.c b/src/backend/utils/init/postinit.c index e80187d5752..48d28d429f2 100644 --- a/src/backend/utils/init/postinit.c +++ b/src/backend/utils/init/postinit.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/utils/init/postinit.c,v 1.133 2004/05/29 22:48:21 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/utils/init/postinit.c,v 1.134 2004/06/18 06:13:54 tgl Exp $ * * *------------------------------------------------------------------------- @@ -26,6 +26,7 @@ #include "catalog/namespace.h" #include "catalog/pg_database.h" #include "catalog/pg_shadow.h" +#include "catalog/pg_tablespace.h" #include "commands/trigger.h" #include "mb/pg_wchar.h" #include "miscadmin.h" @@ -239,12 +240,12 @@ InitPostgres(const char *dbname, const char *username) if (bootstrap) { MyDatabaseId = TemplateDbOid; - SetDatabasePath(GetDatabasePath(MyDatabaseId)); + MyDatabaseTableSpace = DEFAULTTABLESPACE_OID; + SetDatabasePath(GetDatabasePath(MyDatabaseId, MyDatabaseTableSpace)); } else { - char *fullpath, - datpath[MAXPGPATH]; + char *fullpath; /* * Formerly we validated DataDir here, but now that's done @@ -252,11 +253,11 @@ InitPostgres(const char *dbname, const char *username) */ /* - * Find oid and path of the database we're about to open. Since - * we're not yet up and running we have to use the hackish + * Find oid and tablespace of the database we're about to open. + * Since we're not yet up and running we have to use the hackish * GetRawDatabaseInfo. */ - GetRawDatabaseInfo(dbname, &MyDatabaseId, datpath); + GetRawDatabaseInfo(dbname, &MyDatabaseId, &MyDatabaseTableSpace); if (!OidIsValid(MyDatabaseId)) ereport(FATAL, @@ -264,7 +265,7 @@ InitPostgres(const char *dbname, const char *username) errmsg("database \"%s\" does not exist", dbname))); - fullpath = GetDatabasePath(MyDatabaseId); + fullpath = GetDatabasePath(MyDatabaseId, MyDatabaseTableSpace); /* Verify the database path */ diff --git a/src/backend/utils/misc/database.c b/src/backend/utils/misc/database.c index 37844a03a94..1eeb5357040 100644 --- a/src/backend/utils/misc/database.c +++ b/src/backend/utils/misc/database.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/utils/misc/database.c,v 1.60 2004/01/22 20:57:39 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/utils/misc/database.c,v 1.61 2004/06/18 06:13:56 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -21,6 +21,7 @@ #include "catalog/catname.h" #include "catalog/catalog.h" #include "catalog/pg_database.h" +#include "catalog/pg_tablespace.h" #include "miscadmin.h" #include "utils/syscache.h" @@ -29,12 +30,13 @@ static bool PhonyHeapTupleSatisfiesNow(HeapTupleHeader tuple); /* -------------------------------- - * GetRawDatabaseInfo() -- Find the OID and path of the database. + * GetRawDatabaseInfo() -- Find the OID and tablespace of the database. * - * The database's oid forms half of the unique key for the system - * caches and lock tables. We therefore want it initialized before - * we open any relations, since opening relations puts things in the - * cache. To get around this problem, this code opens and scans the + * We need both the OID and the default tablespace in order to find + * the database's system catalogs. Moreover the database's OID forms + * half of the unique key for the system caches and lock tables, so + * we must have it before we can use any of the cache mechanisms. + * To get around these problems, this code opens and scans the * pg_database relation by hand. * * This code knows way more than it should about the layout of @@ -43,19 +45,21 @@ static bool PhonyHeapTupleSatisfiesNow(HeapTupleHeader tuple); * -------------------------------- */ void -GetRawDatabaseInfo(const char *name, Oid *db_id, char *path) +GetRawDatabaseInfo(const char *name, Oid *db_id, Oid *db_tablespace) { int dbfd; int nbytes; - int pathlen; HeapTupleData tup; + Form_pg_database tup_db; Page pg; char *dbfname; - Form_pg_database tup_db; RelFileNode rnode; - rnode.tblNode = 0; + /* hard-wired path to pg_database */ + rnode.spcNode = GLOBALTABLESPACE_OID; + rnode.dbNode = 0; rnode.relNode = RelOid_pg_database; + dbfname = relpath(rnode); if ((dbfd = open(dbfname, O_RDONLY | PG_BINARY, 0)) < 0) @@ -121,7 +125,7 @@ GetRawDatabaseInfo(const char *name, Oid *db_id, char *path) * committed and dead tuples to be marked with correct states. * * XXX wouldn't it be better to let new backends read the - * database OID from a flat file, handled the same way we + * database info from a flat file, handled the same way we * handle the password relation? */ if (!PhonyHeapTupleSatisfiesNow(tup.t_data)) @@ -134,15 +138,9 @@ GetRawDatabaseInfo(const char *name, Oid *db_id, char *path) if (strcmp(name, NameStr(tup_db->datname)) == 0) { - /* Found it; extract the OID and the database path. */ + /* Found it; extract the db's OID and tablespace. */ *db_id = HeapTupleGetOid(&tup); - pathlen = VARSIZE(&(tup_db->datpath)) - VARHDRSZ; - if (pathlen < 0) - pathlen = 0; /* pure paranoia */ - if (pathlen >= MAXPGPATH) - pathlen = MAXPGPATH - 1; /* more paranoia */ - strncpy(path, VARDATA(&(tup_db->datpath)), pathlen); - path[pathlen] = '\0'; + *db_tablespace = tup_db->dattablespace; goto done; } } @@ -150,7 +148,7 @@ GetRawDatabaseInfo(const char *name, Oid *db_id, char *path) /* failed to find it... */ *db_id = InvalidOid; - *path = '\0'; + *db_tablespace = InvalidOid; done: close(dbfd); |
