}
static void
-parse_output_parameters(List *options, uint32 *protocol_version,
- List **publication_names, bool *binary,
- bool *enable_streaming)
+parse_output_parameters(List *options, PGOutputData *data)
{
ListCell *lc;
bool protocol_version_given = false;
bool binary_option_given = false;
bool streaming_given = false;
- *binary = false;
+ data->binary = false;
+ data->streaming = false;
foreach(lc, options)
{
errmsg("proto_version \"%s\" out of range",
strVal(defel->arg))));
- *protocol_version = (uint32) parsed;
+ data->protocol_version = (uint32) parsed;
}
else if (strcmp(defel->defname, "publication_names") == 0)
{
publication_names_given = true;
if (!SplitIdentifierString(strVal(defel->arg), ',',
- publication_names))
+ &data->publication_names))
ereport(ERROR,
(errcode(ERRCODE_INVALID_NAME),
errmsg("invalid publication_names syntax")));
errmsg("conflicting or redundant options")));
binary_option_given = true;
- *binary = defGetBoolean(defel);
+ data->binary = defGetBoolean(defel);
}
else if (strcmp(defel->defname, "streaming") == 0)
{
errmsg("conflicting or redundant options")));
streaming_given = true;
- *enable_streaming = defGetBoolean(defel);
+ data->streaming = defGetBoolean(defel);
}
else
elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
bool is_init)
{
- bool enable_streaming = false;
PGOutputData *data = palloc0(sizeof(PGOutputData));
/* Create our memory context for private allocations. */
if (!is_init)
{
/* Parse the params and ERROR if we see any we don't recognize */
- parse_output_parameters(ctx->output_plugin_options,
- &data->protocol_version,
- &data->publication_names,
- &data->binary,
- &enable_streaming);
+ parse_output_parameters(ctx->output_plugin_options, data);
/* Check if we support requested protocol */
if (data->protocol_version > LOGICALREP_PROTO_MAX_VERSION_NUM)
* we only allow it with sufficient version of the protocol, and when
* the output plugin supports it.
*/
- if (!enable_streaming)
+ if (!data->streaming)
ctx->streaming = false;
else if (data->protocol_version < LOGICALREP_PROTO_STREAM_VERSION_NUM)
ereport(ERROR,