diff options
| author | Peter Eisentraut | 2017-01-19 17:00:00 +0000 |
|---|---|---|
| committer | Peter Eisentraut | 2017-01-20 14:04:49 +0000 |
| commit | 665d1fad99e7b11678b0d5fa24d2898424243cd6 (patch) | |
| tree | eefe3eb528f840780aef6c09939a1844dbabb30a /src/bin | |
| parent | ba61a04bc7fefeee03416d9911eb825c4897c223 (diff) | |
Logical replication
- Add PUBLICATION catalogs and DDL
- Add SUBSCRIPTION catalog and DDL
- Define logical replication protocol and output plugin
- Add logical replication workers
From: Petr Jelinek <petr@2ndquadrant.com>
Reviewed-by: Steve Singer <steve@ssinger.info>
Reviewed-by: Andres Freund <andres@anarazel.de>
Reviewed-by: Erik Rijkers <er@xs4all.nl>
Reviewed-by: Peter Eisentraut <peter.eisentraut@2ndquadrant.com>
Diffstat (limited to 'src/bin')
| -rw-r--r-- | src/bin/pg_dump/common.c | 12 | ||||
| -rw-r--r-- | src/bin/pg_dump/pg_backup.h | 3 | ||||
| -rw-r--r-- | src/bin/pg_dump/pg_backup_archiver.c | 7 | ||||
| -rw-r--r-- | src/bin/pg_dump/pg_dump.c | 464 | ||||
| -rw-r--r-- | src/bin/pg_dump/pg_dump.h | 46 | ||||
| -rw-r--r-- | src/bin/pg_dump/pg_dump_sort.c | 20 | ||||
| -rw-r--r-- | src/bin/pg_dump/pg_restore.c | 3 | ||||
| -rw-r--r-- | src/bin/pg_dump/t/002_pg_dump.pl | 76 | ||||
| -rw-r--r-- | src/bin/psql/command.c | 16 | ||||
| -rw-r--r-- | src/bin/psql/describe.c | 292 | ||||
| -rw-r--r-- | src/bin/psql/describe.h | 9 | ||||
| -rw-r--r-- | src/bin/psql/help.c | 2 | ||||
| -rw-r--r-- | src/bin/psql/tab-complete.c | 53 |
13 files changed, 986 insertions, 17 deletions
diff --git a/src/bin/pg_dump/common.c b/src/bin/pg_dump/common.c index d4e36421d29..89530a9f0fb 100644 --- a/src/bin/pg_dump/common.c +++ b/src/bin/pg_dump/common.c @@ -292,6 +292,18 @@ getSchemaData(Archive *fout, int *numTablesPtr) write_msg(NULL, "reading partition key information for interesting tables\n"); getTablePartitionKeyInfo(fout, tblinfo, numTables); + if (g_verbose) + write_msg(NULL, "reading publications\n"); + getPublications(fout); + + if (g_verbose) + write_msg(NULL, "reading publication membership\n"); + getPublicationTables(fout, tblinfo, numTables); + + if (g_verbose) + write_msg(NULL, "reading subscriptions\n"); + getSubscriptions(fout); + *numTablesPtr = numTables; return tblinfo; } diff --git a/src/bin/pg_dump/pg_backup.h b/src/bin/pg_dump/pg_backup.h index 7241cdfc449..6480fb8e745 100644 --- a/src/bin/pg_dump/pg_backup.h +++ b/src/bin/pg_dump/pg_backup.h @@ -119,6 +119,7 @@ typedef struct _restoreOptions bool *idWanted; /* array showing which dump IDs to emit */ int enable_row_security; int sequence_data; /* dump sequence data even in schema-only mode */ + int include_subscriptions; } RestoreOptions; typedef struct _dumpOptions @@ -152,6 +153,8 @@ typedef struct _dumpOptions int outputNoTablespaces; int use_setsessauth; int enable_row_security; + int include_subscriptions; + int no_create_subscription_slots; /* default, if no "inclusion" switches appear, is to dump everything */ bool include_everything; diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c index b89bd99e49f..929f1b592bc 100644 --- a/src/bin/pg_dump/pg_backup_archiver.c +++ b/src/bin/pg_dump/pg_backup_archiver.c @@ -172,6 +172,7 @@ dumpOptionsFromRestoreOptions(RestoreOptions *ropt) dopt->include_everything = ropt->include_everything; dopt->enable_row_security = ropt->enable_row_security; dopt->sequence_data = ropt->sequence_data; + dopt->include_subscriptions = ropt->include_subscriptions; return dopt; } @@ -3266,6 +3267,8 @@ _getObjectDescription(PQExpBuffer buf, TocEntry *te, ArchiveHandle *AH) strcmp(type, "SCHEMA") == 0 || strcmp(type, "FOREIGN DATA WRAPPER") == 0 || strcmp(type, "SERVER") == 0 || + strcmp(type, "PUBLICATION") == 0 || + strcmp(type, "SUBSCRIPTION") == 0 || strcmp(type, "USER MAPPING") == 0) { /* We already know that search_path was set properly */ @@ -3476,7 +3479,9 @@ _printTocEntry(ArchiveHandle *AH, TocEntry *te, bool isData, bool acl_pass) strcmp(te->desc, "TEXT SEARCH DICTIONARY") == 0 || strcmp(te->desc, "TEXT SEARCH CONFIGURATION") == 0 || strcmp(te->desc, "FOREIGN DATA WRAPPER") == 0 || - strcmp(te->desc, "SERVER") == 0) + strcmp(te->desc, "SERVER") == 0 || + strcmp(te->desc, "PUBLICATION") == 0 || + strcmp(te->desc, "SUBSCRIPTION") == 0) { PQExpBuffer temp = createPQExpBuffer(); diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index 883fde1e5aa..0bb363957a4 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -246,6 +246,9 @@ static void getBlobs(Archive *fout); static void dumpBlob(Archive *fout, BlobInfo *binfo); static int dumpBlobs(Archive *fout, void *arg); static void dumpPolicy(Archive *fout, PolicyInfo *polinfo); +static void dumpPublication(Archive *fout, PublicationInfo *pubinfo); +static void dumpPublicationTable(Archive *fout, PublicationRelInfo *pubrinfo); +static void dumpSubscription(Archive *fout, SubscriptionInfo *subinfo); static void dumpDatabase(Archive *AH); static void dumpEncoding(Archive *AH); static void dumpStdStrings(Archive *AH); @@ -338,6 +341,7 @@ main(int argc, char **argv) {"enable-row-security", no_argument, &dopt.enable_row_security, 1}, {"exclude-table-data", required_argument, NULL, 4}, {"if-exists", no_argument, &dopt.if_exists, 1}, + {"include-subscriptions", no_argument, &dopt.include_subscriptions, 1}, {"inserts", no_argument, &dopt.dump_inserts, 1}, {"lock-wait-timeout", required_argument, NULL, 2}, {"no-tablespaces", no_argument, &dopt.outputNoTablespaces, 1}, @@ -348,6 +352,7 @@ main(int argc, char **argv) {"snapshot", required_argument, NULL, 6}, {"strict-names", no_argument, &strict_names, 1}, {"use-set-session-authorization", no_argument, &dopt.use_setsessauth, 1}, + {"no-create-subscription-slots", no_argument, &dopt.no_create_subscription_slots, 1}, {"no-security-labels", no_argument, &dopt.no_security_labels, 1}, {"no-synchronized-snapshots", no_argument, &dopt.no_synchronized_snapshots, 1}, {"no-unlogged-table-data", no_argument, &dopt.no_unlogged_table_data, 1}, @@ -849,6 +854,7 @@ main(int argc, char **argv) ropt->include_everything = dopt.include_everything; ropt->enable_row_security = dopt.enable_row_security; ropt->sequence_data = dopt.sequence_data; + ropt->include_subscriptions = dopt.include_subscriptions; if (compressLevel == -1) ropt->compression = 0; @@ -929,7 +935,10 @@ help(const char *progname) " access to)\n")); printf(_(" --exclude-table-data=TABLE do NOT dump data for the named table(s)\n")); printf(_(" --if-exists use IF EXISTS when dropping objects\n")); + printf(_(" --include-subscriptions dump logical replication subscriptions\n")); printf(_(" --inserts dump data as INSERT commands, rather than COPY\n")); + printf(_(" --no-create-subscription-slots\n" + " do not create replication slots for subscriptions\n")); printf(_(" --no-security-labels do not dump security label assignments\n")); printf(_(" --no-synchronized-snapshots do not use synchronized snapshots in parallel jobs\n")); printf(_(" --no-tablespaces do not dump tablespace assignments\n")); @@ -3311,6 +3320,449 @@ dumpPolicy(Archive *fout, PolicyInfo *polinfo) destroyPQExpBuffer(delqry); } +/* + * getPublications + * get information about publications + */ +void +getPublications(Archive *fout) +{ + PQExpBuffer query; + PGresult *res; + PublicationInfo *pubinfo; + int i_tableoid; + int i_oid; + int i_pubname; + int i_rolname; + int i_puballtables; + int i_pubinsert; + int i_pubupdate; + int i_pubdelete; + int i, + ntups; + + if (fout->remoteVersion < 100000) + return; + + query = createPQExpBuffer(); + + resetPQExpBuffer(query); + + /* Get the publications. */ + appendPQExpBuffer(query, + "SELECT p.tableoid, p.oid, p.pubname, " + "(%s p.pubowner) AS rolname, " + "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete " + "FROM pg_catalog.pg_publication p", + username_subquery); + + res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK); + + ntups = PQntuples(res); + + if (ntups == 0) + { + /* + * There are no publications defined. Clean up and return. + */ + PQclear(res); + return; + } + + i_tableoid = PQfnumber(res, "tableoid"); + i_oid = PQfnumber(res, "oid"); + i_pubname = PQfnumber(res, "pubname"); + i_rolname = PQfnumber(res, "rolname"); + i_puballtables = PQfnumber(res, "puballtables"); + i_pubinsert = PQfnumber(res, "pubinsert"); + i_pubupdate = PQfnumber(res, "pubupdate"); + i_pubdelete = PQfnumber(res, "pubdelete"); + + pubinfo = pg_malloc(ntups * sizeof(PublicationInfo)); + + for (i = 0; i < ntups; i++) + { + pubinfo[i].dobj.objType = DO_PUBLICATION; + pubinfo[i].dobj.catId.tableoid = + atooid(PQgetvalue(res, i, i_tableoid)); + pubinfo[i].dobj.catId.oid = atooid(PQgetvalue(res, i, i_oid)); + AssignDumpId(&pubinfo[i].dobj); + pubinfo[i].dobj.name = pg_strdup(PQgetvalue(res, i, i_pubname)); + pubinfo[i].rolname = pg_strdup(PQgetvalue(res, i, i_rolname)); + pubinfo[i].puballtables = + (strcmp(PQgetvalue(res, i, i_puballtables), "t") == 0); + pubinfo[i].pubinsert = + (strcmp(PQgetvalue(res, i, i_pubinsert), "t") == 0); + pubinfo[i].pubupdate = + (strcmp(PQgetvalue(res, i, i_pubupdate), "t") == 0); + pubinfo[i].pubdelete = + (strcmp(PQgetvalue(res, i, i_pubdelete), "t") == 0); + + if (strlen(pubinfo[i].rolname) == 0) + write_msg(NULL, "WARNING: owner of publication \"%s\" appears to be invalid\n", + pubinfo[i].dobj.name); + } + PQclear(res); + + destroyPQExpBuffer(query); +} + +/* + * dumpPublication + * dump the definition of the given publication + */ +static void +dumpPublication(Archive *fout, PublicationInfo *pubinfo) +{ + DumpOptions *dopt = fout->dopt; + PQExpBuffer delq; + PQExpBuffer query; + + if (dopt->dataOnly) + return; + + delq = createPQExpBuffer(); + query = createPQExpBuffer(); + + appendPQExpBuffer(delq, "DROP PUBLICATION %s;\n", + fmtId(pubinfo->dobj.name)); + + appendPQExpBuffer(query, "CREATE PUBLICATION %s", + fmtId(pubinfo->dobj.name)); + + if (pubinfo->puballtables) + appendPQExpBufferStr(query, " FOR ALL TABLES"); + + appendPQExpBufferStr(query, " WITH ("); + if (pubinfo->pubinsert) + appendPQExpBufferStr(query, "PUBLISH INSERT"); + else + appendPQExpBufferStr(query, "NOPUBLISH INSERT"); + + if (pubinfo->pubupdate) + appendPQExpBufferStr(query, ", PUBLISH UPDATE"); + else + appendPQExpBufferStr(query, ", NOPUBLISH UPDATE"); + + if (pubinfo->pubdelete) + appendPQExpBufferStr(query, ", PUBLISH DELETE"); + else + appendPQExpBufferStr(query, ", NOPUBLISH DELETE"); + + appendPQExpBufferStr(query, ");\n"); + + ArchiveEntry(fout, pubinfo->dobj.catId, pubinfo->dobj.dumpId, + pubinfo->dobj.name, + NULL, + NULL, + pubinfo->rolname, false, + "PUBLICATION", SECTION_POST_DATA, + query->data, delq->data, NULL, + NULL, 0, + NULL, NULL); + + destroyPQExpBuffer(delq); + destroyPQExpBuffer(query); +} + +/* + * getPublicationTables + * get information about publication membership for dumpable tables. + */ +void +getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables) +{ + PQExpBuffer query; + PGresult *res; + PublicationRelInfo *pubrinfo; + int i_tableoid; + int i_oid; + int i_pubname; + int i, + j, + ntups; + + if (fout->remoteVersion < 100000) + return; + + query = createPQExpBuffer(); + + for (i = 0; i < numTables; i++) + { + TableInfo *tbinfo = &tblinfo[i]; + + /* Only plain tables can be aded to publications. */ + if (tbinfo->relkind != RELKIND_RELATION) + continue; + + /* + * Ignore publication membership of tables whose definitions are + * not to be dumped. + */ + if (!(tbinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)) + continue; + + if (g_verbose) + write_msg(NULL, "reading publication membership for table \"%s.%s\"\n", + tbinfo->dobj.namespace->dobj.name, + tbinfo->dobj.name); + + resetPQExpBuffer(query); + + /* Get the publication memebership for the table. */ + appendPQExpBuffer(query, + "SELECT pr.tableoid, pr.oid, p.pubname " + "FROM pg_catalog.pg_publication_rel pr," + " pg_catalog.pg_publication p " + "WHERE pr.prrelid = '%u'" + " AND p.oid = pr.prpubid", + tbinfo->dobj.catId.oid); + res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK); + + ntups = PQntuples(res); + + if (ntups == 0) + { + /* + * Table is not member of any publications. Clean up and return. + */ + PQclear(res); + continue; + } + + i_tableoid = PQfnumber(res, "tableoid"); + i_oid = PQfnumber(res, "oid"); + i_pubname = PQfnumber(res, "pubname"); + + pubrinfo = pg_malloc(ntups * sizeof(PublicationRelInfo)); + + for (j = 0; j < ntups; j++) + { + pubrinfo[j].dobj.objType = DO_PUBLICATION_REL; + pubrinfo[j].dobj.catId.tableoid = + atooid(PQgetvalue(res, j, i_tableoid)); + pubrinfo[j].dobj.catId.oid = atooid(PQgetvalue(res, j, i_oid)); + AssignDumpId(&pubrinfo[j].dobj); + pubrinfo[j].dobj.namespace = tbinfo->dobj.namespace; + pubrinfo[j].pubname = pg_strdup(PQgetvalue(res, j, i_pubname)); + pubrinfo[j].pubtable = tbinfo; + } + PQclear(res); + } + destroyPQExpBuffer(query); +} + +/* + * dumpPublicationTable + * dump the definition of the given publication table mapping + */ +static void +dumpPublicationTable(Archive *fout, PublicationRelInfo *pubrinfo) +{ + DumpOptions *dopt = fout->dopt; + TableInfo *tbinfo = pubrinfo->pubtable; + PQExpBuffer query; + char *tag; + + if (dopt->dataOnly) + return; + + tag = psprintf("%s %s", pubrinfo->pubname, tbinfo->dobj.name); + + query = createPQExpBuffer(); + + appendPQExpBuffer(query, "ALTER PUBLICATION %s ADD TABLE", + fmtId(pubrinfo->pubname)); + appendPQExpBuffer(query, " %s;", + fmtId(tbinfo->dobj.name)); + + /* + * There is no point in creating drop query as drop query as the drop + * is done by table drop. + */ + ArchiveEntry(fout, pubrinfo->dobj.catId, pubrinfo->dobj.dumpId, + tag, + tbinfo->dobj.namespace->dobj.name, + NULL, + "", false, + "PUBLICATION TABLE", SECTION_POST_DATA, + query->data, "", NULL, + NULL, 0, + NULL, NULL); + + free(tag); + destroyPQExpBuffer(query); +} + + +/* + * getSubscriptions + * get information about subscriptions + */ +void +getSubscriptions(Archive *fout) +{ + DumpOptions *dopt = fout->dopt; + PQExpBuffer query; + PGresult *res; + SubscriptionInfo *subinfo; + int i_tableoid; + int i_oid; + int i_subname; + int i_rolname; + int i_subenabled; + int i_subconninfo; + int i_subslotname; + int i_subpublications; + int i, + ntups; + + if (!dopt->include_subscriptions || fout->remoteVersion < 100000) + return; + + query = createPQExpBuffer(); + + resetPQExpBuffer(query); + + /* Get the subscriptions in current database. */ + appendPQExpBuffer(query, + "SELECT s.tableoid, s.oid, s.subname," + "(%s s.subowner) AS rolname, s.subenabled, " + " s.subconninfo, s.subslotname, s.subpublications " + "FROM pg_catalog.pg_subscription s " + "WHERE s.subdbid = (SELECT oid FROM pg_catalog.pg_database" + " WHERE datname = current_database())", + username_subquery); + res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK); + + ntups = PQntuples(res); + + if (ntups == 0) + { + /* + * There are no subscriptions defined. Clean up and return. + */ + PQclear(res); + return; + } + + i_tableoid = PQfnumber(res, "tableoid"); + i_oid = PQfnumber(res, "oid"); + i_subname = PQfnumber(res, "subname"); + i_rolname = PQfnumber(res, "rolname"); + i_subenabled = PQfnumber(res, "subenabled"); + i_subconninfo = PQfnumber(res, "subconninfo"); + i_subslotname = PQfnumber(res, "subslotname"); + i_subpublications = PQfnumber(res, "subpublications"); + + subinfo = pg_malloc(ntups * sizeof(SubscriptionInfo)); + + for (i = 0; i < ntups; i++) + { + subinfo[i].dobj.objType = DO_SUBSCRIPTION; + subinfo[i].dobj.catId.tableoid = + atooid(PQgetvalue(res, i, i_tableoid)); + subinfo[i].dobj.catId.oid = atooid(PQgetvalue(res, i, i_oid)); + AssignDumpId(&subinfo[i].dobj); + subinfo[i].dobj.name = pg_strdup(PQgetvalue(res, i, i_subname)); + subinfo[i].rolname = pg_strdup(PQgetvalue(res, i, i_rolname)); + subinfo[i].subenabled = + (strcmp(PQgetvalue(res, i, i_subenabled), "t") == 0); + subinfo[i].subconninfo = pg_strdup(PQgetvalue(res, i, i_subconninfo)); + subinfo[i].subslotname = pg_strdup(PQgetvalue(res, i, i_subslotname)); + subinfo[i].subpublications = + pg_strdup(PQgetvalue(res, i, i_subpublications)); + + if (strlen(subinfo[i].rolname) == 0) + write_msg(NULL, "WARNING: owner of subscription \"%s\" appears to be invalid\n", + subinfo[i].dobj.name); + } + PQclear(res); + + destroyPQExpBuffer(query); +} + +/* + * dumpSubscription + * dump the definition of the given subscription + */ +static void +dumpSubscription(Archive *fout, SubscriptionInfo *subinfo) +{ + DumpOptions *dopt = fout->dopt; + PQExpBuffer delq; + PQExpBuffer query; + PQExpBuffer publications; + char **pubnames = NULL; + int npubnames = 0; + int i; + + if (dopt->dataOnly) + return; + + delq = createPQExpBuffer(); + query = createPQExpBuffer(); + + appendPQExpBuffer(delq, "DROP SUBSCRIPTION %s;\n", + fmtId(subinfo->dobj.name)); + + appendPQExpBuffer(query, "CREATE SUBSCRIPTION %s CONNECTION ", + fmtId(subinfo->dobj.name)); + appendStringLiteralAH(query, subinfo->subconninfo, fout); + + /* Build list of quoted publications and append them to query. */ + if (!parsePGArray(subinfo->subpublications, &pubnames, &npubnames)) + { + write_msg(NULL, + "WARNING: could not parse subpublications array\n"); + if (pubnames) + free(pubnames); + pubnames = NULL; + npubnames = 0; + } + + publications = createPQExpBuffer(); + for (i = 0; i < npubnames; i++) + { + if (i > 0) + appendPQExpBufferStr(publications, ", "); + + appendPQExpBufferStr(publications, fmtId(pubnames[i])); + } + + appendPQExpBuffer(query, " PUBLICATION %s WITH (", publications->data); + + if (subinfo->subenabled) + appendPQExpBufferStr(query, "ENABLED"); + else + appendPQExpBufferStr(query, "DISABLED"); + + appendPQExpBufferStr(query, ", SLOT NAME = "); + appendStringLiteralAH(query, subinfo->subslotname, fout); + + if (dopt->no_create_subscription_slots) + appendPQExpBufferStr(query, ", NOCREATE SLOT"); + + appendPQExpBufferStr(query, ");\n"); + + ArchiveEntry(fout, subinfo->dobj.catId, subinfo->dobj.dumpId, + subinfo->dobj.name, + NULL, + NULL, + subinfo->rolname, false, + "SUBSCRIPTION", SECTION_POST_DATA, + query->data, delq->data, NULL, + NULL, 0, + NULL, NULL); + + destroyPQExpBuffer(publications); + if (pubnames) + free(pubnames); + + destroyPQExpBuffer(delq); + destroyPQExpBuffer(query); +} + static void binary_upgrade_set_type_oids_by_type_oid(Archive *fout, PQExpBuffer upgrade_buffer, @@ -8752,6 +9204,15 @@ dumpDumpableObject(Archive *fout, DumpableObject *dobj) case DO_POLICY: dumpPolicy(fout, (PolicyInfo *) dobj); break; + case DO_PUBLICATION: + dumpPublication(fout, (PublicationInfo *) dobj); + break; + case DO_PUBLICATION_REL: + dumpPublicationTable(fout, (PublicationRelInfo *) dobj); + break; + case DO_SUBSCRIPTION: + dumpSubscription(fout, (SubscriptionInfo *) dobj); + break; case DO_PRE_DATA_BOUNDARY: case DO_POST_DATA_BOUNDARY: /* never dumped, nothing to do */ @@ -16627,6 +17088,9 @@ addBoundaryDependencies(DumpableObject **dobjs, int numObjs, case DO_EVENT_TRIGGER: case DO_DEFAULT_ACL: case DO_POLICY: + case DO_PUBLICATION: + case DO_PUBLICATION_REL: + case DO_SUBSCRIPTION: /* Post-data objects: must come after the post-data boundary */ addObjectDependency(dobj, postDataBound->dumpId); break; diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h index 0c920a39076..77de22fcb8b 100644 --- a/src/bin/pg_dump/pg_dump.h +++ b/src/bin/pg_dump/pg_dump.h @@ -79,7 +79,10 @@ typedef enum DO_POST_DATA_BOUNDARY, DO_EVENT_TRIGGER, DO_REFRESH_MATVIEW, - DO_POLICY + DO_POLICY, + DO_PUBLICATION, + DO_PUBLICATION_REL, + DO_SUBSCRIPTION } DumpableObjectType; /* component types of an object which can be selected for dumping */ @@ -567,6 +570,43 @@ typedef struct _policyInfo } PolicyInfo; /* + * The PublicationInfo struct is used to represent publications. + */ +typedef struct _PublicationInfo +{ + DumpableObject dobj; + char *rolname; + bool puballtables; + bool pubinsert; + bool pubupdate; + bool pubdelete; +} PublicationInfo; + +/* + * The PublicationRelInfo struct is used to represent publication table + * mapping. + */ +typedef struct _PublicationRelInfo +{ + DumpableObject dobj; + TableInfo *pubtable; + char *pubname; +} PublicationRelInfo; + +/* + * The SubscriptionInfo struct is used to represent subscription. + */ +typedef struct _SubscriptionInfo +{ + DumpableObject dobj; + char *rolname; + bool subenabled; + char *subconninfo; + char *subslotname; + char *subpublications; +} SubscriptionInfo; + +/* * We build an array of these with an entry for each object that is an * extension member according to pg_depend. */ @@ -663,5 +703,9 @@ extern void processExtensionTables(Archive *fout, ExtensionInfo extinfo[], extern EventTriggerInfo *getEventTriggers(Archive *fout, int *numEventTriggers); extern void getPolicies(Archive *fout, TableInfo tblinfo[], int numTables); extern void getTablePartitionKeyInfo(Archive *fout, TableInfo *tblinfo, int numTables); +extern void getPublications(Archive *fout); +extern void getPublicationTables(Archive *fout, TableInfo tblinfo[], + int numTables); +extern void getSubscriptions(Archive *fout); #endif /* PG_DUMP_H */ diff --git a/src/bin/pg_dump/pg_dump_sort.c b/src/bin/pg_dump/pg_dump_sort.c index 1db680b950c..ea643397ba8 100644 --- a/src/bin/pg_dump/pg_dump_sort.c +++ b/src/bin/pg_dump/pg_dump_sort.c @@ -71,7 +71,10 @@ static const int dbObjectTypePriority[] = 26, /* DO_POST_DATA_BOUNDARY */ 33, /* DO_EVENT_TRIGGER */ 34, /* DO_REFRESH_MATVIEW */ - 35 /* DO_POLICY */ + 35, /* DO_POLICY */ + 36, /* DO_PUBLICATION */ + 37, /* DO_PUBLICATION_REL */ + 38 /* DO_SUBSCRIPTION */ }; static DumpId preDataBoundId; @@ -1397,6 +1400,21 @@ describeDumpableObject(DumpableObject *obj, char *buf, int bufsize) "POLICY (ID %d OID %u)", obj->dumpId, obj->catId.oid); return; + case DO_PUBLICATION: + snprintf(buf, bufsize, + "PUBLICATION (ID %d OID %u)", + obj->dumpId, obj->catId.oid); + return; + case DO_PUBLICATION_REL: + snprintf(buf, bufsize, + "PUBLICATION TABLE (ID %d OID %u)", + obj->dumpId, obj->catId.oid); + return; + case DO_SUBSCRIPTION: + snprintf(buf, bufsize, + "SUBSCRIPTION (ID %d OID %u)", + obj->dumpId, obj->catId.oid); + return; case DO_PRE_DATA_BOUNDARY: snprintf(buf, bufsize, "PRE-DATA BOUNDARY (ID %d)", diff --git a/src/bin/pg_dump/pg_restore.c b/src/bin/pg_dump/pg_restore.c index 239b0d8ac03..497677494bc 100644 --- a/src/bin/pg_dump/pg_restore.c +++ b/src/bin/pg_dump/pg_restore.c @@ -72,6 +72,7 @@ main(int argc, char **argv) char *inputFileSpec; static int disable_triggers = 0; static int enable_row_security = 0; + static int include_subscriptions = 0; static int if_exists = 0; static int no_data_for_failed_tables = 0; static int outputNoTablespaces = 0; @@ -116,6 +117,7 @@ main(int argc, char **argv) {"disable-triggers", no_argument, &disable_triggers, 1}, {"enable-row-security", no_argument, &enable_row_security, 1}, {"if-exists", no_argument, &if_exists, 1}, + {"include-subscriptions", no_argument, &include_subscriptions, 1}, {"no-data-for-failed-tables", no_argument, &no_data_for_failed_tables, 1}, {"no-tablespaces", no_argument, &outputNoTablespaces, 1}, {"role", required_argument, NULL, 2}, @@ -356,6 +358,7 @@ main(int argc, char **argv) opts->disable_triggers = disable_triggers; opts->enable_row_security = enable_row_security; + opts->include_subscriptions = include_subscriptions; opts->noDataForFailedTables = no_data_for_failed_tables; opts->noTablespace = outputNoTablespaces; opts->use_setsessauth = use_setsessauth; diff --git a/src/bin/pg_dump/t/002_pg_dump.pl b/src/bin/pg_dump/t/002_pg_dump.pl index b732627c3a2..488eec30f57 100644 --- a/src/bin/pg_dump/t/002_pg_dump.pl +++ b/src/bin/pg_dump/t/002_pg_dump.pl @@ -301,7 +301,7 @@ my %tests = ( 'ALTER FUNCTION dump_test.pltestlang_call_handler() OWNER TO' => { all_runs => 1, - catch_all => 'ALTER ... OWNER commands (except LARGE OBJECTs)', + catch_all => 'ALTER ... OWNER commands (except LARGE OBJECTs and PUBLICATIONs)', regexp => qr/^ \QALTER FUNCTION dump_test.pltestlang_call_handler() \E \QOWNER TO \E @@ -358,7 +358,7 @@ my %tests = ( 'ALTER PROCEDURAL LANGUAGE pltestlang OWNER TO' => { all_runs => 1, - catch_all => 'ALTER ... OWNER commands (except LARGE OBJECTs)', + catch_all => 'ALTER ... OWNER commands (except LARGE OBJECTs and PUBLICATIONs)', regexp => qr/^ALTER PROCEDURAL LANGUAGE pltestlang OWNER TO .*;/m, like => { binary_upgrade => 1, @@ -382,7 +382,7 @@ my %tests = ( 'ALTER SCHEMA dump_test OWNER TO' => { all_runs => 1, - catch_all => 'ALTER ... OWNER commands (except LARGE OBJECTs)', + catch_all => 'ALTER ... OWNER commands (except LARGE OBJECTs and PUBLICATIONs)', regexp => qr/^ALTER SCHEMA dump_test OWNER TO .*;/m, like => { binary_upgrade => 1, @@ -406,7 +406,7 @@ my %tests = ( 'ALTER SCHEMA dump_test_second_schema OWNER TO' => { all_runs => 1, - catch_all => 'ALTER ... OWNER commands (except LARGE OBJECTs)', + catch_all => 'ALTER ... OWNER commands (except LARGE OBJECTs and PUBLICATIONs)', regexp => qr/^ALTER SCHEMA dump_test_second_schema OWNER TO .*;/m, like => { binary_upgrade => 1, @@ -524,7 +524,7 @@ my %tests = ( 'ALTER TABLE test_table OWNER TO' => { all_runs => 1, - catch_all => 'ALTER ... OWNER commands (except LARGE OBJECTs)', + catch_all => 'ALTER ... OWNER commands (except LARGE OBJECTs and PUBLICATIONs)', regexp => qr/^ALTER TABLE test_table OWNER TO .*;/m, like => { binary_upgrade => 1, @@ -577,7 +577,7 @@ my %tests = ( 'ALTER TABLE test_second_table OWNER TO' => { all_runs => 1, - catch_all => 'ALTER ... OWNER commands (except LARGE OBJECTs)', + catch_all => 'ALTER ... OWNER commands (except LARGE OBJECTs and PUBLICATIONs)', regexp => qr/^ALTER TABLE test_second_table OWNER TO .*;/m, like => { binary_upgrade => 1, @@ -601,7 +601,7 @@ my %tests = ( 'ALTER TABLE test_third_table OWNER TO' => { all_runs => 1, - catch_all => 'ALTER ... OWNER commands (except LARGE OBJECTs)', + catch_all => 'ALTER ... OWNER commands (except LARGE OBJECTs and PUBLICATIONs)', regexp => qr/^ALTER TABLE test_third_table OWNER TO .*;/m, like => { binary_upgrade => 1, @@ -623,10 +623,10 @@ my %tests = ( only_dump_test_table => 1, test_schema_plus_blobs => 1, }, }, - # catch-all for ALTER ... OWNER (except LARGE OBJECTs) - 'ALTER ... OWNER commands (except LARGE OBJECTs)' => { + # catch-all for ALTER ... OWNER (except LARGE OBJECTs and PUBLICATIONs) + 'ALTER ... OWNER commands (except LARGE OBJECTs and PUBLICATIONs)' => { all_runs => 0, # catch-all - regexp => qr/^ALTER (?!LARGE OBJECT)(.*) OWNER TO .*;/m, + regexp => qr/^ALTER (?!LARGE OBJECT|PUBLICATION)(.*) OWNER TO .*;/m, like => {}, # use more-specific options above unlike => { column_inserts => 1, @@ -2217,6 +2217,62 @@ my %tests = ( pg_dumpall_globals_clean => 1, role => 1, section_pre_data => 1, }, }, + + 'CREATE PUBLICATION pub1' => { + create_order => 50, + create_sql => 'CREATE PUBLICATION pub1;', + regexp => qr/^ + \QCREATE PUBLICATION pub1 WITH (PUBLISH INSERT, PUBLISH UPDATE, PUBLISH DELETE);\E + /xm, + like => { + binary_upgrade => 1, + clean => 1, + clean_if_exists => 1, + createdb => 1, + defaults => 1, + exclude_test_table_data => 1, + exclude_dump_test_schema => 1, + exclude_test_table => 1, + no_privs => 1, + no_owner => 1, + only_dump_test_schema => 1, + only_dump_test_table => 1, + pg_dumpall_dbprivs => 1, + schema_only => 1, + section_post_data => 1, + test_schema_plus_blobs => 1, }, + unlike => { + section_pre_data => 1, + pg_dumpall_globals => 1, + pg_dumpall_globals_clean => 1, }, }, + 'ALTER PUBLICATION pub1 ADD TABLE test_table' => { + create_order => 51, + create_sql => 'ALTER PUBLICATION pub1 ADD TABLE dump_test.test_table;', + regexp => qr/^ + \QALTER PUBLICATION pub1 ADD TABLE test_table;\E + /xm, + like => { + binary_upgrade => 1, + clean => 1, + clean_if_exists => 1, + createdb => 1, + defaults => 1, + exclude_test_table_data => 1, + no_privs => 1, + no_owner => 1, + only_dump_test_schema => 1, + only_dump_test_table => 1, + pg_dumpall_dbprivs => 1, + schema_only => 1, + section_post_data => 1, + test_schema_plus_blobs => 1, }, + unlike => { + section_pre_data => 1, + exclude_dump_test_schema => 1, + exclude_test_table => 1, + pg_dumpall_globals => 1, + pg_dumpall_globals_clean => 1, }, }, + 'CREATE SCHEMA dump_test' => { all_runs => 1, catch_all => 'CREATE ... commands', diff --git a/src/bin/psql/command.c b/src/bin/psql/command.c index 4139b7763fb..0c164a339c1 100644 --- a/src/bin/psql/command.c +++ b/src/bin/psql/command.c @@ -501,6 +501,22 @@ exec_command(const char *cmd, else success = PSQL_CMD_UNKNOWN; break; + case 'R': + switch (cmd[2]) + { + case 'p': + if (show_verbose) + success = describePublications(pattern); + else + success = listPublications(pattern); + break; + case 's': + success = describeSubscriptions(pattern, show_verbose); + break; + default: + status = PSQL_CMD_UNKNOWN; + } + break; case 'u': success = describeRoles(pattern, show_verbose, show_system); break; diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c index ce198779f49..c501168d8c7 100644 --- a/src/bin/psql/describe.c +++ b/src/bin/psql/describe.c @@ -2387,6 +2387,38 @@ describeOneTableDetails(const char *schemaname, } PQclear(result); } + + /* print any publications */ + if (pset.sversion >= 100000) + { + printfPQExpBuffer(&buf, + "SELECT pub.pubname\n" + " FROM pg_catalog.pg_publication pub\n" + " LEFT JOIN pg_publication_rel pr\n" + " ON (pr.prpubid = pub.oid)\n" + "WHERE pr.prrelid = '%s' OR pub.puballtables\n" + "ORDER BY 1;", + oid); + + result = PSQLexec(buf.data); + if (!result) + goto error_return; + else + tuples = PQntuples(result); + + if (tuples > 0) + printTableAddFooter(&cont, _("Publications:")); + + /* Might be an empty set - that's ok */ + for (i = 0; i < tuples; i++) + { + printfPQExpBuffer(&buf, " \"%s\"", + PQgetvalue(result, i, 0)); + + printTableAddFooter(&cont, buf.data); + } + PQclear(result); + } } if (view_def) @@ -4846,6 +4878,266 @@ listOneExtensionContents(const char *extname, const char *oid) return true; } +/* \dRp + * Lists publications. + * + * Takes an optional regexp to select particular publications + */ +bool +listPublications(const char *pattern) +{ + PQExpBufferData buf; + PGresult *res; + printQueryOpt myopt = pset.popt; + static const bool translate_columns[] = {false, false, false, false, false}; + + if (pset.sversion < 100000) + { + char sverbuf[32]; + psql_error("The server (version %s) does not support publications.\n", + formatPGVersionNumber(pset.sversion, false, + sverbuf, sizeof(sverbuf))); + return true; + } + + initPQExpBuffer(&buf); + + printfPQExpBuffer(&buf, + "SELECT pubname AS \"%s\",\n" + " pg_catalog.pg_get_userbyid(pubowner) AS \"%s\",\n" + " pubinsert AS \"%s\",\n" + " pubupdate AS \"%s\",\n" + " pubdelete AS \"%s\"\n", + gettext_noop("Name"), + gettext_noop("Owner"), + gettext_noop("Inserts"), + gettext_noop("Updates"), + gettext_noop("Deletes")); + + appendPQExpBufferStr(&buf, + "\nFROM pg_catalog.pg_publication\n"); + + processSQLNamePattern(pset.db, &buf, pattern, false, false, + NULL, "pubname", NULL, + NULL); + + appendPQExpBufferStr(&buf, "ORDER BY 1;"); + + res = PSQLexec(buf.data); + termPQExpBuffer(&buf); + if (!res) + return false; + + myopt.nullPrint = NULL; + myopt.title = _("List of publications"); + myopt.translate_header = true; + myopt.translate_columns = translate_columns; + myopt.n_translate_columns = lengthof(translate_columns); + + printQuery(res, &myopt, pset.queryFout, false, pset.logfile); + + PQclear(res); + + return true; +} + +/* \dRp+ + * Describes publications including the contents. + * + * Takes an optional regexp to select particular publications + */ +bool +describePublications(const char *pattern) +{ + PQExpBufferData buf; + int i; + PGresult *res; + + if (pset.sversion < 100000) + { + char sverbuf[32]; + psql_error("The server (version %s) does not support publications.\n", + formatPGVersionNumber(pset.sversion, false, + sverbuf, sizeof(sverbuf))); + return true; + } + + initPQExpBuffer(&buf); + + printfPQExpBuffer(&buf, + "SELECT oid, pubname, puballtables, pubinsert,\n" + " pubupdate, pubdelete\n" + "FROM pg_catalog.pg_publication\n"); + + processSQLNamePattern(pset.db, &buf, pattern, false, false, + NULL, "pubname", NULL, + NULL); + + appendPQExpBufferStr(&buf, "ORDER BY 2;"); + + res = PSQLexec(buf.data); + if (!res) + { + termPQExpBuffer(&buf); + return false; + } + + for (i = 0; i < PQntuples(res); i++) + { + const char align = 'l'; + int ncols = 3; + int nrows = 1; + int tables = 0; + PGresult *tabres; + char *pubid = PQgetvalue(res, i, 0); + char *pubname = PQgetvalue(res, i, 1); + bool puballtables = strcmp(PQgetvalue(res, i, 2), "t") == 0; + int j; + PQExpBufferData title; + printTableOpt myopt = pset.popt.topt; + printTableContent cont; + + initPQExpBuffer(&title); + printfPQExpBuffer(&title, _("Publication %s"), pubname); + printTableInit(&cont, &myopt, title.data, ncols, nrows); + + printTableAddHeader(&cont, gettext_noop("Inserts"), true, align); + printTableAddHeader(&cont, gettext_noop("Updates"), true, align); + printTableAddHeader(&cont, gettext_noop("Deletes"), true, align); + + printTableAddCell(&cont, PQgetvalue(res, i, 3), false, false); + printTableAddCell(&cont, PQgetvalue(res, i, 4), false, false); + printTableAddCell(&cont, PQgetvalue(res, i, 5), false, false); + + if (puballtables) + printfPQExpBuffer(&buf, + "SELECT n.nspname, c.relname\n" + "FROM pg_catalog.pg_class c,\n" + " pg_catalog.pg_namespace n\n" + "WHERE c.relnamespace = n.oid\n" + " AND c.relkind = 'r'\n" + " AND n.nspname <> 'pg_catalog'\n" + " AND n.nspname <> 'information_schema'\n" + "ORDER BY 1,2"); + else + printfPQExpBuffer(&buf, + "SELECT n.nspname, c.relname\n" + "FROM pg_catalog.pg_class c,\n" + " pg_catalog.pg_namespace n,\n" + " pg_catalog.pg_publication_rel pr\n" + "WHERE c.relnamespace = n.oid\n" + " AND c.oid = pr.prrelid\n" + " AND pr.prpubid = '%s'\n" + "ORDER BY 1,2", pubid); + + tabres = PSQLexec(buf.data); + if (!tabres) + { + printTableCleanup(&cont); + PQclear(res); + termPQExpBuffer(&buf); + termPQExpBuffer(&title); + return false; + } + else + tables = PQntuples(tabres); + + if (tables > 0) + printTableAddFooter(&cont, _("Tables:")); + + for (j = 0; j < tables; j++) + { + printfPQExpBuffer(&buf, " \"%s.%s\"", + PQgetvalue(tabres, j, 0), + PQgetvalue(tabres, j, 1)); + + printTableAddFooter(&cont, buf.data); + } + PQclear(tabres); + + printTable(&cont, pset.queryFout, false, pset.logfile); + printTableCleanup(&cont); + + termPQExpBuffer(&title); + } + + termPQExpBuffer(&buf); + PQclear(res); + + return true; +} + +/* \dRs + * Describes subscriptions. + * + * Takes an optional regexp to select particular subscriptions + */ +bool +describeSubscriptions(const char *pattern, bool verbose) +{ + PQExpBufferData buf; + PGresult *res; + printQueryOpt myopt = pset.popt; + static const bool translate_columns[] = {false, false, false, false, false}; + + if (pset.sversion < 100000) + { + char sverbuf[32]; + psql_error("The server (version %s) does not support subscriptions.\n", + formatPGVersionNumber(pset.sversion, false, + sverbuf, sizeof(sverbuf))); + return true; + } + + initPQExpBuffer(&buf); + + printfPQExpBuffer(&buf, + "SELECT subname AS \"%s\"\n" + ", pg_catalog.pg_get_userbyid(subowner) AS \"%s\"\n" + ", subenabled AS \"%s\"\n" + ", subpublications AS \"%s\"\n", + gettext_noop("Name"), + gettext_noop("Owner"), + gettext_noop("Enabled"), + gettext_noop("Publication")); + + if (verbose) + { + appendPQExpBuffer(&buf, + ", subconninfo AS \"%s\"\n", + gettext_noop("Conninfo")); + } + + /* Only display subscritpions in current database. */ + appendPQExpBufferStr(&buf, + "FROM pg_catalog.pg_subscription\n" + "WHERE subdbid = (SELECT oid\n" + " FROM pg_catalog.pg_database\n" + " WHERE datname = current_database())"); + + processSQLNamePattern(pset.db, &buf, pattern, true, false, + NULL, "subname", NULL, + NULL); + + appendPQExpBufferStr(&buf, "ORDER BY 1;"); + + res = PSQLexec(buf.data); + termPQExpBuffer(&buf); + if (!res) + return false; + + myopt.nullPrint = NULL; + myopt.title = _("List of subscriptions"); + myopt.translate_header = true; + myopt.translate_columns = translate_columns; + myopt.n_translate_columns = lengthof(translate_columns); + + printQuery(res, &myopt, pset.queryFout, false, pset.logfile); + + PQclear(res); + return true; +} + /* * printACLColumn * diff --git a/src/bin/psql/describe.h b/src/bin/psql/describe.h index 4600182e4c9..074553e1334 100644 --- a/src/bin/psql/describe.h +++ b/src/bin/psql/describe.h @@ -102,4 +102,13 @@ extern bool listExtensionContents(const char *pattern); /* \dy */ extern bool listEventTriggers(const char *pattern, bool verbose); +/* \dRp */ +bool listPublications(const char *pattern); + +/* \dRp+ */ +bool describePublications(const char *pattern); + +/* \dRs */ +bool describeSubscriptions(const char *pattern, bool verbose); + #endif /* DESCRIBE_H */ diff --git a/src/bin/psql/help.c b/src/bin/psql/help.c index 09baf871dda..53656294da4 100644 --- a/src/bin/psql/help.c +++ b/src/bin/psql/help.c @@ -241,6 +241,8 @@ slashUsage(unsigned short int pager) fprintf(output, _(" \\dO[S+] [PATTERN] list collations\n")); fprintf(output, _(" \\dp [PATTERN] list table, view, and sequence access privileges\n")); fprintf(output, _(" \\drds [PATRN1 [PATRN2]] list per-database role settings\n")); + fprintf(output, _(" \\dRp[+] [PATTERN] list replication publications\n")); + fprintf(output, _(" \\dRs[+] [PATTERN] list replication subscriptions\n")); fprintf(output, _(" \\ds[S+] [PATTERN] list sequences\n")); fprintf(output, _(" \\dt[S+] [PATTERN] list tables\n")); fprintf(output, _(" \\dT[S+] [PATTERN] list data types\n")); diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c index 7709112f494..d6fffcf42f1 100644 --- a/src/bin/psql/tab-complete.c +++ b/src/bin/psql/tab-complete.c @@ -960,11 +960,13 @@ static const pgsql_thing_t words_after_create[] = { {"OWNED", NULL, NULL, THING_NO_CREATE}, /* for DROP OWNED BY ... */ {"PARSER", Query_for_list_of_ts_parsers, NULL, THING_NO_SHOW}, {"POLICY", NULL, NULL}, + {"PUBLICATION", NULL, NULL}, {"ROLE", Query_for_list_of_roles}, {"RULE", "SELECT pg_catalog.quote_ident(rulename) FROM pg_catalog.pg_rules WHERE substring(pg_catalog.quote_ident(rulename),1,%d)='%s'"}, {"SCHEMA", Query_for_list_of_schemas}, {"SEQUENCE", NULL, &Query_for_list_of_sequences}, {"SERVER", Query_for_list_of_servers}, + {"SUBSCRIPTION", NULL, NULL}, {"TABLE", NULL, &Query_for_list_of_tables}, {"TABLESPACE", Query_for_list_of_tablespaces}, {"TEMP", NULL, NULL, THING_NO_DROP}, /* for CREATE TEMP TABLE ... */ @@ -1407,8 +1409,8 @@ psql_completion(const char *text, int start, int end) {"AGGREGATE", "COLLATION", "CONVERSION", "DATABASE", "DEFAULT PRIVILEGES", "DOMAIN", "EVENT TRIGGER", "EXTENSION", "FOREIGN DATA WRAPPER", "FOREIGN TABLE", "FUNCTION", "GROUP", "INDEX", "LANGUAGE", "LARGE OBJECT", "MATERIALIZED VIEW", "OPERATOR", - "POLICY", "ROLE", "RULE", "SCHEMA", "SERVER", "SEQUENCE", "SYSTEM", "TABLE", - "TABLESPACE", "TEXT SEARCH", "TRIGGER", "TYPE", + "POLICY", "PUBLICATION", "ROLE", "RULE", "SCHEMA", "SERVER", "SEQUENCE", + "SUBSCRIPTION", "SYSTEM", "TABLE", "TABLESPACE", "TEXT SEARCH", "TRIGGER", "TYPE", "USER", "USER MAPPING FOR", "VIEW", NULL}; COMPLETE_WITH_LIST(list_ALTER); @@ -1433,7 +1435,26 @@ psql_completion(const char *text, int start, int end) else COMPLETE_WITH_FUNCTION_ARG(prev2_wd); } - + /* ALTER PUBLICATION <name> ...*/ + else if (Matches3("ALTER","PUBLICATION",MatchAny)) + { + COMPLETE_WITH_LIST5("WITH", "ADD TABLE", "SET TABLE", "DROP TABLE", "OWNER TO"); + } + /* ALTER PUBLICATION <name> .. WITH ( ... */ + else if (HeadMatches3("ALTER", "PUBLICATION",MatchAny) && TailMatches2("WITH", "(")) + { + COMPLETE_WITH_LIST6("PUBLISH INSERT", "NOPUBLISH INSERT", "PUBLISH UPDATE", + "NOPUBLISH UPDATE", "PUBLISH DELETE", "NOPUBLISH DELETE"); + } + /* ALTER SUBSCRIPTION <name> ... */ + else if (Matches3("ALTER","SUBSCRIPTION",MatchAny)) + { + COMPLETE_WITH_LIST6("WITH", "CONNECTION", "SET PUBLICATION", "ENABLE", "DISABLE", "OWNER TO"); + } + else if (HeadMatches3("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches2("WITH", "(")) + { + COMPLETE_WITH_CONST("SLOT NAME"); + } /* ALTER SCHEMA <name> */ else if (Matches3("ALTER", "SCHEMA", MatchAny)) COMPLETE_WITH_LIST2("OWNER TO", "RENAME TO"); @@ -2227,6 +2248,20 @@ psql_completion(const char *text, int start, int end) COMPLETE_WITH_CONST("("); +/* CREATE PUBLICATION */ + else if (Matches3("CREATE", "PUBLICATION", MatchAny)) + COMPLETE_WITH_LIST3("FOR TABLE", "FOR ALL TABLES", "WITH ("); + else if (Matches4("CREATE", "PUBLICATION", MatchAny, "FOR")) + COMPLETE_WITH_LIST2("TABLE", "ALL TABLES"); + /* Complete "CREATE PUBLICATION <name> FOR TABLE <table>" */ + else if (Matches4("CREATE", "PUBLICATION", MatchAny, "FOR TABLE")) + COMPLETE_WITH_SCHEMA_QUERY(Query_for_list_of_tables, NULL); + /* Complete "CREATE PUBLICATION <name> [...] WITH" */ + else if (HeadMatches2("CREATE", "PUBLICATION") && TailMatches2("WITH", "(")) + COMPLETE_WITH_LIST2("PUBLISH", "NOPUBLISH"); + else if (HeadMatches2("CREATE", "PUBLICATION") && TailMatches3("WITH", "(", MatchAny)) + COMPLETE_WITH_LIST3("INSERT", "UPDATE", "DELETE"); + /* CREATE RULE */ /* Complete "CREATE RULE <sth>" with "AS ON" */ else if (Matches3("CREATE", "RULE", MatchAny)) @@ -2278,6 +2313,16 @@ psql_completion(const char *text, int start, int end) else if (Matches5("CREATE", "TEXT", "SEARCH", "CONFIGURATION", MatchAny)) COMPLETE_WITH_CONST("("); +/* CREATE SUBSCRIPTION */ + else if (Matches3("CREATE", "SUBSCRIPTION", MatchAny)) + COMPLETE_WITH_CONST("CONNECTION"); + else if (Matches5("CREATE", "SUBSCRIPTION", MatchAny, "CONNECTION",MatchAny)) + COMPLETE_WITH_CONST("PUBLICATION"); + /* Complete "CREATE SUBSCRIPTION <name> ... WITH ( <opt>" */ + else if (HeadMatches2("CREATE", "SUBSCRIPTION") && TailMatches2("WITH", "(")) + COMPLETE_WITH_LIST5("ENABLED", "DISABLED", "CREATE SLOT", + "NOCREATE SLOT", "SLOT NAME"); + /* CREATE TRIGGER --- is allowed inside CREATE SCHEMA, so use TailMatches */ /* complete CREATE TRIGGER <name> with BEFORE,AFTER,INSTEAD OF */ else if (TailMatches3("CREATE", "TRIGGER", MatchAny)) @@ -2438,7 +2483,7 @@ psql_completion(const char *text, int start, int end) /* DROP */ /* Complete DROP object with CASCADE / RESTRICT */ else if (Matches3("DROP", - "COLLATION|CONVERSION|DOMAIN|EXTENSION|LANGUAGE|SCHEMA|SEQUENCE|SERVER|TABLE|TYPE|VIEW", + "COLLATION|CONVERSION|DOMAIN|EXTENSION|LANGUAGE|PUBLICATION|SCHEMA|SEQUENCE|SERVER|TABLE|TYPE|VIEW", MatchAny) || Matches4("DROP", "ACCESS", "METHOD", MatchAny) || (Matches4("DROP", "AGGREGATE|FUNCTION", MatchAny, MatchAny) && |
