(errmsg("setting db node for query to be sent, no query context")));\
} while (0)
-static POOL_DEST send_to_where(Node *node, char *query);
+static POOL_DEST send_to_where(Node *node);
static void where_to_send_deallocate(POOL_QUERY_CONTEXT * query_context, Node *node);
static char *remove_read_write(int len, const char *contents, int *rewritten_len);
static void set_virtual_main_node(POOL_QUERY_CONTEXT *query_context);
{
POOL_DEST dest;
- dest = send_to_where(node, query);
+ dest = send_to_where(node);
dml_adaptive(node, query);
* From syntactically analysis decide the statement to be sent to the
* primary, the standby or either or both in native replication+HR/SR mode.
*/
-static POOL_DEST send_to_where(Node *node, char *query)
+static POOL_DEST send_to_where(Node *node)
{
/* From storage/lock.h */
#define ExclusiveLock 7 /* blocks ROW SHARE/SELECT...FOR UPDATE */
#define AccessExclusiveLock 8 /* ALTER TABLE, DROP TABLE, VACUUM FULL,
* and unqualified LOCK TABLE */
-
-/* From 9.5 include/nodes/node.h ("TAGS FOR STATEMENT NODES" part) */
- static NodeTag nodemap[] = {
- T_RawStmt,
- T_Query,
- T_PlannedStmt,
- T_InsertStmt,
- T_DeleteStmt,
- T_UpdateStmt,
- T_SelectStmt,
- T_AlterTableStmt,
- T_AlterTableCmd,
- T_AlterDomainStmt,
- T_SetOperationStmt,
- T_GrantStmt,
- T_GrantRoleStmt,
- T_AlterDefaultPrivilegesStmt,
- T_ClosePortalStmt,
- T_ClusterStmt,
- T_CopyStmt,
- T_CreateStmt, /* CREATE TABLE */
- T_DefineStmt, /* CREATE AGGREGATE, OPERATOR, TYPE */
- T_DropStmt, /* DROP TABLE etc. */
- T_TruncateStmt,
- T_CommentStmt,
- T_FetchStmt,
- T_IndexStmt, /* CREATE INDEX */
- T_CreateFunctionStmt,
- T_AlterFunctionStmt,
- T_DoStmt,
- T_RenameStmt, /* ALTER AGGREGATE etc. */
- T_RuleStmt, /* CREATE RULE */
- T_NotifyStmt,
- T_ListenStmt,
- T_UnlistenStmt,
- T_TransactionStmt,
- T_ViewStmt, /* CREATE VIEW */
- T_LoadStmt,
- T_CreateDomainStmt,
- T_CreatedbStmt,
- T_DropdbStmt,
- T_VacuumStmt,
- T_ExplainStmt,
- T_CreateTableAsStmt,
- T_CreateSeqStmt,
- T_AlterSeqStmt,
- T_VariableSetStmt, /* SET */
- T_VariableShowStmt,
- T_DiscardStmt,
- T_CreateTrigStmt,
- T_CreatePLangStmt,
- T_CreateRoleStmt,
- T_AlterRoleStmt,
- T_DropRoleStmt,
- T_LockStmt,
- T_ConstraintsSetStmt,
- T_ReindexStmt,
- T_CheckPointStmt,
- T_CreateSchemaStmt,
- T_AlterDatabaseStmt,
- T_AlterDatabaseSetStmt,
- T_AlterRoleSetStmt,
- T_CreateConversionStmt,
- T_CreateCastStmt,
- T_CreateOpClassStmt,
- T_CreateOpFamilyStmt,
- T_AlterOpFamilyStmt,
- T_PrepareStmt,
- T_ExecuteStmt,
- T_DeallocateStmt, /* DEALLOCATE */
- T_DeclareCursorStmt, /* DECLARE */
- T_CreateTableSpaceStmt,
- T_DropTableSpaceStmt,
- T_AlterObjectSchemaStmt,
- T_AlterOwnerStmt,
- T_DropOwnedStmt,
- T_ReassignOwnedStmt,
- T_CompositeTypeStmt, /* CREATE TYPE */
- T_CreateEnumStmt,
- T_CreateRangeStmt,
- T_AlterEnumStmt,
- T_AlterTSDictionaryStmt,
- T_AlterTSConfigurationStmt,
- T_CreateFdwStmt,
- T_AlterFdwStmt,
- T_CreateForeignServerStmt,
- T_AlterForeignServerStmt,
- T_CreateUserMappingStmt,
- T_AlterUserMappingStmt,
- T_DropUserMappingStmt,
- T_AlterTableSpaceOptionsStmt,
- T_AlterTableMoveAllStmt,
- T_SecLabelStmt,
- T_CreateForeignTableStmt,
- T_ImportForeignSchemaStmt,
- T_CreateExtensionStmt,
- T_AlterExtensionStmt,
- T_AlterExtensionContentsStmt,
- T_CreateEventTrigStmt,
- T_AlterEventTrigStmt,
- T_RefreshMatViewStmt,
- T_ReplicaIdentityStmt,
- T_AlterSystemStmt,
- T_CreatePolicyStmt,
- T_AlterPolicyStmt,
- T_CreateTransformStmt,
- T_CreateAmStmt,
- T_CreatePublicationStmt,
- T_AlterPublicationStmt,
- T_CreateSubscriptionStmt,
- T_DropSubscriptionStmt,
- T_CreateStatsStmt,
- T_AlterCollationStmt,
- };
-
- if (bsearch(&nodeTag(node), nodemap, sizeof(nodemap) / sizeof(nodemap[0]),
- sizeof(NodeTag), compare) != NULL)
+ /*
+ * SELECT INTO SELECT FOR SHARE or UPDATE
+ */
+ if (IsA(node, SelectStmt))
{
- /*
- * SELECT INTO SELECT FOR SHARE or UPDATE
- */
- if (IsA(node, SelectStmt))
+ /* SELECT INTO or SELECT FOR SHARE or UPDATE ? */
+ if (pool_has_insertinto_or_locking_clause(node))
+ return POOL_PRIMARY;
+
+ /* non-SELECT query in WITH clause ? */
+ if (((SelectStmt *) node)->withClause)
{
- /* SELECT INTO or SELECT FOR SHARE or UPDATE ? */
- if (pool_has_insertinto_or_locking_clause(node))
- return POOL_PRIMARY;
+ List *ctes = ((SelectStmt *) node)->withClause->ctes;
+ ListCell *cte_item;
- /* non-SELECT query in WITH clause ? */
- if (((SelectStmt *) node)->withClause)
+ foreach(cte_item, ctes)
{
- List *ctes = ((SelectStmt *) node)->withClause->ctes;
- ListCell *cte_item;
-
- foreach(cte_item, ctes)
- {
- CommonTableExpr *cte = (CommonTableExpr *) lfirst(cte_item);
+ CommonTableExpr *cte = (CommonTableExpr *) lfirst(cte_item);
- if (!IsA(cte->ctequery, SelectStmt))
- return POOL_PRIMARY;
- }
+ if (!IsA(cte->ctequery, SelectStmt))
+ return POOL_PRIMARY;
}
-
- return POOL_EITHER;
}
- /*
- * COPY
- */
- else if (IsA(node, CopyStmt))
+ return POOL_EITHER;
+ }
+
+ /*
+ * COPY
+ */
+ else if (IsA(node, CopyStmt))
+ {
+ if (((CopyStmt *) node)->is_from)
+ return POOL_PRIMARY;
+ else
{
- if (((CopyStmt *) node)->is_from)
- return POOL_PRIMARY;
+ if (((CopyStmt *) node)->query == NULL)
+ return POOL_EITHER;
else
- {
- if (((CopyStmt *) node)->query == NULL)
- return POOL_EITHER;
- else
- return (IsA(((CopyStmt *) node)->query, SelectStmt)) ? POOL_EITHER : POOL_PRIMARY;
- }
+ return (IsA(((CopyStmt *) node)->query, SelectStmt)) ? POOL_EITHER : POOL_PRIMARY;
}
+ }
- /*
- * LOCK
- */
- else if (IsA(node, LockStmt))
- {
- return (((LockStmt *) node)->mode >= RowExclusiveLock) ? POOL_PRIMARY : POOL_BOTH;
- }
+ /*
+ * LOCK
+ */
+ else if (IsA(node, LockStmt))
+ {
+ return (((LockStmt *) node)->mode >= RowExclusiveLock) ? POOL_PRIMARY : POOL_BOTH;
+ }
+ /*
+ * Transaction commands
+ */
+ else if (IsA(node, TransactionStmt))
+ {
/*
- * Transaction commands
+ * Check "BEGIN READ WRITE" "START TRANSACTION READ WRITE"
*/
- else if (IsA(node, TransactionStmt))
+ if (is_start_transaction_query(node))
{
/*
- * Check "BEGIN READ WRITE" "START TRANSACTION READ WRITE"
+ * But actually, we send BEGIN to standby if it's BEGIN READ
+ * WRITE or START TRANSACTION READ WRITE
*/
- if (is_start_transaction_query(node))
- {
- /*
- * But actually, we send BEGIN to standby if it's BEGIN READ
- * WRITE or START TRANSACTION READ WRITE
- */
- if (is_read_write((TransactionStmt *) node))
- return POOL_BOTH;
-
- /*
- * Other TRANSACTION start commands are sent to both primary
- * and standby
- */
- else
- return POOL_BOTH;
- }
- /* SAVEPOINT related commands are sent to both primary and standby */
- else if (is_savepoint_query(node))
- {
- if (SL_MODE && is_tx_started_by_multi_statement_query())
- {
- /*
- * But in streaming replication mode, if a transaction was
- * started by a multi statement query, SAVEPOINT should be
- * sent to primary because the transaction was started on
- * primary only.
- */
- return POOL_PRIMARY;
- }
+ if (is_read_write((TransactionStmt *) node))
return POOL_BOTH;
- }
+
/*
- * 2PC commands
+ * Other TRANSACTION start commands are sent to both primary
+ * and standby
*/
- else if (is_2pc_transaction_query(node))
- return POOL_PRIMARY;
else
- /* COMMIT etc. */
return POOL_BOTH;
}
-
- /*
- * SET
- */
- else if (IsA(node, VariableSetStmt))
+ /* SAVEPOINT related commands are sent to both primary and standby */
+ else if (is_savepoint_query(node))
{
- ListCell *list_item;
- bool ret = POOL_BOTH;
-
- /*
- * SET transaction_read_only TO off
- */
- if (((VariableSetStmt *) node)->kind == VAR_SET_VALUE &&
- !strcmp(((VariableSetStmt *) node)->name, "transaction_read_only"))
- {
- List *options = ((VariableSetStmt *) node)->args;
-
- foreach(list_item, options)
- {
- A_Const *v = (A_Const *) lfirst(list_item);
-
- switch (nodeTag(&v->val))
- {
- case T_String:
- if (!strcasecmp(v->val.sval.sval, "off") ||
- !strcasecmp(v->val.sval.sval, "f") ||
- !strcasecmp(v->val.sval.sval, "false"))
- ret = POOL_PRIMARY;
- break;
- case T_Integer:
- if (v->val.ival.ival)
- ret = POOL_PRIMARY;
- default:
- break;
- }
- }
- return ret;
- }
-
- /*
- * SET TRANSACTION ISOLATION LEVEL SERIALIZABLE or SET SESSION
- * CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL SERIALIZABLE or
- * SET transaction_isolation TO 'serializable' SET
- * default_transaction_isolation TO 'serializable'
- */
- else if (is_set_transaction_serializable(node))
- {
- return POOL_PRIMARY;
- }
-
- /*
- * Check "SET TRANSACTION READ WRITE" "SET SESSION CHARACTERISTICS
- * AS TRANSACTION READ WRITE"
- */
- else if (((VariableSetStmt *) node)->kind == VAR_SET_MULTI &&
- (!strcmp(((VariableSetStmt *) node)->name, "TRANSACTION") ||
- !strcmp(((VariableSetStmt *) node)->name, "SESSION CHARACTERISTICS")))
- {
- List *options = ((VariableSetStmt *) node)->args;
-
- foreach(list_item, options)
- {
- DefElem *opt = (DefElem *) lfirst(list_item);
-
- if (!strcmp("transaction_read_only", opt->defname))
- {
- bool read_only;
-
- read_only = ((A_Const *) opt->arg)->val.ival.ival;
- if (!read_only)
- return POOL_PRIMARY;
- }
- }
- return POOL_BOTH;
- }
- else
+ if (SL_MODE && is_tx_started_by_multi_statement_query())
{
/*
- * All other SET command sent to both primary and standby
+ * But in streaming replication mode, if a transaction was
+ * started by a multi statement query, SAVEPOINT should be
+ * sent to primary because the transaction was started on
+ * primary only.
*/
- return POOL_BOTH;
+ return POOL_PRIMARY;
}
+ return POOL_BOTH;
}
-
/*
- * DISCARD
+ * 2PC commands
*/
- else if (IsA(node, DiscardStmt))
- {
+ else if (is_2pc_transaction_query(node))
+ return POOL_PRIMARY;
+ else
+ /* COMMIT etc. */
return POOL_BOTH;
- }
+ }
+
+ /*
+ * SET
+ */
+ else if (IsA(node, VariableSetStmt))
+ {
+ ListCell *list_item;
+ bool ret = POOL_BOTH;
/*
- * PREPARE
+ * SET transaction_read_only TO off
*/
- else if (IsA(node, PrepareStmt))
+ if (((VariableSetStmt *) node)->kind == VAR_SET_VALUE &&
+ !strcmp(((VariableSetStmt *) node)->name, "transaction_read_only"))
{
- PrepareStmt *prepare_statement = (PrepareStmt *) node;
+ List *options = ((VariableSetStmt *) node)->args;
- char *string = nodeToString(prepare_statement->query);
+ foreach(list_item, options)
+ {
+ A_Const *v = (A_Const *) lfirst(list_item);
- /* Note that this is a recursive call */
- return send_to_where((Node *) (prepare_statement->query), string);
+ switch (nodeTag(&v->val))
+ {
+ case T_String:
+ if (!strcasecmp(v->val.sval.sval, "off") ||
+ !strcasecmp(v->val.sval.sval, "f") ||
+ !strcasecmp(v->val.sval.sval, "false"))
+ ret = POOL_PRIMARY;
+ break;
+ case T_Integer:
+ if (v->val.ival.ival)
+ ret = POOL_PRIMARY;
+ default:
+ break;
+ }
+ }
+ return ret;
}
/*
- * EXECUTE
+ * SET TRANSACTION ISOLATION LEVEL SERIALIZABLE or SET SESSION
+ * CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL SERIALIZABLE or
+ * SET transaction_isolation TO 'serializable' SET
+ * default_transaction_isolation TO 'serializable'
*/
- else if (IsA(node, ExecuteStmt))
+ else if (is_set_transaction_serializable(node))
{
- /*
- * This is temporary decision. where_to_send will inherit same
- * destination AS PREPARE.
- */
return POOL_PRIMARY;
}
/*
- * DEALLOCATE
+ * Check "SET TRANSACTION READ WRITE" "SET SESSION CHARACTERISTICS
+ * AS TRANSACTION READ WRITE"
*/
- else if (IsA(node, DeallocateStmt))
+ else if (((VariableSetStmt *) node)->kind == VAR_SET_MULTI &&
+ (!strcmp(((VariableSetStmt *) node)->name, "TRANSACTION") ||
+ !strcmp(((VariableSetStmt *) node)->name, "SESSION CHARACTERISTICS")))
+ {
+ List *options = ((VariableSetStmt *) node)->args;
+
+ foreach(list_item, options)
+ {
+ DefElem *opt = (DefElem *) lfirst(list_item);
+
+ if (!strcmp("transaction_read_only", opt->defname))
+ {
+ bool read_only;
+
+ read_only = ((A_Const *) opt->arg)->val.ival.ival;
+ if (!read_only)
+ return POOL_PRIMARY;
+ }
+ }
+ return POOL_BOTH;
+ }
+ else
{
/*
- * This is temporary decision. where_to_send will inherit same
- * destination AS PREPARE.
+ * All other SET command sent to both primary and standby
*/
- return POOL_PRIMARY;
+ return POOL_BOTH;
}
+ }
+
+ /*
+ * DISCARD
+ */
+ else if (IsA(node, DiscardStmt))
+ {
+ return POOL_BOTH;
+ }
+
+ /*
+ * PREPARE
+ */
+ else if (IsA(node, PrepareStmt))
+ {
+ PrepareStmt *prepare_statement = (PrepareStmt *) node;
+ /* Note that this is a recursive call */
+ return send_to_where((Node *) (prepare_statement->query));
+ }
+
+ /*
+ * EXECUTE
+ */
+ else if (IsA(node, ExecuteStmt))
+ {
/*
- * SHOW
+ * This is a temporary decision. where_to_send will inherit same
+ * destination as PREPARE.
*/
- else if (IsA(node, VariableShowStmt))
- {
- return POOL_EITHER;
- }
+ return POOL_PRIMARY;
+ }
+ /*
+ * DEALLOCATE
+ */
+ else if (IsA(node, DeallocateStmt))
+ {
/*
- * Other statements are sent to primary
+ * This is temporary decision. where_to_send will inherit same
+ * destination AS PREPARE.
*/
return POOL_PRIMARY;
}
/*
- * All unknown statements are sent to primary
+ * SHOW
+ */
+ else if (IsA(node, VariableShowStmt))
+ {
+ return POOL_EITHER;
+ }
+
+ /*
+ * All other statements are sent to primary
*/
return POOL_PRIMARY;
}