wrconn = walrcv_connect(conninfo, true, stmt->subname, &err);
if (!wrconn)
ereport(ERROR,
- (errmsg("could not connect to the publisher: %s", err)));
+ (errcode(ERRCODE_CONNECTION_FAILURE),
+ errmsg("could not connect to the publisher: %s", err)));
PG_TRY();
{
wrconn = walrcv_connect(sub->conninfo, true, sub->name, &err);
if (!wrconn)
ereport(ERROR,
- (errmsg("could not connect to the publisher: %s", err)));
+ (errcode(ERRCODE_CONNECTION_FAILURE),
+ errmsg("could not connect to the publisher: %s", err)));
PG_TRY();
{
{
if (sub->enabled && !slotname)
ereport(ERROR,
- (errcode(ERRCODE_SYNTAX_ERROR),
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("cannot set %s for enabled subscription",
"slot_name = NONE")));
if (!sub->slotname && enabled)
ereport(ERROR,
- (errcode(ERRCODE_SYNTAX_ERROR),
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("cannot enable subscription that does not have a slot name")));
values[Anum_pg_subscription_subenabled - 1] =
{
if (!sub->enabled)
ereport(ERROR,
- (errcode(ERRCODE_SYNTAX_ERROR),
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false).")));
{
if (!sub->enabled)
ereport(ERROR,
- (errcode(ERRCODE_SYNTAX_ERROR),
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("ALTER SUBSCRIPTION with refresh is not allowed for disabled subscriptions"),
errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false).")));
if (!sub->enabled)
ereport(ERROR,
- (errcode(ERRCODE_SYNTAX_ERROR),
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions")));
parse_subscription_options(stmt->options,
{
/* ERROR. */
ereport(ERROR,
- (errmsg("could not drop replication slot \"%s\" on publisher: %s",
+ (errcode(ERRCODE_CONNECTION_FAILURE),
+ errmsg("could not drop replication slot \"%s\" on publisher: %s",
slotname, res->err)));
}
if (res->status != WALRCV_OK_TUPLES)
ereport(ERROR,
- (errmsg("could not receive list of replicated tables from the publisher: %s",
+ (errcode(ERRCODE_CONNECTION_FAILURE),
+ errmsg("could not receive list of replicated tables from the publisher: %s",
res->err)));
/* Process tables. */
}
ereport(ERROR,
- (errmsg("could not connect to publisher when attempting to "
+ (errcode(ERRCODE_CONNECTION_FAILURE),
+ errmsg("could not connect to publisher when attempting to "
"drop replication slot \"%s\": %s", slotname, err),
/* translator: %s is an SQL ALTER command */
errhint("Use %s to disassociate the subscription from the slot.",
if (strcmp(name, pname) == 0)
ereport(ERROR,
- (errcode(ERRCODE_SYNTAX_ERROR),
+ (errcode(ERRCODE_DUPLICATE_OBJECT),
errmsg("publication name \"%s\" used more than once",
pname)));
}
oldpublist = lappend(oldpublist, makeString(name));
else if (!addpub && !found)
ereport(ERROR,
- (errcode(ERRCODE_SYNTAX_ERROR),
+ (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg("publication \"%s\" is not in subscription \"%s\"",
name, subname)));
}
if (conn_opts == NULL)
ereport(ERROR,
- (errmsg("could not parse connection string: %s",
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("could not parse connection string: %s",
_("out of memory"))));
/* build a clean connection string from pieces */
{
PQclear(res);
ereport(ERROR,
- (errmsg("could not receive database system identifier and timeline ID from "
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg("could not receive database system identifier and timeline ID from "
"the primary server: %s",
pchomp(PQerrorMessage(conn->streamConn)))));
}
PQclear(res);
ereport(ERROR,
- (errmsg("invalid response from primary server"),
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg("invalid response from primary server"),
errdetail("Could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields.",
ntuples, nfields, 3, 1)));
}
pubnames_str = stringlist_to_identifierstr(conn->streamConn, pubnames);
if (!pubnames_str)
ereport(ERROR,
- (errmsg("could not start WAL streaming: %s",
+ (errcode(ERRCODE_OUT_OF_MEMORY), /* likely guess */
+ errmsg("could not start WAL streaming: %s",
pchomp(PQerrorMessage(conn->streamConn)))));
pubnames_literal = PQescapeLiteral(conn->streamConn, pubnames_str,
strlen(pubnames_str));
if (!pubnames_literal)
ereport(ERROR,
- (errmsg("could not start WAL streaming: %s",
+ (errcode(ERRCODE_OUT_OF_MEMORY), /* likely guess */
+ errmsg("could not start WAL streaming: %s",
pchomp(PQerrorMessage(conn->streamConn)))));
appendStringInfo(&cmd, ", publication_names %s", pubnames_literal);
PQfreemem(pubnames_literal);
{
PQclear(res);
ereport(ERROR,
- (errmsg("could not start WAL streaming: %s",
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg("could not start WAL streaming: %s",
pchomp(PQerrorMessage(conn->streamConn)))));
}
PQclear(res);
if (PQputCopyEnd(conn->streamConn, NULL) <= 0 ||
PQflush(conn->streamConn))
ereport(ERROR,
- (errmsg("could not send end-of-streaming message to primary: %s",
+ (errcode(ERRCODE_CONNECTION_FAILURE),
+ errmsg("could not send end-of-streaming message to primary: %s",
pchomp(PQerrorMessage(conn->streamConn)))));
*next_tli = 0;
*/
if (PQnfields(res) < 2 || PQntuples(res) != 1)
ereport(ERROR,
- (errmsg("unexpected result set after end-of-streaming")));
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg("unexpected result set after end-of-streaming")));
*next_tli = pg_strtoint32(PQgetvalue(res, 0, 0));
PQclear(res);
/* End the copy */
if (PQendcopy(conn->streamConn))
ereport(ERROR,
- (errmsg("error while shutting down streaming COPY: %s",
+ (errcode(ERRCODE_CONNECTION_FAILURE),
+ errmsg("error while shutting down streaming COPY: %s",
pchomp(PQerrorMessage(conn->streamConn)))));
/* CommandComplete should follow */
if (PQresultStatus(res) != PGRES_COMMAND_OK)
ereport(ERROR,
- (errmsg("error reading result of streaming command: %s",
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg("error reading result of streaming command: %s",
pchomp(PQerrorMessage(conn->streamConn)))));
PQclear(res);
res = libpqrcv_PQgetResult(conn->streamConn);
if (res != NULL)
ereport(ERROR,
- (errmsg("unexpected result after CommandComplete: %s",
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg("unexpected result after CommandComplete: %s",
pchomp(PQerrorMessage(conn->streamConn)))));
}
{
PQclear(res);
ereport(ERROR,
- (errmsg("could not receive timeline history file from "
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg("could not receive timeline history file from "
"the primary server: %s",
pchomp(PQerrorMessage(conn->streamConn)))));
}
PQclear(res);
ereport(ERROR,
- (errmsg("invalid response from primary server"),
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg("invalid response from primary server"),
errdetail("Expected 1 tuple with 2 fields, got %d tuples with %d fields.",
ntuples, nfields)));
}
/* Try consuming some data. */
if (PQconsumeInput(conn->streamConn) == 0)
ereport(ERROR,
- (errmsg("could not receive data from WAL stream: %s",
+ (errcode(ERRCODE_CONNECTION_FAILURE),
+ errmsg("could not receive data from WAL stream: %s",
pchomp(PQerrorMessage(conn->streamConn)))));
/* Now that we've consumed some input, try again */
return -1;
ereport(ERROR,
- (errmsg("unexpected result after CommandComplete: %s",
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg("unexpected result after CommandComplete: %s",
PQerrorMessage(conn->streamConn))));
}
{
PQclear(res);
ereport(ERROR,
- (errmsg("could not receive data from WAL stream: %s",
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg("could not receive data from WAL stream: %s",
pchomp(PQerrorMessage(conn->streamConn)))));
}
}
if (rawlen < -1)
ereport(ERROR,
- (errmsg("could not receive data from WAL stream: %s",
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg("could not receive data from WAL stream: %s",
pchomp(PQerrorMessage(conn->streamConn)))));
/* Return received messages to caller */
if (PQputCopyData(conn->streamConn, buffer, nbytes) <= 0 ||
PQflush(conn->streamConn))
ereport(ERROR,
- (errmsg("could not send data to WAL stream: %s",
+ (errcode(ERRCODE_CONNECTION_FAILURE),
+ errmsg("could not send data to WAL stream: %s",
pchomp(PQerrorMessage(conn->streamConn)))));
}
{
PQclear(res);
ereport(ERROR,
- (errmsg("could not create replication slot \"%s\": %s",
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg("could not create replication slot \"%s\": %s",
slotname, pchomp(PQerrorMessage(conn->streamConn)))));
}
/* Make sure we got expected number of fields. */
if (nfields != nRetTypes)
ereport(ERROR,
- (errmsg("invalid query response"),
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg("invalid query response"),
errdetail("Expected %d fields, got %d fields.",
nRetTypes, nfields)));
if (res->status != WALRCV_OK_TUPLES)
ereport(ERROR,
- (errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
+ (errcode(ERRCODE_CONNECTION_FAILURE),
+ errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
nspname, relname, res->err)));
slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot))
ereport(ERROR,
- (errmsg("table \"%s.%s\" not found on publisher",
+ (errcode(ERRCODE_UNDEFINED_OBJECT),
+ errmsg("table \"%s.%s\" not found on publisher",
nspname, relname)));
lrel->remoteid = DatumGetObjectId(slot_getattr(slot, 1, &isnull));
if (res->status != WALRCV_OK_TUPLES)
ereport(ERROR,
- (errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
+ (errcode(ERRCODE_CONNECTION_FAILURE),
+ errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s",
nspname, relname, res->err)));
/* We don't know the number of rows coming, so allocate enough space. */
pfree(cmd.data);
if (res->status != WALRCV_OK_COPY_OUT)
ereport(ERROR,
- (errmsg("could not start initial contents copy for table \"%s.%s\": %s",
+ (errcode(ERRCODE_CONNECTION_FAILURE),
+ errmsg("could not start initial contents copy for table \"%s.%s\": %s",
lrel.nspname, lrel.relname, res->err)));
walrcv_clear_result(res);
walrcv_connect(MySubscription->conninfo, true, slotname, &err);
if (LogRepWorkerWalRcvConn == NULL)
ereport(ERROR,
- (errmsg("could not connect to the publisher: %s", err)));
+ (errcode(ERRCODE_CONNECTION_FAILURE),
+ errmsg("could not connect to the publisher: %s", err)));
Assert(MyLogicalRepWorker->relstate == SUBREL_STATE_INIT ||
MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC ||
0, NULL);
if (res->status != WALRCV_OK_COMMAND)
ereport(ERROR,
- (errmsg("table copy could not start transaction on publisher: %s",
+ (errcode(ERRCODE_CONNECTION_FAILURE),
+ errmsg("table copy could not start transaction on publisher: %s",
res->err)));
walrcv_clear_result(res);
res = walrcv_exec(LogRepWorkerWalRcvConn, "COMMIT", 0, NULL);
if (res->status != WALRCV_OK_COMMAND)
ereport(ERROR,
- (errmsg("table copy could not finish transaction on publisher: %s",
+ (errcode(ERRCODE_CONNECTION_FAILURE),
+ errmsg("table copy could not finish transaction on publisher: %s",
res->err)));
walrcv_clear_result(res);
if (now >= timeout)
ereport(ERROR,
- (errmsg("terminating logical replication worker due to timeout")));
+ (errcode(ERRCODE_CONNECTION_FAILURE),
+ errmsg("terminating logical replication worker due to timeout")));
/* Check to see if it's time for a ping. */
if (!ping_sent)
MySubscription->name, &err);
if (LogRepWorkerWalRcvConn == NULL)
ereport(ERROR,
- (errmsg("could not connect to the publisher: %s", err)));
+ (errcode(ERRCODE_CONNECTION_FAILURE),
+ errmsg("could not connect to the publisher: %s", err)));
/*
* We don't really use the output identify_system for anything but it
PG_SETMASK(&UnBlockSig);
/* Establish the connection to the primary for XLOG streaming */
- wrconn = walrcv_connect(conninfo, false, cluster_name[0] ? cluster_name : "walreceiver", &err);
+ wrconn = walrcv_connect(conninfo, false,
+ cluster_name[0] ? cluster_name : "walreceiver",
+ &err);
if (!wrconn)
ereport(ERROR,
- (errmsg("could not connect to the primary server: %s", err)));
+ (errcode(ERRCODE_CONNECTION_FAILURE),
+ errmsg("could not connect to the primary server: %s", err)));
/*
* Save user-visible connection string. This clobbers the original
if (strcmp(primary_sysid, standby_sysid) != 0)
{
ereport(ERROR,
- (errmsg("database system identifier differs between the primary and standby"),
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("database system identifier differs between the primary and standby"),
errdetail("The primary's identifier is %s, the standby's identifier is %s.",
primary_sysid, standby_sysid)));
}
*/
if (primaryTLI < startpointTLI)
ereport(ERROR,
- (errmsg("highest timeline %u of the primary is behind recovery timeline %u",
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("highest timeline %u of the primary is behind recovery timeline %u",
primaryTLI, startpointTLI)));
/*
*/
if (!RecoveryInProgress())
ereport(FATAL,
- (errmsg("cannot continue WAL streaming, recovery has already ended")));
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("cannot continue WAL streaming, recovery has already ended")));
/* Process any requests or signals received recently */
ProcessWalRcvInterrupts();
if (now >= timeout)
ereport(ERROR,
- (errmsg("terminating walreceiver due to timeout")));
+ (errcode(ERRCODE_CONNECTION_FAILURE),
+ errmsg("terminating walreceiver due to timeout")));
/*
* We didn't receive anything new, for half of