diff options
| author | Peter Eisentraut | 2018-04-07 15:24:53 +0000 |
|---|---|---|
| committer | Peter Eisentraut | 2018-04-07 15:34:11 +0000 |
| commit | 039eb6e92f20499ac36cc74f8a5cef7430b706f6 (patch) | |
| tree | 2cf52aeafb59917d5c7ed396acb6d86325b4a8b0 /src/backend/replication | |
| parent | 5dfd1e5a6696b271a2cdee54143fbc209c88c02f (diff) | |
Logical replication support for TRUNCATE
Update the built-in logical replication system to make use of the
previously added logical decoding for TRUNCATE support. Add the
required truncate callback to pgoutput and a new logical replication
protocol message.
Publications get a new attribute to determine whether to replicate
truncate actions. When updating a publication via pg_dump from an older
version, this is not set, thus preserving the previous behavior.
Author: Simon Riggs <simon@2ndquadrant.com>
Author: Marco Nenciarini <marco.nenciarini@2ndquadrant.it>
Author: Peter Eisentraut <peter.eisentraut@2ndquadrant.com>
Reviewed-by: Petr Jelinek <petr.jelinek@2ndquadrant.com>
Reviewed-by: Andres Freund <andres@anarazel.de>
Reviewed-by: Alvaro Herrera <alvherre@alvh.no-ip.org>
Diffstat (limited to 'src/backend/replication')
| -rw-r--r-- | src/backend/replication/logical/proto.c | 55 | ||||
| -rw-r--r-- | src/backend/replication/logical/worker.c | 68 | ||||
| -rw-r--r-- | src/backend/replication/pgoutput/pgoutput.c | 129 |
3 files changed, 216 insertions, 36 deletions
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index 948343e4aee..edc97a7662b 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -26,6 +26,9 @@ */ #define LOGICALREP_IS_REPLICA_IDENTITY 1 +#define TRUNCATE_CASCADE (1<<0) +#define TRUNCATE_RESTART_SEQS (1<<1) + static void logicalrep_write_attrs(StringInfo out, Relation rel); static void logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple); @@ -293,6 +296,58 @@ logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup) } /* + * Write TRUNCATE to the output stream. + */ +void +logicalrep_write_truncate(StringInfo out, + int nrelids, + Oid relids[], + bool cascade, bool restart_seqs) +{ + int i; + uint8 flags = 0; + + pq_sendbyte(out, 'T'); /* action TRUNCATE */ + + pq_sendint32(out, nrelids); + + /* encode and send truncate flags */ + if (cascade) + flags |= TRUNCATE_CASCADE; + if (restart_seqs) + flags |= TRUNCATE_RESTART_SEQS; + pq_sendint8(out, flags); + + for (i = 0; i < nrelids; i++) + pq_sendint32(out, relids[i]); +} + +/* + * Read TRUNCATE from stream. + */ +List * +logicalrep_read_truncate(StringInfo in, + bool *cascade, bool *restart_seqs) +{ + int i; + int nrelids; + List *relids = NIL; + uint8 flags; + + nrelids = pq_getmsgint(in, 4); + + /* read and decode truncate flags */ + flags = pq_getmsgint(in, 1); + *cascade = (flags & TRUNCATE_CASCADE) > 0; + *restart_seqs = (flags & TRUNCATE_RESTART_SEQS) > 0; + + for (i = 0; i < nrelids; i++) + relids = lappend_oid(relids, pq_getmsgint(in, 4)); + + return relids; +} + +/* * Write relation description to the output stream. */ void diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index b10857550a6..aa7e27179e8 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -30,10 +30,12 @@ #include "access/xact.h" #include "access/xlog_internal.h" +#include "catalog/catalog.h" #include "catalog/namespace.h" #include "catalog/pg_subscription.h" #include "catalog/pg_subscription_rel.h" +#include "commands/tablecmds.h" #include "commands/trigger.h" #include "executor/executor.h" @@ -83,6 +85,7 @@ #include "utils/inval.h" #include "utils/lsyscache.h" #include "utils/memutils.h" +#include "utils/rel.h" #include "utils/timeout.h" #include "utils/tqual.h" #include "utils/syscache.h" @@ -888,6 +891,67 @@ apply_handle_delete(StringInfo s) CommandCounterIncrement(); } +/* + * Handle TRUNCATE message. + * + * TODO: FDW support + */ +static void +apply_handle_truncate(StringInfo s) +{ + bool cascade = false; + bool restart_seqs = false; + List *remote_relids = NIL; + List *remote_rels = NIL; + List *rels = NIL; + List *relids = NIL; + List *relids_logged = NIL; + ListCell *lc; + + ensure_transaction(); + + remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs); + + foreach(lc, remote_relids) + { + LogicalRepRelId relid = lfirst_oid(lc); + LogicalRepRelMapEntry *rel; + + rel = logicalrep_rel_open(relid, RowExclusiveLock); + if (!should_apply_changes_for_rel(rel)) + { + /* + * The relation can't become interesting in the middle of the + * transaction so it's safe to unlock it. + */ + logicalrep_rel_close(rel, RowExclusiveLock); + continue; + } + + remote_rels = lappend(remote_rels, rel); + rels = lappend(rels, rel->localrel); + relids = lappend_oid(relids, rel->localreloid); + if (RelationIsLogicallyLogged(rel->localrel)) + relids_logged = lappend_oid(relids, rel->localreloid); + } + + /* + * Even if we used CASCADE on the upstream master we explicitly + * default to replaying changes without further cascading. + * This might be later changeable with a user specified option. + */ + ExecuteTruncateGuts(rels, relids, relids_logged, DROP_RESTRICT, restart_seqs); + + foreach(lc, remote_rels) + { + LogicalRepRelMapEntry *rel = lfirst(lc); + + logicalrep_rel_close(rel, NoLock); + } + + CommandCounterIncrement(); +} + /* * Logical replication protocol message dispatcher. @@ -919,6 +983,10 @@ apply_dispatch(StringInfo s) case 'D': apply_handle_delete(s); break; + /* TRUNCATE */ + case 'T': + apply_handle_truncate(s); + break; /* RELATION */ case 'R': apply_handle_relation(s); diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index aa9cf5b54ed..06dfbc082f2 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -39,6 +39,9 @@ static void pgoutput_commit_txn(LogicalDecodingContext *ctx, static void pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation rel, ReorderBufferChange *change); +static void pgoutput_truncate(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, int nrelations, Relation relations[], + ReorderBufferChange *change); static bool pgoutput_origin_filter(LogicalDecodingContext *ctx, RepOriginId origin_id); @@ -77,6 +80,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) cb->startup_cb = pgoutput_startup; cb->begin_cb = pgoutput_begin_txn; cb->change_cb = pgoutput_change; + cb->truncate_cb = pgoutput_truncate; cb->commit_cb = pgoutput_commit_txn; cb->filter_by_origin_cb = pgoutput_origin_filter; cb->shutdown_cb = pgoutput_shutdown; @@ -251,6 +255,46 @@ pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, } /* + * Write the relation schema if the current schema hasn't been sent yet. + */ +static void +maybe_send_schema(LogicalDecodingContext *ctx, + Relation relation, RelationSyncEntry *relentry) +{ + if (!relentry->schema_sent) + { + TupleDesc desc; + int i; + + desc = RelationGetDescr(relation); + + /* + * Write out type info if needed. We do that only for user created + * types. + */ + for (i = 0; i < desc->natts; i++) + { + Form_pg_attribute att = TupleDescAttr(desc, i); + + if (att->attisdropped) + continue; + + if (att->atttypid < FirstNormalObjectId) + continue; + + OutputPluginPrepareWrite(ctx, false); + logicalrep_write_typ(ctx->out, att->atttypid); + OutputPluginWrite(ctx, false); + } + + OutputPluginPrepareWrite(ctx, false); + logicalrep_write_rel(ctx->out, relation); + OutputPluginWrite(ctx, false); + relentry->schema_sent = true; + } +} + +/* * Sends the decoded DML over wire. */ static void @@ -288,40 +332,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, /* Avoid leaking memory by using and resetting our own context */ old = MemoryContextSwitchTo(data->context); - /* - * Write the relation schema if the current schema haven't been sent yet. - */ - if (!relentry->schema_sent) - { - TupleDesc desc; - int i; - - desc = RelationGetDescr(relation); - - /* - * Write out type info if needed. We do that only for user created - * types. - */ - for (i = 0; i < desc->natts; i++) - { - Form_pg_attribute att = TupleDescAttr(desc, i); - - if (att->attisdropped) - continue; - - if (att->atttypid < FirstNormalObjectId) - continue; - - OutputPluginPrepareWrite(ctx, false); - logicalrep_write_typ(ctx->out, att->atttypid); - OutputPluginWrite(ctx, false); - } - - OutputPluginPrepareWrite(ctx, false); - logicalrep_write_rel(ctx->out, relation); - OutputPluginWrite(ctx, false); - relentry->schema_sent = true; - } + maybe_send_schema(ctx, relation, relentry); /* Send the data */ switch (change->action) @@ -363,6 +374,51 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, MemoryContextReset(data->context); } +static void +pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + int nrelations, Relation relations[], ReorderBufferChange *change) +{ + PGOutputData *data = (PGOutputData *) ctx->output_plugin_private; + MemoryContext old; + RelationSyncEntry *relentry; + int i; + int nrelids; + Oid *relids; + + old = MemoryContextSwitchTo(data->context); + + relids = palloc0(nrelations * sizeof(Oid)); + nrelids = 0; + + for (i = 0; i < nrelations; i++) + { + Relation relation = relations[i]; + Oid relid = RelationGetRelid(relation); + + if (!is_publishable_relation(relation)) + continue; + + relentry = get_rel_sync_entry(data, relid); + + if (!relentry->pubactions.pubtruncate) + continue; + + relids[nrelids++] = relid; + maybe_send_schema(ctx, relation, relentry); + } + + OutputPluginPrepareWrite(ctx, true); + logicalrep_write_truncate(ctx->out, + nrelids, + relids, + change->data.truncate.cascade, + change->data.truncate.restart_seqs); + OutputPluginWrite(ctx, true); + + MemoryContextSwitchTo(old); + MemoryContextReset(data->context); +} + /* * Currently we always forward. */ @@ -504,7 +560,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) * we only need to consider ones that the subscriber requested. */ entry->pubactions.pubinsert = entry->pubactions.pubupdate = - entry->pubactions.pubdelete = false; + entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false; foreach(lc, data->publications) { @@ -515,10 +571,11 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) entry->pubactions.pubinsert |= pub->pubactions.pubinsert; entry->pubactions.pubupdate |= pub->pubactions.pubupdate; entry->pubactions.pubdelete |= pub->pubactions.pubdelete; + entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate; } if (entry->pubactions.pubinsert && entry->pubactions.pubupdate && - entry->pubactions.pubdelete) + entry->pubactions.pubdelete && entry->pubactions.pubtruncate) break; } |
