summaryrefslogtreecommitdiff
path: root/src/bin
diff options
context:
space:
mode:
authorPeter Eisentraut2017-01-19 17:00:00 +0000
committerPeter Eisentraut2017-01-20 14:04:49 +0000
commit665d1fad99e7b11678b0d5fa24d2898424243cd6 (patch)
treeeefe3eb528f840780aef6c09939a1844dbabb30a /src/bin
parentba61a04bc7fefeee03416d9911eb825c4897c223 (diff)
Logical replication
- Add PUBLICATION catalogs and DDL - Add SUBSCRIPTION catalog and DDL - Define logical replication protocol and output plugin - Add logical replication workers From: Petr Jelinek <petr@2ndquadrant.com> Reviewed-by: Steve Singer <steve@ssinger.info> Reviewed-by: Andres Freund <andres@anarazel.de> Reviewed-by: Erik Rijkers <er@xs4all.nl> Reviewed-by: Peter Eisentraut <peter.eisentraut@2ndquadrant.com>
Diffstat (limited to 'src/bin')
-rw-r--r--src/bin/pg_dump/common.c12
-rw-r--r--src/bin/pg_dump/pg_backup.h3
-rw-r--r--src/bin/pg_dump/pg_backup_archiver.c7
-rw-r--r--src/bin/pg_dump/pg_dump.c464
-rw-r--r--src/bin/pg_dump/pg_dump.h46
-rw-r--r--src/bin/pg_dump/pg_dump_sort.c20
-rw-r--r--src/bin/pg_dump/pg_restore.c3
-rw-r--r--src/bin/pg_dump/t/002_pg_dump.pl76
-rw-r--r--src/bin/psql/command.c16
-rw-r--r--src/bin/psql/describe.c292
-rw-r--r--src/bin/psql/describe.h9
-rw-r--r--src/bin/psql/help.c2
-rw-r--r--src/bin/psql/tab-complete.c53
13 files changed, 986 insertions, 17 deletions
diff --git a/src/bin/pg_dump/common.c b/src/bin/pg_dump/common.c
index d4e36421d29..89530a9f0fb 100644
--- a/src/bin/pg_dump/common.c
+++ b/src/bin/pg_dump/common.c
@@ -292,6 +292,18 @@ getSchemaData(Archive *fout, int *numTablesPtr)
write_msg(NULL, "reading partition key information for interesting tables\n");
getTablePartitionKeyInfo(fout, tblinfo, numTables);
+ if (g_verbose)
+ write_msg(NULL, "reading publications\n");
+ getPublications(fout);
+
+ if (g_verbose)
+ write_msg(NULL, "reading publication membership\n");
+ getPublicationTables(fout, tblinfo, numTables);
+
+ if (g_verbose)
+ write_msg(NULL, "reading subscriptions\n");
+ getSubscriptions(fout);
+
*numTablesPtr = numTables;
return tblinfo;
}
diff --git a/src/bin/pg_dump/pg_backup.h b/src/bin/pg_dump/pg_backup.h
index 7241cdfc449..6480fb8e745 100644
--- a/src/bin/pg_dump/pg_backup.h
+++ b/src/bin/pg_dump/pg_backup.h
@@ -119,6 +119,7 @@ typedef struct _restoreOptions
bool *idWanted; /* array showing which dump IDs to emit */
int enable_row_security;
int sequence_data; /* dump sequence data even in schema-only mode */
+ int include_subscriptions;
} RestoreOptions;
typedef struct _dumpOptions
@@ -152,6 +153,8 @@ typedef struct _dumpOptions
int outputNoTablespaces;
int use_setsessauth;
int enable_row_security;
+ int include_subscriptions;
+ int no_create_subscription_slots;
/* default, if no "inclusion" switches appear, is to dump everything */
bool include_everything;
diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c
index b89bd99e49f..929f1b592bc 100644
--- a/src/bin/pg_dump/pg_backup_archiver.c
+++ b/src/bin/pg_dump/pg_backup_archiver.c
@@ -172,6 +172,7 @@ dumpOptionsFromRestoreOptions(RestoreOptions *ropt)
dopt->include_everything = ropt->include_everything;
dopt->enable_row_security = ropt->enable_row_security;
dopt->sequence_data = ropt->sequence_data;
+ dopt->include_subscriptions = ropt->include_subscriptions;
return dopt;
}
@@ -3266,6 +3267,8 @@ _getObjectDescription(PQExpBuffer buf, TocEntry *te, ArchiveHandle *AH)
strcmp(type, "SCHEMA") == 0 ||
strcmp(type, "FOREIGN DATA WRAPPER") == 0 ||
strcmp(type, "SERVER") == 0 ||
+ strcmp(type, "PUBLICATION") == 0 ||
+ strcmp(type, "SUBSCRIPTION") == 0 ||
strcmp(type, "USER MAPPING") == 0)
{
/* We already know that search_path was set properly */
@@ -3476,7 +3479,9 @@ _printTocEntry(ArchiveHandle *AH, TocEntry *te, bool isData, bool acl_pass)
strcmp(te->desc, "TEXT SEARCH DICTIONARY") == 0 ||
strcmp(te->desc, "TEXT SEARCH CONFIGURATION") == 0 ||
strcmp(te->desc, "FOREIGN DATA WRAPPER") == 0 ||
- strcmp(te->desc, "SERVER") == 0)
+ strcmp(te->desc, "SERVER") == 0 ||
+ strcmp(te->desc, "PUBLICATION") == 0 ||
+ strcmp(te->desc, "SUBSCRIPTION") == 0)
{
PQExpBuffer temp = createPQExpBuffer();
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 883fde1e5aa..0bb363957a4 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -246,6 +246,9 @@ static void getBlobs(Archive *fout);
static void dumpBlob(Archive *fout, BlobInfo *binfo);
static int dumpBlobs(Archive *fout, void *arg);
static void dumpPolicy(Archive *fout, PolicyInfo *polinfo);
+static void dumpPublication(Archive *fout, PublicationInfo *pubinfo);
+static void dumpPublicationTable(Archive *fout, PublicationRelInfo *pubrinfo);
+static void dumpSubscription(Archive *fout, SubscriptionInfo *subinfo);
static void dumpDatabase(Archive *AH);
static void dumpEncoding(Archive *AH);
static void dumpStdStrings(Archive *AH);
@@ -338,6 +341,7 @@ main(int argc, char **argv)
{"enable-row-security", no_argument, &dopt.enable_row_security, 1},
{"exclude-table-data", required_argument, NULL, 4},
{"if-exists", no_argument, &dopt.if_exists, 1},
+ {"include-subscriptions", no_argument, &dopt.include_subscriptions, 1},
{"inserts", no_argument, &dopt.dump_inserts, 1},
{"lock-wait-timeout", required_argument, NULL, 2},
{"no-tablespaces", no_argument, &dopt.outputNoTablespaces, 1},
@@ -348,6 +352,7 @@ main(int argc, char **argv)
{"snapshot", required_argument, NULL, 6},
{"strict-names", no_argument, &strict_names, 1},
{"use-set-session-authorization", no_argument, &dopt.use_setsessauth, 1},
+ {"no-create-subscription-slots", no_argument, &dopt.no_create_subscription_slots, 1},
{"no-security-labels", no_argument, &dopt.no_security_labels, 1},
{"no-synchronized-snapshots", no_argument, &dopt.no_synchronized_snapshots, 1},
{"no-unlogged-table-data", no_argument, &dopt.no_unlogged_table_data, 1},
@@ -849,6 +854,7 @@ main(int argc, char **argv)
ropt->include_everything = dopt.include_everything;
ropt->enable_row_security = dopt.enable_row_security;
ropt->sequence_data = dopt.sequence_data;
+ ropt->include_subscriptions = dopt.include_subscriptions;
if (compressLevel == -1)
ropt->compression = 0;
@@ -929,7 +935,10 @@ help(const char *progname)
" access to)\n"));
printf(_(" --exclude-table-data=TABLE do NOT dump data for the named table(s)\n"));
printf(_(" --if-exists use IF EXISTS when dropping objects\n"));
+ printf(_(" --include-subscriptions dump logical replication subscriptions\n"));
printf(_(" --inserts dump data as INSERT commands, rather than COPY\n"));
+ printf(_(" --no-create-subscription-slots\n"
+ " do not create replication slots for subscriptions\n"));
printf(_(" --no-security-labels do not dump security label assignments\n"));
printf(_(" --no-synchronized-snapshots do not use synchronized snapshots in parallel jobs\n"));
printf(_(" --no-tablespaces do not dump tablespace assignments\n"));
@@ -3311,6 +3320,449 @@ dumpPolicy(Archive *fout, PolicyInfo *polinfo)
destroyPQExpBuffer(delqry);
}
+/*
+ * getPublications
+ * get information about publications
+ */
+void
+getPublications(Archive *fout)
+{
+ PQExpBuffer query;
+ PGresult *res;
+ PublicationInfo *pubinfo;
+ int i_tableoid;
+ int i_oid;
+ int i_pubname;
+ int i_rolname;
+ int i_puballtables;
+ int i_pubinsert;
+ int i_pubupdate;
+ int i_pubdelete;
+ int i,
+ ntups;
+
+ if (fout->remoteVersion < 100000)
+ return;
+
+ query = createPQExpBuffer();
+
+ resetPQExpBuffer(query);
+
+ /* Get the publications. */
+ appendPQExpBuffer(query,
+ "SELECT p.tableoid, p.oid, p.pubname, "
+ "(%s p.pubowner) AS rolname, "
+ "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete "
+ "FROM pg_catalog.pg_publication p",
+ username_subquery);
+
+ res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
+
+ ntups = PQntuples(res);
+
+ if (ntups == 0)
+ {
+ /*
+ * There are no publications defined. Clean up and return.
+ */
+ PQclear(res);
+ return;
+ }
+
+ i_tableoid = PQfnumber(res, "tableoid");
+ i_oid = PQfnumber(res, "oid");
+ i_pubname = PQfnumber(res, "pubname");
+ i_rolname = PQfnumber(res, "rolname");
+ i_puballtables = PQfnumber(res, "puballtables");
+ i_pubinsert = PQfnumber(res, "pubinsert");
+ i_pubupdate = PQfnumber(res, "pubupdate");
+ i_pubdelete = PQfnumber(res, "pubdelete");
+
+ pubinfo = pg_malloc(ntups * sizeof(PublicationInfo));
+
+ for (i = 0; i < ntups; i++)
+ {
+ pubinfo[i].dobj.objType = DO_PUBLICATION;
+ pubinfo[i].dobj.catId.tableoid =
+ atooid(PQgetvalue(res, i, i_tableoid));
+ pubinfo[i].dobj.catId.oid = atooid(PQgetvalue(res, i, i_oid));
+ AssignDumpId(&pubinfo[i].dobj);
+ pubinfo[i].dobj.name = pg_strdup(PQgetvalue(res, i, i_pubname));
+ pubinfo[i].rolname = pg_strdup(PQgetvalue(res, i, i_rolname));
+ pubinfo[i].puballtables =
+ (strcmp(PQgetvalue(res, i, i_puballtables), "t") == 0);
+ pubinfo[i].pubinsert =
+ (strcmp(PQgetvalue(res, i, i_pubinsert), "t") == 0);
+ pubinfo[i].pubupdate =
+ (strcmp(PQgetvalue(res, i, i_pubupdate), "t") == 0);
+ pubinfo[i].pubdelete =
+ (strcmp(PQgetvalue(res, i, i_pubdelete), "t") == 0);
+
+ if (strlen(pubinfo[i].rolname) == 0)
+ write_msg(NULL, "WARNING: owner of publication \"%s\" appears to be invalid\n",
+ pubinfo[i].dobj.name);
+ }
+ PQclear(res);
+
+ destroyPQExpBuffer(query);
+}
+
+/*
+ * dumpPublication
+ * dump the definition of the given publication
+ */
+static void
+dumpPublication(Archive *fout, PublicationInfo *pubinfo)
+{
+ DumpOptions *dopt = fout->dopt;
+ PQExpBuffer delq;
+ PQExpBuffer query;
+
+ if (dopt->dataOnly)
+ return;
+
+ delq = createPQExpBuffer();
+ query = createPQExpBuffer();
+
+ appendPQExpBuffer(delq, "DROP PUBLICATION %s;\n",
+ fmtId(pubinfo->dobj.name));
+
+ appendPQExpBuffer(query, "CREATE PUBLICATION %s",
+ fmtId(pubinfo->dobj.name));
+
+ if (pubinfo->puballtables)
+ appendPQExpBufferStr(query, " FOR ALL TABLES");
+
+ appendPQExpBufferStr(query, " WITH (");
+ if (pubinfo->pubinsert)
+ appendPQExpBufferStr(query, "PUBLISH INSERT");
+ else
+ appendPQExpBufferStr(query, "NOPUBLISH INSERT");
+
+ if (pubinfo->pubupdate)
+ appendPQExpBufferStr(query, ", PUBLISH UPDATE");
+ else
+ appendPQExpBufferStr(query, ", NOPUBLISH UPDATE");
+
+ if (pubinfo->pubdelete)
+ appendPQExpBufferStr(query, ", PUBLISH DELETE");
+ else
+ appendPQExpBufferStr(query, ", NOPUBLISH DELETE");
+
+ appendPQExpBufferStr(query, ");\n");
+
+ ArchiveEntry(fout, pubinfo->dobj.catId, pubinfo->dobj.dumpId,
+ pubinfo->dobj.name,
+ NULL,
+ NULL,
+ pubinfo->rolname, false,
+ "PUBLICATION", SECTION_POST_DATA,
+ query->data, delq->data, NULL,
+ NULL, 0,
+ NULL, NULL);
+
+ destroyPQExpBuffer(delq);
+ destroyPQExpBuffer(query);
+}
+
+/*
+ * getPublicationTables
+ * get information about publication membership for dumpable tables.
+ */
+void
+getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables)
+{
+ PQExpBuffer query;
+ PGresult *res;
+ PublicationRelInfo *pubrinfo;
+ int i_tableoid;
+ int i_oid;
+ int i_pubname;
+ int i,
+ j,
+ ntups;
+
+ if (fout->remoteVersion < 100000)
+ return;
+
+ query = createPQExpBuffer();
+
+ for (i = 0; i < numTables; i++)
+ {
+ TableInfo *tbinfo = &tblinfo[i];
+
+ /* Only plain tables can be aded to publications. */
+ if (tbinfo->relkind != RELKIND_RELATION)
+ continue;
+
+ /*
+ * Ignore publication membership of tables whose definitions are
+ * not to be dumped.
+ */
+ if (!(tbinfo->dobj.dump & DUMP_COMPONENT_DEFINITION))
+ continue;
+
+ if (g_verbose)
+ write_msg(NULL, "reading publication membership for table \"%s.%s\"\n",
+ tbinfo->dobj.namespace->dobj.name,
+ tbinfo->dobj.name);
+
+ resetPQExpBuffer(query);
+
+ /* Get the publication memebership for the table. */
+ appendPQExpBuffer(query,
+ "SELECT pr.tableoid, pr.oid, p.pubname "
+ "FROM pg_catalog.pg_publication_rel pr,"
+ " pg_catalog.pg_publication p "
+ "WHERE pr.prrelid = '%u'"
+ " AND p.oid = pr.prpubid",
+ tbinfo->dobj.catId.oid);
+ res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
+
+ ntups = PQntuples(res);
+
+ if (ntups == 0)
+ {
+ /*
+ * Table is not member of any publications. Clean up and return.
+ */
+ PQclear(res);
+ continue;
+ }
+
+ i_tableoid = PQfnumber(res, "tableoid");
+ i_oid = PQfnumber(res, "oid");
+ i_pubname = PQfnumber(res, "pubname");
+
+ pubrinfo = pg_malloc(ntups * sizeof(PublicationRelInfo));
+
+ for (j = 0; j < ntups; j++)
+ {
+ pubrinfo[j].dobj.objType = DO_PUBLICATION_REL;
+ pubrinfo[j].dobj.catId.tableoid =
+ atooid(PQgetvalue(res, j, i_tableoid));
+ pubrinfo[j].dobj.catId.oid = atooid(PQgetvalue(res, j, i_oid));
+ AssignDumpId(&pubrinfo[j].dobj);
+ pubrinfo[j].dobj.namespace = tbinfo->dobj.namespace;
+ pubrinfo[j].pubname = pg_strdup(PQgetvalue(res, j, i_pubname));
+ pubrinfo[j].pubtable = tbinfo;
+ }
+ PQclear(res);
+ }
+ destroyPQExpBuffer(query);
+}
+
+/*
+ * dumpPublicationTable
+ * dump the definition of the given publication table mapping
+ */
+static void
+dumpPublicationTable(Archive *fout, PublicationRelInfo *pubrinfo)
+{
+ DumpOptions *dopt = fout->dopt;
+ TableInfo *tbinfo = pubrinfo->pubtable;
+ PQExpBuffer query;
+ char *tag;
+
+ if (dopt->dataOnly)
+ return;
+
+ tag = psprintf("%s %s", pubrinfo->pubname, tbinfo->dobj.name);
+
+ query = createPQExpBuffer();
+
+ appendPQExpBuffer(query, "ALTER PUBLICATION %s ADD TABLE",
+ fmtId(pubrinfo->pubname));
+ appendPQExpBuffer(query, " %s;",
+ fmtId(tbinfo->dobj.name));
+
+ /*
+ * There is no point in creating drop query as drop query as the drop
+ * is done by table drop.
+ */
+ ArchiveEntry(fout, pubrinfo->dobj.catId, pubrinfo->dobj.dumpId,
+ tag,
+ tbinfo->dobj.namespace->dobj.name,
+ NULL,
+ "", false,
+ "PUBLICATION TABLE", SECTION_POST_DATA,
+ query->data, "", NULL,
+ NULL, 0,
+ NULL, NULL);
+
+ free(tag);
+ destroyPQExpBuffer(query);
+}
+
+
+/*
+ * getSubscriptions
+ * get information about subscriptions
+ */
+void
+getSubscriptions(Archive *fout)
+{
+ DumpOptions *dopt = fout->dopt;
+ PQExpBuffer query;
+ PGresult *res;
+ SubscriptionInfo *subinfo;
+ int i_tableoid;
+ int i_oid;
+ int i_subname;
+ int i_rolname;
+ int i_subenabled;
+ int i_subconninfo;
+ int i_subslotname;
+ int i_subpublications;
+ int i,
+ ntups;
+
+ if (!dopt->include_subscriptions || fout->remoteVersion < 100000)
+ return;
+
+ query = createPQExpBuffer();
+
+ resetPQExpBuffer(query);
+
+ /* Get the subscriptions in current database. */
+ appendPQExpBuffer(query,
+ "SELECT s.tableoid, s.oid, s.subname,"
+ "(%s s.subowner) AS rolname, s.subenabled, "
+ " s.subconninfo, s.subslotname, s.subpublications "
+ "FROM pg_catalog.pg_subscription s "
+ "WHERE s.subdbid = (SELECT oid FROM pg_catalog.pg_database"
+ " WHERE datname = current_database())",
+ username_subquery);
+ res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
+
+ ntups = PQntuples(res);
+
+ if (ntups == 0)
+ {
+ /*
+ * There are no subscriptions defined. Clean up and return.
+ */
+ PQclear(res);
+ return;
+ }
+
+ i_tableoid = PQfnumber(res, "tableoid");
+ i_oid = PQfnumber(res, "oid");
+ i_subname = PQfnumber(res, "subname");
+ i_rolname = PQfnumber(res, "rolname");
+ i_subenabled = PQfnumber(res, "subenabled");
+ i_subconninfo = PQfnumber(res, "subconninfo");
+ i_subslotname = PQfnumber(res, "subslotname");
+ i_subpublications = PQfnumber(res, "subpublications");
+
+ subinfo = pg_malloc(ntups * sizeof(SubscriptionInfo));
+
+ for (i = 0; i < ntups; i++)
+ {
+ subinfo[i].dobj.objType = DO_SUBSCRIPTION;
+ subinfo[i].dobj.catId.tableoid =
+ atooid(PQgetvalue(res, i, i_tableoid));
+ subinfo[i].dobj.catId.oid = atooid(PQgetvalue(res, i, i_oid));
+ AssignDumpId(&subinfo[i].dobj);
+ subinfo[i].dobj.name = pg_strdup(PQgetvalue(res, i, i_subname));
+ subinfo[i].rolname = pg_strdup(PQgetvalue(res, i, i_rolname));
+ subinfo[i].subenabled =
+ (strcmp(PQgetvalue(res, i, i_subenabled), "t") == 0);
+ subinfo[i].subconninfo = pg_strdup(PQgetvalue(res, i, i_subconninfo));
+ subinfo[i].subslotname = pg_strdup(PQgetvalue(res, i, i_subslotname));
+ subinfo[i].subpublications =
+ pg_strdup(PQgetvalue(res, i, i_subpublications));
+
+ if (strlen(subinfo[i].rolname) == 0)
+ write_msg(NULL, "WARNING: owner of subscription \"%s\" appears to be invalid\n",
+ subinfo[i].dobj.name);
+ }
+ PQclear(res);
+
+ destroyPQExpBuffer(query);
+}
+
+/*
+ * dumpSubscription
+ * dump the definition of the given subscription
+ */
+static void
+dumpSubscription(Archive *fout, SubscriptionInfo *subinfo)
+{
+ DumpOptions *dopt = fout->dopt;
+ PQExpBuffer delq;
+ PQExpBuffer query;
+ PQExpBuffer publications;
+ char **pubnames = NULL;
+ int npubnames = 0;
+ int i;
+
+ if (dopt->dataOnly)
+ return;
+
+ delq = createPQExpBuffer();
+ query = createPQExpBuffer();
+
+ appendPQExpBuffer(delq, "DROP SUBSCRIPTION %s;\n",
+ fmtId(subinfo->dobj.name));
+
+ appendPQExpBuffer(query, "CREATE SUBSCRIPTION %s CONNECTION ",
+ fmtId(subinfo->dobj.name));
+ appendStringLiteralAH(query, subinfo->subconninfo, fout);
+
+ /* Build list of quoted publications and append them to query. */
+ if (!parsePGArray(subinfo->subpublications, &pubnames, &npubnames))
+ {
+ write_msg(NULL,
+ "WARNING: could not parse subpublications array\n");
+ if (pubnames)
+ free(pubnames);
+ pubnames = NULL;
+ npubnames = 0;
+ }
+
+ publications = createPQExpBuffer();
+ for (i = 0; i < npubnames; i++)
+ {
+ if (i > 0)
+ appendPQExpBufferStr(publications, ", ");
+
+ appendPQExpBufferStr(publications, fmtId(pubnames[i]));
+ }
+
+ appendPQExpBuffer(query, " PUBLICATION %s WITH (", publications->data);
+
+ if (subinfo->subenabled)
+ appendPQExpBufferStr(query, "ENABLED");
+ else
+ appendPQExpBufferStr(query, "DISABLED");
+
+ appendPQExpBufferStr(query, ", SLOT NAME = ");
+ appendStringLiteralAH(query, subinfo->subslotname, fout);
+
+ if (dopt->no_create_subscription_slots)
+ appendPQExpBufferStr(query, ", NOCREATE SLOT");
+
+ appendPQExpBufferStr(query, ");\n");
+
+ ArchiveEntry(fout, subinfo->dobj.catId, subinfo->dobj.dumpId,
+ subinfo->dobj.name,
+ NULL,
+ NULL,
+ subinfo->rolname, false,
+ "SUBSCRIPTION", SECTION_POST_DATA,
+ query->data, delq->data, NULL,
+ NULL, 0,
+ NULL, NULL);
+
+ destroyPQExpBuffer(publications);
+ if (pubnames)
+ free(pubnames);
+
+ destroyPQExpBuffer(delq);
+ destroyPQExpBuffer(query);
+}
+
static void
binary_upgrade_set_type_oids_by_type_oid(Archive *fout,
PQExpBuffer upgrade_buffer,
@@ -8752,6 +9204,15 @@ dumpDumpableObject(Archive *fout, DumpableObject *dobj)
case DO_POLICY:
dumpPolicy(fout, (PolicyInfo *) dobj);
break;
+ case DO_PUBLICATION:
+ dumpPublication(fout, (PublicationInfo *) dobj);
+ break;
+ case DO_PUBLICATION_REL:
+ dumpPublicationTable(fout, (PublicationRelInfo *) dobj);
+ break;
+ case DO_SUBSCRIPTION:
+ dumpSubscription(fout, (SubscriptionInfo *) dobj);
+ break;
case DO_PRE_DATA_BOUNDARY:
case DO_POST_DATA_BOUNDARY:
/* never dumped, nothing to do */
@@ -16627,6 +17088,9 @@ addBoundaryDependencies(DumpableObject **dobjs, int numObjs,
case DO_EVENT_TRIGGER:
case DO_DEFAULT_ACL:
case DO_POLICY:
+ case DO_PUBLICATION:
+ case DO_PUBLICATION_REL:
+ case DO_SUBSCRIPTION:
/* Post-data objects: must come after the post-data boundary */
addObjectDependency(dobj, postDataBound->dumpId);
break;
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index 0c920a39076..77de22fcb8b 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -79,7 +79,10 @@ typedef enum
DO_POST_DATA_BOUNDARY,
DO_EVENT_TRIGGER,
DO_REFRESH_MATVIEW,
- DO_POLICY
+ DO_POLICY,
+ DO_PUBLICATION,
+ DO_PUBLICATION_REL,
+ DO_SUBSCRIPTION
} DumpableObjectType;
/* component types of an object which can be selected for dumping */
@@ -567,6 +570,43 @@ typedef struct _policyInfo
} PolicyInfo;
/*
+ * The PublicationInfo struct is used to represent publications.
+ */
+typedef struct _PublicationInfo
+{
+ DumpableObject dobj;
+ char *rolname;
+ bool puballtables;
+ bool pubinsert;
+ bool pubupdate;
+ bool pubdelete;
+} PublicationInfo;
+
+/*
+ * The PublicationRelInfo struct is used to represent publication table
+ * mapping.
+ */
+typedef struct _PublicationRelInfo
+{
+ DumpableObject dobj;
+ TableInfo *pubtable;
+ char *pubname;
+} PublicationRelInfo;
+
+/*
+ * The SubscriptionInfo struct is used to represent subscription.
+ */
+typedef struct _SubscriptionInfo
+{
+ DumpableObject dobj;
+ char *rolname;
+ bool subenabled;
+ char *subconninfo;
+ char *subslotname;
+ char *subpublications;
+} SubscriptionInfo;
+
+/*
* We build an array of these with an entry for each object that is an
* extension member according to pg_depend.
*/
@@ -663,5 +703,9 @@ extern void processExtensionTables(Archive *fout, ExtensionInfo extinfo[],
extern EventTriggerInfo *getEventTriggers(Archive *fout, int *numEventTriggers);
extern void getPolicies(Archive *fout, TableInfo tblinfo[], int numTables);
extern void getTablePartitionKeyInfo(Archive *fout, TableInfo *tblinfo, int numTables);
+extern void getPublications(Archive *fout);
+extern void getPublicationTables(Archive *fout, TableInfo tblinfo[],
+ int numTables);
+extern void getSubscriptions(Archive *fout);
#endif /* PG_DUMP_H */
diff --git a/src/bin/pg_dump/pg_dump_sort.c b/src/bin/pg_dump/pg_dump_sort.c
index 1db680b950c..ea643397ba8 100644
--- a/src/bin/pg_dump/pg_dump_sort.c
+++ b/src/bin/pg_dump/pg_dump_sort.c
@@ -71,7 +71,10 @@ static const int dbObjectTypePriority[] =
26, /* DO_POST_DATA_BOUNDARY */
33, /* DO_EVENT_TRIGGER */
34, /* DO_REFRESH_MATVIEW */
- 35 /* DO_POLICY */
+ 35, /* DO_POLICY */
+ 36, /* DO_PUBLICATION */
+ 37, /* DO_PUBLICATION_REL */
+ 38 /* DO_SUBSCRIPTION */
};
static DumpId preDataBoundId;
@@ -1397,6 +1400,21 @@ describeDumpableObject(DumpableObject *obj, char *buf, int bufsize)
"POLICY (ID %d OID %u)",
obj->dumpId, obj->catId.oid);
return;
+ case DO_PUBLICATION:
+ snprintf(buf, bufsize,
+ "PUBLICATION (ID %d OID %u)",
+ obj->dumpId, obj->catId.oid);
+ return;
+ case DO_PUBLICATION_REL:
+ snprintf(buf, bufsize,
+ "PUBLICATION TABLE (ID %d OID %u)",
+ obj->dumpId, obj->catId.oid);
+ return;
+ case DO_SUBSCRIPTION:
+ snprintf(buf, bufsize,
+ "SUBSCRIPTION (ID %d OID %u)",
+ obj->dumpId, obj->catId.oid);
+ return;
case DO_PRE_DATA_BOUNDARY:
snprintf(buf, bufsize,
"PRE-DATA BOUNDARY (ID %d)",
diff --git a/src/bin/pg_dump/pg_restore.c b/src/bin/pg_dump/pg_restore.c
index 239b0d8ac03..497677494bc 100644
--- a/src/bin/pg_dump/pg_restore.c
+++ b/src/bin/pg_dump/pg_restore.c
@@ -72,6 +72,7 @@ main(int argc, char **argv)
char *inputFileSpec;
static int disable_triggers = 0;
static int enable_row_security = 0;
+ static int include_subscriptions = 0;
static int if_exists = 0;
static int no_data_for_failed_tables = 0;
static int outputNoTablespaces = 0;
@@ -116,6 +117,7 @@ main(int argc, char **argv)
{"disable-triggers", no_argument, &disable_triggers, 1},
{"enable-row-security", no_argument, &enable_row_security, 1},
{"if-exists", no_argument, &if_exists, 1},
+ {"include-subscriptions", no_argument, &include_subscriptions, 1},
{"no-data-for-failed-tables", no_argument, &no_data_for_failed_tables, 1},
{"no-tablespaces", no_argument, &outputNoTablespaces, 1},
{"role", required_argument, NULL, 2},
@@ -356,6 +358,7 @@ main(int argc, char **argv)
opts->disable_triggers = disable_triggers;
opts->enable_row_security = enable_row_security;
+ opts->include_subscriptions = include_subscriptions;
opts->noDataForFailedTables = no_data_for_failed_tables;
opts->noTablespace = outputNoTablespaces;
opts->use_setsessauth = use_setsessauth;
diff --git a/src/bin/pg_dump/t/002_pg_dump.pl b/src/bin/pg_dump/t/002_pg_dump.pl
index b732627c3a2..488eec30f57 100644
--- a/src/bin/pg_dump/t/002_pg_dump.pl
+++ b/src/bin/pg_dump/t/002_pg_dump.pl
@@ -301,7 +301,7 @@ my %tests = (
'ALTER FUNCTION dump_test.pltestlang_call_handler() OWNER TO' => {
all_runs => 1,
- catch_all => 'ALTER ... OWNER commands (except LARGE OBJECTs)',
+ catch_all => 'ALTER ... OWNER commands (except LARGE OBJECTs and PUBLICATIONs)',
regexp => qr/^
\QALTER FUNCTION dump_test.pltestlang_call_handler() \E
\QOWNER TO \E
@@ -358,7 +358,7 @@ my %tests = (
'ALTER PROCEDURAL LANGUAGE pltestlang OWNER TO' => {
all_runs => 1,
- catch_all => 'ALTER ... OWNER commands (except LARGE OBJECTs)',
+ catch_all => 'ALTER ... OWNER commands (except LARGE OBJECTs and PUBLICATIONs)',
regexp => qr/^ALTER PROCEDURAL LANGUAGE pltestlang OWNER TO .*;/m,
like => {
binary_upgrade => 1,
@@ -382,7 +382,7 @@ my %tests = (
'ALTER SCHEMA dump_test OWNER TO' => {
all_runs => 1,
- catch_all => 'ALTER ... OWNER commands (except LARGE OBJECTs)',
+ catch_all => 'ALTER ... OWNER commands (except LARGE OBJECTs and PUBLICATIONs)',
regexp => qr/^ALTER SCHEMA dump_test OWNER TO .*;/m,
like => {
binary_upgrade => 1,
@@ -406,7 +406,7 @@ my %tests = (
'ALTER SCHEMA dump_test_second_schema OWNER TO' => {
all_runs => 1,
- catch_all => 'ALTER ... OWNER commands (except LARGE OBJECTs)',
+ catch_all => 'ALTER ... OWNER commands (except LARGE OBJECTs and PUBLICATIONs)',
regexp => qr/^ALTER SCHEMA dump_test_second_schema OWNER TO .*;/m,
like => {
binary_upgrade => 1,
@@ -524,7 +524,7 @@ my %tests = (
'ALTER TABLE test_table OWNER TO' => {
all_runs => 1,
- catch_all => 'ALTER ... OWNER commands (except LARGE OBJECTs)',
+ catch_all => 'ALTER ... OWNER commands (except LARGE OBJECTs and PUBLICATIONs)',
regexp => qr/^ALTER TABLE test_table OWNER TO .*;/m,
like => {
binary_upgrade => 1,
@@ -577,7 +577,7 @@ my %tests = (
'ALTER TABLE test_second_table OWNER TO' => {
all_runs => 1,
- catch_all => 'ALTER ... OWNER commands (except LARGE OBJECTs)',
+ catch_all => 'ALTER ... OWNER commands (except LARGE OBJECTs and PUBLICATIONs)',
regexp => qr/^ALTER TABLE test_second_table OWNER TO .*;/m,
like => {
binary_upgrade => 1,
@@ -601,7 +601,7 @@ my %tests = (
'ALTER TABLE test_third_table OWNER TO' => {
all_runs => 1,
- catch_all => 'ALTER ... OWNER commands (except LARGE OBJECTs)',
+ catch_all => 'ALTER ... OWNER commands (except LARGE OBJECTs and PUBLICATIONs)',
regexp => qr/^ALTER TABLE test_third_table OWNER TO .*;/m,
like => {
binary_upgrade => 1,
@@ -623,10 +623,10 @@ my %tests = (
only_dump_test_table => 1,
test_schema_plus_blobs => 1, }, },
- # catch-all for ALTER ... OWNER (except LARGE OBJECTs)
- 'ALTER ... OWNER commands (except LARGE OBJECTs)' => {
+ # catch-all for ALTER ... OWNER (except LARGE OBJECTs and PUBLICATIONs)
+ 'ALTER ... OWNER commands (except LARGE OBJECTs and PUBLICATIONs)' => {
all_runs => 0, # catch-all
- regexp => qr/^ALTER (?!LARGE OBJECT)(.*) OWNER TO .*;/m,
+ regexp => qr/^ALTER (?!LARGE OBJECT|PUBLICATION)(.*) OWNER TO .*;/m,
like => {}, # use more-specific options above
unlike => {
column_inserts => 1,
@@ -2217,6 +2217,62 @@ my %tests = (
pg_dumpall_globals_clean => 1,
role => 1,
section_pre_data => 1, }, },
+
+ 'CREATE PUBLICATION pub1' => {
+ create_order => 50,
+ create_sql => 'CREATE PUBLICATION pub1;',
+ regexp => qr/^
+ \QCREATE PUBLICATION pub1 WITH (PUBLISH INSERT, PUBLISH UPDATE, PUBLISH DELETE);\E
+ /xm,
+ like => {
+ binary_upgrade => 1,
+ clean => 1,
+ clean_if_exists => 1,
+ createdb => 1,
+ defaults => 1,
+ exclude_test_table_data => 1,
+ exclude_dump_test_schema => 1,
+ exclude_test_table => 1,
+ no_privs => 1,
+ no_owner => 1,
+ only_dump_test_schema => 1,
+ only_dump_test_table => 1,
+ pg_dumpall_dbprivs => 1,
+ schema_only => 1,
+ section_post_data => 1,
+ test_schema_plus_blobs => 1, },
+ unlike => {
+ section_pre_data => 1,
+ pg_dumpall_globals => 1,
+ pg_dumpall_globals_clean => 1, }, },
+ 'ALTER PUBLICATION pub1 ADD TABLE test_table' => {
+ create_order => 51,
+ create_sql => 'ALTER PUBLICATION pub1 ADD TABLE dump_test.test_table;',
+ regexp => qr/^
+ \QALTER PUBLICATION pub1 ADD TABLE test_table;\E
+ /xm,
+ like => {
+ binary_upgrade => 1,
+ clean => 1,
+ clean_if_exists => 1,
+ createdb => 1,
+ defaults => 1,
+ exclude_test_table_data => 1,
+ no_privs => 1,
+ no_owner => 1,
+ only_dump_test_schema => 1,
+ only_dump_test_table => 1,
+ pg_dumpall_dbprivs => 1,
+ schema_only => 1,
+ section_post_data => 1,
+ test_schema_plus_blobs => 1, },
+ unlike => {
+ section_pre_data => 1,
+ exclude_dump_test_schema => 1,
+ exclude_test_table => 1,
+ pg_dumpall_globals => 1,
+ pg_dumpall_globals_clean => 1, }, },
+
'CREATE SCHEMA dump_test' => {
all_runs => 1,
catch_all => 'CREATE ... commands',
diff --git a/src/bin/psql/command.c b/src/bin/psql/command.c
index 4139b7763fb..0c164a339c1 100644
--- a/src/bin/psql/command.c
+++ b/src/bin/psql/command.c
@@ -501,6 +501,22 @@ exec_command(const char *cmd,
else
success = PSQL_CMD_UNKNOWN;
break;
+ case 'R':
+ switch (cmd[2])
+ {
+ case 'p':
+ if (show_verbose)
+ success = describePublications(pattern);
+ else
+ success = listPublications(pattern);
+ break;
+ case 's':
+ success = describeSubscriptions(pattern, show_verbose);
+ break;
+ default:
+ status = PSQL_CMD_UNKNOWN;
+ }
+ break;
case 'u':
success = describeRoles(pattern, show_verbose, show_system);
break;
diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c
index ce198779f49..c501168d8c7 100644
--- a/src/bin/psql/describe.c
+++ b/src/bin/psql/describe.c
@@ -2387,6 +2387,38 @@ describeOneTableDetails(const char *schemaname,
}
PQclear(result);
}
+
+ /* print any publications */
+ if (pset.sversion >= 100000)
+ {
+ printfPQExpBuffer(&buf,
+ "SELECT pub.pubname\n"
+ " FROM pg_catalog.pg_publication pub\n"
+ " LEFT JOIN pg_publication_rel pr\n"
+ " ON (pr.prpubid = pub.oid)\n"
+ "WHERE pr.prrelid = '%s' OR pub.puballtables\n"
+ "ORDER BY 1;",
+ oid);
+
+ result = PSQLexec(buf.data);
+ if (!result)
+ goto error_return;
+ else
+ tuples = PQntuples(result);
+
+ if (tuples > 0)
+ printTableAddFooter(&cont, _("Publications:"));
+
+ /* Might be an empty set - that's ok */
+ for (i = 0; i < tuples; i++)
+ {
+ printfPQExpBuffer(&buf, " \"%s\"",
+ PQgetvalue(result, i, 0));
+
+ printTableAddFooter(&cont, buf.data);
+ }
+ PQclear(result);
+ }
}
if (view_def)
@@ -4846,6 +4878,266 @@ listOneExtensionContents(const char *extname, const char *oid)
return true;
}
+/* \dRp
+ * Lists publications.
+ *
+ * Takes an optional regexp to select particular publications
+ */
+bool
+listPublications(const char *pattern)
+{
+ PQExpBufferData buf;
+ PGresult *res;
+ printQueryOpt myopt = pset.popt;
+ static const bool translate_columns[] = {false, false, false, false, false};
+
+ if (pset.sversion < 100000)
+ {
+ char sverbuf[32];
+ psql_error("The server (version %s) does not support publications.\n",
+ formatPGVersionNumber(pset.sversion, false,
+ sverbuf, sizeof(sverbuf)));
+ return true;
+ }
+
+ initPQExpBuffer(&buf);
+
+ printfPQExpBuffer(&buf,
+ "SELECT pubname AS \"%s\",\n"
+ " pg_catalog.pg_get_userbyid(pubowner) AS \"%s\",\n"
+ " pubinsert AS \"%s\",\n"
+ " pubupdate AS \"%s\",\n"
+ " pubdelete AS \"%s\"\n",
+ gettext_noop("Name"),
+ gettext_noop("Owner"),
+ gettext_noop("Inserts"),
+ gettext_noop("Updates"),
+ gettext_noop("Deletes"));
+
+ appendPQExpBufferStr(&buf,
+ "\nFROM pg_catalog.pg_publication\n");
+
+ processSQLNamePattern(pset.db, &buf, pattern, false, false,
+ NULL, "pubname", NULL,
+ NULL);
+
+ appendPQExpBufferStr(&buf, "ORDER BY 1;");
+
+ res = PSQLexec(buf.data);
+ termPQExpBuffer(&buf);
+ if (!res)
+ return false;
+
+ myopt.nullPrint = NULL;
+ myopt.title = _("List of publications");
+ myopt.translate_header = true;
+ myopt.translate_columns = translate_columns;
+ myopt.n_translate_columns = lengthof(translate_columns);
+
+ printQuery(res, &myopt, pset.queryFout, false, pset.logfile);
+
+ PQclear(res);
+
+ return true;
+}
+
+/* \dRp+
+ * Describes publications including the contents.
+ *
+ * Takes an optional regexp to select particular publications
+ */
+bool
+describePublications(const char *pattern)
+{
+ PQExpBufferData buf;
+ int i;
+ PGresult *res;
+
+ if (pset.sversion < 100000)
+ {
+ char sverbuf[32];
+ psql_error("The server (version %s) does not support publications.\n",
+ formatPGVersionNumber(pset.sversion, false,
+ sverbuf, sizeof(sverbuf)));
+ return true;
+ }
+
+ initPQExpBuffer(&buf);
+
+ printfPQExpBuffer(&buf,
+ "SELECT oid, pubname, puballtables, pubinsert,\n"
+ " pubupdate, pubdelete\n"
+ "FROM pg_catalog.pg_publication\n");
+
+ processSQLNamePattern(pset.db, &buf, pattern, false, false,
+ NULL, "pubname", NULL,
+ NULL);
+
+ appendPQExpBufferStr(&buf, "ORDER BY 2;");
+
+ res = PSQLexec(buf.data);
+ if (!res)
+ {
+ termPQExpBuffer(&buf);
+ return false;
+ }
+
+ for (i = 0; i < PQntuples(res); i++)
+ {
+ const char align = 'l';
+ int ncols = 3;
+ int nrows = 1;
+ int tables = 0;
+ PGresult *tabres;
+ char *pubid = PQgetvalue(res, i, 0);
+ char *pubname = PQgetvalue(res, i, 1);
+ bool puballtables = strcmp(PQgetvalue(res, i, 2), "t") == 0;
+ int j;
+ PQExpBufferData title;
+ printTableOpt myopt = pset.popt.topt;
+ printTableContent cont;
+
+ initPQExpBuffer(&title);
+ printfPQExpBuffer(&title, _("Publication %s"), pubname);
+ printTableInit(&cont, &myopt, title.data, ncols, nrows);
+
+ printTableAddHeader(&cont, gettext_noop("Inserts"), true, align);
+ printTableAddHeader(&cont, gettext_noop("Updates"), true, align);
+ printTableAddHeader(&cont, gettext_noop("Deletes"), true, align);
+
+ printTableAddCell(&cont, PQgetvalue(res, i, 3), false, false);
+ printTableAddCell(&cont, PQgetvalue(res, i, 4), false, false);
+ printTableAddCell(&cont, PQgetvalue(res, i, 5), false, false);
+
+ if (puballtables)
+ printfPQExpBuffer(&buf,
+ "SELECT n.nspname, c.relname\n"
+ "FROM pg_catalog.pg_class c,\n"
+ " pg_catalog.pg_namespace n\n"
+ "WHERE c.relnamespace = n.oid\n"
+ " AND c.relkind = 'r'\n"
+ " AND n.nspname <> 'pg_catalog'\n"
+ " AND n.nspname <> 'information_schema'\n"
+ "ORDER BY 1,2");
+ else
+ printfPQExpBuffer(&buf,
+ "SELECT n.nspname, c.relname\n"
+ "FROM pg_catalog.pg_class c,\n"
+ " pg_catalog.pg_namespace n,\n"
+ " pg_catalog.pg_publication_rel pr\n"
+ "WHERE c.relnamespace = n.oid\n"
+ " AND c.oid = pr.prrelid\n"
+ " AND pr.prpubid = '%s'\n"
+ "ORDER BY 1,2", pubid);
+
+ tabres = PSQLexec(buf.data);
+ if (!tabres)
+ {
+ printTableCleanup(&cont);
+ PQclear(res);
+ termPQExpBuffer(&buf);
+ termPQExpBuffer(&title);
+ return false;
+ }
+ else
+ tables = PQntuples(tabres);
+
+ if (tables > 0)
+ printTableAddFooter(&cont, _("Tables:"));
+
+ for (j = 0; j < tables; j++)
+ {
+ printfPQExpBuffer(&buf, " \"%s.%s\"",
+ PQgetvalue(tabres, j, 0),
+ PQgetvalue(tabres, j, 1));
+
+ printTableAddFooter(&cont, buf.data);
+ }
+ PQclear(tabres);
+
+ printTable(&cont, pset.queryFout, false, pset.logfile);
+ printTableCleanup(&cont);
+
+ termPQExpBuffer(&title);
+ }
+
+ termPQExpBuffer(&buf);
+ PQclear(res);
+
+ return true;
+}
+
+/* \dRs
+ * Describes subscriptions.
+ *
+ * Takes an optional regexp to select particular subscriptions
+ */
+bool
+describeSubscriptions(const char *pattern, bool verbose)
+{
+ PQExpBufferData buf;
+ PGresult *res;
+ printQueryOpt myopt = pset.popt;
+ static const bool translate_columns[] = {false, false, false, false, false};
+
+ if (pset.sversion < 100000)
+ {
+ char sverbuf[32];
+ psql_error("The server (version %s) does not support subscriptions.\n",
+ formatPGVersionNumber(pset.sversion, false,
+ sverbuf, sizeof(sverbuf)));
+ return true;
+ }
+
+ initPQExpBuffer(&buf);
+
+ printfPQExpBuffer(&buf,
+ "SELECT subname AS \"%s\"\n"
+ ", pg_catalog.pg_get_userbyid(subowner) AS \"%s\"\n"
+ ", subenabled AS \"%s\"\n"
+ ", subpublications AS \"%s\"\n",
+ gettext_noop("Name"),
+ gettext_noop("Owner"),
+ gettext_noop("Enabled"),
+ gettext_noop("Publication"));
+
+ if (verbose)
+ {
+ appendPQExpBuffer(&buf,
+ ", subconninfo AS \"%s\"\n",
+ gettext_noop("Conninfo"));
+ }
+
+ /* Only display subscritpions in current database. */
+ appendPQExpBufferStr(&buf,
+ "FROM pg_catalog.pg_subscription\n"
+ "WHERE subdbid = (SELECT oid\n"
+ " FROM pg_catalog.pg_database\n"
+ " WHERE datname = current_database())");
+
+ processSQLNamePattern(pset.db, &buf, pattern, true, false,
+ NULL, "subname", NULL,
+ NULL);
+
+ appendPQExpBufferStr(&buf, "ORDER BY 1;");
+
+ res = PSQLexec(buf.data);
+ termPQExpBuffer(&buf);
+ if (!res)
+ return false;
+
+ myopt.nullPrint = NULL;
+ myopt.title = _("List of subscriptions");
+ myopt.translate_header = true;
+ myopt.translate_columns = translate_columns;
+ myopt.n_translate_columns = lengthof(translate_columns);
+
+ printQuery(res, &myopt, pset.queryFout, false, pset.logfile);
+
+ PQclear(res);
+ return true;
+}
+
/*
* printACLColumn
*
diff --git a/src/bin/psql/describe.h b/src/bin/psql/describe.h
index 4600182e4c9..074553e1334 100644
--- a/src/bin/psql/describe.h
+++ b/src/bin/psql/describe.h
@@ -102,4 +102,13 @@ extern bool listExtensionContents(const char *pattern);
/* \dy */
extern bool listEventTriggers(const char *pattern, bool verbose);
+/* \dRp */
+bool listPublications(const char *pattern);
+
+/* \dRp+ */
+bool describePublications(const char *pattern);
+
+/* \dRs */
+bool describeSubscriptions(const char *pattern, bool verbose);
+
#endif /* DESCRIBE_H */
diff --git a/src/bin/psql/help.c b/src/bin/psql/help.c
index 09baf871dda..53656294da4 100644
--- a/src/bin/psql/help.c
+++ b/src/bin/psql/help.c
@@ -241,6 +241,8 @@ slashUsage(unsigned short int pager)
fprintf(output, _(" \\dO[S+] [PATTERN] list collations\n"));
fprintf(output, _(" \\dp [PATTERN] list table, view, and sequence access privileges\n"));
fprintf(output, _(" \\drds [PATRN1 [PATRN2]] list per-database role settings\n"));
+ fprintf(output, _(" \\dRp[+] [PATTERN] list replication publications\n"));
+ fprintf(output, _(" \\dRs[+] [PATTERN] list replication subscriptions\n"));
fprintf(output, _(" \\ds[S+] [PATTERN] list sequences\n"));
fprintf(output, _(" \\dt[S+] [PATTERN] list tables\n"));
fprintf(output, _(" \\dT[S+] [PATTERN] list data types\n"));
diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c
index 7709112f494..d6fffcf42f1 100644
--- a/src/bin/psql/tab-complete.c
+++ b/src/bin/psql/tab-complete.c
@@ -960,11 +960,13 @@ static const pgsql_thing_t words_after_create[] = {
{"OWNED", NULL, NULL, THING_NO_CREATE}, /* for DROP OWNED BY ... */
{"PARSER", Query_for_list_of_ts_parsers, NULL, THING_NO_SHOW},
{"POLICY", NULL, NULL},
+ {"PUBLICATION", NULL, NULL},
{"ROLE", Query_for_list_of_roles},
{"RULE", "SELECT pg_catalog.quote_ident(rulename) FROM pg_catalog.pg_rules WHERE substring(pg_catalog.quote_ident(rulename),1,%d)='%s'"},
{"SCHEMA", Query_for_list_of_schemas},
{"SEQUENCE", NULL, &Query_for_list_of_sequences},
{"SERVER", Query_for_list_of_servers},
+ {"SUBSCRIPTION", NULL, NULL},
{"TABLE", NULL, &Query_for_list_of_tables},
{"TABLESPACE", Query_for_list_of_tablespaces},
{"TEMP", NULL, NULL, THING_NO_DROP}, /* for CREATE TEMP TABLE ... */
@@ -1407,8 +1409,8 @@ psql_completion(const char *text, int start, int end)
{"AGGREGATE", "COLLATION", "CONVERSION", "DATABASE", "DEFAULT PRIVILEGES", "DOMAIN",
"EVENT TRIGGER", "EXTENSION", "FOREIGN DATA WRAPPER", "FOREIGN TABLE", "FUNCTION",
"GROUP", "INDEX", "LANGUAGE", "LARGE OBJECT", "MATERIALIZED VIEW", "OPERATOR",
- "POLICY", "ROLE", "RULE", "SCHEMA", "SERVER", "SEQUENCE", "SYSTEM", "TABLE",
- "TABLESPACE", "TEXT SEARCH", "TRIGGER", "TYPE",
+ "POLICY", "PUBLICATION", "ROLE", "RULE", "SCHEMA", "SERVER", "SEQUENCE",
+ "SUBSCRIPTION", "SYSTEM", "TABLE", "TABLESPACE", "TEXT SEARCH", "TRIGGER", "TYPE",
"USER", "USER MAPPING FOR", "VIEW", NULL};
COMPLETE_WITH_LIST(list_ALTER);
@@ -1433,7 +1435,26 @@ psql_completion(const char *text, int start, int end)
else
COMPLETE_WITH_FUNCTION_ARG(prev2_wd);
}
-
+ /* ALTER PUBLICATION <name> ...*/
+ else if (Matches3("ALTER","PUBLICATION",MatchAny))
+ {
+ COMPLETE_WITH_LIST5("WITH", "ADD TABLE", "SET TABLE", "DROP TABLE", "OWNER TO");
+ }
+ /* ALTER PUBLICATION <name> .. WITH ( ... */
+ else if (HeadMatches3("ALTER", "PUBLICATION",MatchAny) && TailMatches2("WITH", "("))
+ {
+ COMPLETE_WITH_LIST6("PUBLISH INSERT", "NOPUBLISH INSERT", "PUBLISH UPDATE",
+ "NOPUBLISH UPDATE", "PUBLISH DELETE", "NOPUBLISH DELETE");
+ }
+ /* ALTER SUBSCRIPTION <name> ... */
+ else if (Matches3("ALTER","SUBSCRIPTION",MatchAny))
+ {
+ COMPLETE_WITH_LIST6("WITH", "CONNECTION", "SET PUBLICATION", "ENABLE", "DISABLE", "OWNER TO");
+ }
+ else if (HeadMatches3("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches2("WITH", "("))
+ {
+ COMPLETE_WITH_CONST("SLOT NAME");
+ }
/* ALTER SCHEMA <name> */
else if (Matches3("ALTER", "SCHEMA", MatchAny))
COMPLETE_WITH_LIST2("OWNER TO", "RENAME TO");
@@ -2227,6 +2248,20 @@ psql_completion(const char *text, int start, int end)
COMPLETE_WITH_CONST("(");
+/* CREATE PUBLICATION */
+ else if (Matches3("CREATE", "PUBLICATION", MatchAny))
+ COMPLETE_WITH_LIST3("FOR TABLE", "FOR ALL TABLES", "WITH (");
+ else if (Matches4("CREATE", "PUBLICATION", MatchAny, "FOR"))
+ COMPLETE_WITH_LIST2("TABLE", "ALL TABLES");
+ /* Complete "CREATE PUBLICATION <name> FOR TABLE <table>" */
+ else if (Matches4("CREATE", "PUBLICATION", MatchAny, "FOR TABLE"))
+ COMPLETE_WITH_SCHEMA_QUERY(Query_for_list_of_tables, NULL);
+ /* Complete "CREATE PUBLICATION <name> [...] WITH" */
+ else if (HeadMatches2("CREATE", "PUBLICATION") && TailMatches2("WITH", "("))
+ COMPLETE_WITH_LIST2("PUBLISH", "NOPUBLISH");
+ else if (HeadMatches2("CREATE", "PUBLICATION") && TailMatches3("WITH", "(", MatchAny))
+ COMPLETE_WITH_LIST3("INSERT", "UPDATE", "DELETE");
+
/* CREATE RULE */
/* Complete "CREATE RULE <sth>" with "AS ON" */
else if (Matches3("CREATE", "RULE", MatchAny))
@@ -2278,6 +2313,16 @@ psql_completion(const char *text, int start, int end)
else if (Matches5("CREATE", "TEXT", "SEARCH", "CONFIGURATION", MatchAny))
COMPLETE_WITH_CONST("(");
+/* CREATE SUBSCRIPTION */
+ else if (Matches3("CREATE", "SUBSCRIPTION", MatchAny))
+ COMPLETE_WITH_CONST("CONNECTION");
+ else if (Matches5("CREATE", "SUBSCRIPTION", MatchAny, "CONNECTION",MatchAny))
+ COMPLETE_WITH_CONST("PUBLICATION");
+ /* Complete "CREATE SUBSCRIPTION <name> ... WITH ( <opt>" */
+ else if (HeadMatches2("CREATE", "SUBSCRIPTION") && TailMatches2("WITH", "("))
+ COMPLETE_WITH_LIST5("ENABLED", "DISABLED", "CREATE SLOT",
+ "NOCREATE SLOT", "SLOT NAME");
+
/* CREATE TRIGGER --- is allowed inside CREATE SCHEMA, so use TailMatches */
/* complete CREATE TRIGGER <name> with BEFORE,AFTER,INSTEAD OF */
else if (TailMatches3("CREATE", "TRIGGER", MatchAny))
@@ -2438,7 +2483,7 @@ psql_completion(const char *text, int start, int end)
/* DROP */
/* Complete DROP object with CASCADE / RESTRICT */
else if (Matches3("DROP",
- "COLLATION|CONVERSION|DOMAIN|EXTENSION|LANGUAGE|SCHEMA|SEQUENCE|SERVER|TABLE|TYPE|VIEW",
+ "COLLATION|CONVERSION|DOMAIN|EXTENSION|LANGUAGE|PUBLICATION|SCHEMA|SEQUENCE|SERVER|TABLE|TYPE|VIEW",
MatchAny) ||
Matches4("DROP", "ACCESS", "METHOD", MatchAny) ||
(Matches4("DROP", "AGGREGATE|FUNCTION", MatchAny, MatchAny) &&