summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael P2011-01-28 01:21:14 +0000
committerPavan Deolasee2011-05-19 17:49:35 +0000
commite0617d4a90e08a0774a480a2a650b500f2ff8ceb (patch)
tree98965d39a9e8d4644540f6c3e0c80649f0011e96
parent8dfc3fa3d39ae4bd8a068aca3334ed3d47005cf1 (diff)
Fix for replicated tables using multi-INSERT queries.
Comments have also been added on functions created for this support. A structure used to combine results has been moved from pquery.c to ExecRemote.c to limit dependencies. Patch by Benny Wang, with some editorialization from me
-rw-r--r--src/backend/pgxc/plan/planner.c38
-rw-r--r--src/backend/pgxc/pool/execRemote.c13
-rw-r--r--src/backend/rewrite/rewriteHandler.c100
-rw-r--r--src/backend/tcop/pquery.c2
-rw-r--r--src/include/pgxc/execRemote.h8
-rw-r--r--src/include/tcop/pquery.h7
6 files changed, 117 insertions, 51 deletions
diff --git a/src/backend/pgxc/plan/planner.c b/src/backend/pgxc/plan/planner.c
index 1cbf587f88..ff6ce81fb0 100644
--- a/src/backend/pgxc/plan/planner.c
+++ b/src/backend/pgxc/plan/planner.c
@@ -488,23 +488,19 @@ get_plan_nodes_insert(PlannerInfo *root, RemoteQuery *step)
/* Bad relation type */
return;
-
/* Get result relation info */
rel_loc_info = GetRelationLocInfo(rte->relid);
if (!rel_loc_info)
ereport(ERROR,
(errcode(ERRCODE_STATEMENT_TOO_COMPLEX),
- (errmsg("Could not find relation for oid = %d", rte->relid))));
+ (errmsg("Could not find relation for oid = %d", rte->relid))));
- if (query->jointree != NULL && query->jointree->fromlist != NULL)
+ /* Optimization is only done for distributed tables */
+ if (query->jointree != NULL
+ && query->jointree->fromlist != NULL
+ && rel_loc_info->locatorType == LOCATOR_TYPE_HASH)
{
- /* INSERT SELECT suspected */
-
- /* We only optimize for when the destination is partitioned */
- if (rel_loc_info->locatorType != LOCATOR_TYPE_HASH)
- return;
-
/*
* See if it is "single-step"
* Optimize for just known common case with 2 RTE entries
@@ -546,7 +542,7 @@ get_plan_nodes_insert(PlannerInfo *root, RemoteQuery *step)
if (rel_loc_info->locatorType == LOCATOR_TYPE_HASH &&
rel_loc_info->partAttrName != NULL)
{
- Expr *checkexpr;
+ Expr *checkexpr;
TargetEntry *tle = NULL;
/* It is a partitioned table, get value by looking in targetList */
@@ -602,7 +598,7 @@ get_plan_nodes_insert(PlannerInfo *root, RemoteQuery *step)
if (!source_rel_loc_info)
ereport(ERROR,
(errcode(ERRCODE_STATEMENT_TOO_COMPLEX),
- (errmsg("Could not find relation for oid = %d", rte->relid))));
+ (errmsg("Could not find relation for oid = %d", rte->relid))));
if (source_rel_loc_info->locatorType == LOCATOR_TYPE_HASH &&
strcmp(col_base->colname, source_rel_loc_info->partAttrName) == 0)
@@ -3247,16 +3243,26 @@ is_pgxc_safe_func(Oid funcid)
return ret_val;
}
-/* code is borrowed from get_plan_nodes_insert */
+/*
+ * GetHashExecNodes -
+ * Get hash key of execution nodes according to the expression value
+ *
+ * Input parameters:
+ * rel_loc_info is a locator function. It contains distribution information.
+ * exec_nodes is the list of nodes to be executed
+ * expr is the partition column value
+ *
+ * code is borrowed from get_plan_nodes_insert
+ */
void
GetHashExecNodes(RelationLocInfo *rel_loc_info, ExecNodes **exec_nodes, const Expr *expr)
{
/* We may have a cast, try and handle it */
Expr *checkexpr;
Expr *eval_expr = NULL;
- Const *constant;
- long part_value;
- long *part_value_ptr = NULL;
+ Const *constant;
+ long part_value;
+ long *part_value_ptr = NULL;
eval_expr = (Expr *) eval_const_expressions(NULL, (Node *)expr);
checkexpr = get_numeric_constant(eval_expr);
@@ -3276,7 +3282,7 @@ GetHashExecNodes(RelationLocInfo *rel_loc_info, ExecNodes **exec_nodes, const Ex
/* single call handles both replicated and partitioned types */
*exec_nodes = GetRelationNodes(rel_loc_info, part_value_ptr,
- RELATION_ACCESS_INSERT);
+ RELATION_ACCESS_INSERT);
if (eval_expr)
pfree(eval_expr);
diff --git a/src/backend/pgxc/pool/execRemote.c b/src/backend/pgxc/pool/execRemote.c
index 7621c153be..5cd60b7e62 100644
--- a/src/backend/pgxc/pool/execRemote.c
+++ b/src/backend/pgxc/pool/execRemote.c
@@ -815,9 +815,18 @@ HandleError(RemoteQueryState *combiner, char *msg_body, size_t len)
combiner->command_complete_count++;
}
-/* combine deparsed sql statements execution results */
+/*
+ * HandleCmdComplete -
+ * combine deparsed sql statements execution results
+ *
+ * Input parameters:
+ * commandType is dml command type
+ * combineTag is used to combine the completion result
+ * msg_body is execution result needed to combine
+ * len is msg_body size
+ */
void
-HandleCmdComplete(CmdType commandType, combineTag *combine,
+HandleCmdComplete(CmdType commandType, CombineTag *combine,
const char *msg_body, size_t len)
{
int digits = 0;
diff --git a/src/backend/rewrite/rewriteHandler.c b/src/backend/rewrite/rewriteHandler.c
index df16c1e134..83b45afddf 100644
--- a/src/backend/rewrite/rewriteHandler.c
+++ b/src/backend/rewrite/rewriteHandler.c
@@ -2181,12 +2181,18 @@ QueryRewrite(Query *parsetree)
}
#ifdef PGXC
+/*
+ * Part of handling INSERT queries with multiple values
+ *
+ * GetRelPartColPos -
+ * Get the partition column position in targetList
+ */
static int
GetRelPartColPos(const Query *query, const char *partColName)
{
ListCell *lc;
int rescol = -1;
-
+
foreach(lc, query->targetList)
{
TargetEntry *tle = (TargetEntry *) lfirst(lc);
@@ -2212,18 +2218,40 @@ GetRelPartColPos(const Query *query, const char *partColName)
return rescol;
}
+/*
+ * Part of handling INSERT queries with multiple values
+ *
+ * ProcessHashValue -
+ * associates the inserted row to the specified datanode
+ *
+ * Input parameters:
+ * subList is the inserted row
+ * node is the node number
+ */
static void
ProcessHashValue(List **valuesList, const List *subList, const int node)
{
valuesList[node - 1] = lappend(valuesList[node - 1], subList);
}
+/*
+ * Part of handling INSERT queries with multiple values
+ *
+ * InitValuesList -
+ * Allocate and initialize the list of values
+ */
static void
InitValuesList(List **valuesList[], int size)
{
*valuesList = palloc0(size * sizeof(List *));
}
+/*
+ * Part of handling INSERT queries with multiple values
+ *
+ * InitValuesList -
+ * free all the list of values
+ */
static void
DestroyValuesList(List **valuesList[])
{
@@ -2231,9 +2259,20 @@ DestroyValuesList(List **valuesList[])
*valuesList = NIL;
}
+/*
+ * Part of handling INSERT queries with multiple values
+ *
+ * ProcessRobinValue -
+ * assign insert values list to each node averagely
+ *
+ * Input parameters:
+ * valuesList is an array of lists used to assign value list to specified nodes
+ * size is number of assigned nodes
+ * values_rte is the values list
+ */
static void
ProcessRobinValue(Oid relid, List **valuesList,
- int size, const RangeTblEntry *values_rte)
+ int size, const RangeTblEntry *values_rte)
{
List *values = values_rte->values_lists;
int length = values->length;
@@ -2242,9 +2281,9 @@ ProcessRobinValue(Oid relid, List **valuesList,
int processNum = 0;
int node;
- /* get average insert value number of each node */
- if (length > NumDataNodes)
- dist = length/NumDataNodes;
+ /* Get average insert value number of each node */
+ if (length > size)
+ dist = length/size;
else
dist = 1;
@@ -2252,7 +2291,7 @@ ProcessRobinValue(Oid relid, List **valuesList,
{
node = GetRoundRobinNode(relid);
- /* assign insert value */
+ /* Assign insert value */
for(j = 0; j < dist; j++)
{
processNum += 1;
@@ -2260,17 +2299,30 @@ ProcessRobinValue(Oid relid, List **valuesList,
list_nth(values, processNum - 1));
}
}
-
- /* assign remained value */
+
+ /* Assign remained value */
while(processNum < length)
{
processNum += 1;
node = GetRoundRobinNode(relid);
valuesList[node - 1] = lappend(valuesList[node - 1],
- list_nth(values, processNum - 1));
+ list_nth(values, processNum - 1));
}
}
+/*
+ * Part of handling INSERT queries with multiple values
+ *
+ * RewriteInsertStmt -
+ * Rewrite INSERT statement.
+ * Split the INSERT statement with mutiple values into mutiple insert statements
+ * according to its distribution key. Distribution rule is as follows:
+ * 1.LOCATOR_TYPE_HASH: associates correct node with its distribution key
+ * 2.LOCATOR_TYPE_RROBIN: assign value lists to each datanodes averagely
+ * 3.DEFAULT: no need to process (replicate case)
+ *
+ * values_rte is the values list range table.
+ */
static List *
RewriteInsertStmt(Query *query, RangeTblEntry *values_rte)
{
@@ -2285,7 +2337,7 @@ RewriteInsertStmt(Query *query, RangeTblEntry *values_rte)
char *partColName;
List **valuesList;
int i;
-
+
rte = (RangeTblEntry *) list_nth(query->rtable, query->resultRelation - 1);
rte_loc_info = GetRelationLocInfo(rte->relid);
locatorType = rte_loc_info->locatorType;
@@ -2309,44 +2361,45 @@ RewriteInsertStmt(Query *query, RangeTblEntry *values_rte)
foreach(values_lc, values_rte->values_lists)
{
List *sublist = (List *)lfirst(values_lc);
-
+
if (first)
{
+ /* Get the partition column number in the targetList */
partColno = GetRelPartColPos(query, partColName);
first = false;
}
- /* get the exec node according to partition column value */
+ /* Get the exec node according to partition column value */
GetHashExecNodes(rte_loc_info, &exec_nodes,
(Expr *)list_nth(sublist, partColno));
Assert(exec_nodes->nodelist->length == 1);
- /* assign valueList to specified exec node */
+ /* Assign valueList to specified execution node */
ProcessHashValue(valuesList, sublist, list_nth_int(exec_nodes->nodelist, 0));
}
}
-
+
goto collect;
case LOCATOR_TYPE_RROBIN:
-
+
InitValuesList(&valuesList, NumDataNodes);
- /* assign valueList to specified exec node */
+ /* Assign valueList to specified execution node */
ProcessRobinValue(rte->relid, valuesList, NumDataNodes, values_rte);
-collect:
- /* produce query for relative datanodes */
- for(i = 0; i < NumDataNodes; i++)
+collect:
+ /* Produce query for relative Datanodes */
+ for (i = 0; i < NumDataNodes; i++)
{
if (valuesList[i] != NIL)
{
ExecNodes *execNodes = makeNode(ExecNodes);
execNodes->baselocatortype = rte_loc_info->locatorType;
execNodes->nodelist = lappend_int(execNodes->nodelist, i + 1);
-
+
element = copyObject(query);
-
+
rte = (RangeTblEntry *)list_nth(element->rtable, rtr->rtindex - 1);
rte->values_lists = valuesList[i];
@@ -2360,16 +2413,15 @@ collect:
rwInsertList = lappend(rwInsertList, element);
}
}
-
+
DestroyValuesList(&valuesList);
break;
-
+
default: /* distribute by replication: just do it as usual */
rwInsertList = lappend(rwInsertList, query);
break;
}
-
return rwInsertList;
}
#endif
diff --git a/src/backend/tcop/pquery.c b/src/backend/tcop/pquery.c
index 175dcf0e84..4c0d35c9e2 100644
--- a/src/backend/tcop/pquery.c
+++ b/src/backend/tcop/pquery.c
@@ -1241,7 +1241,7 @@ PortalRunMulti(Portal portal, bool isTopLevel,
{
ListCell *stmtlist_item;
#ifdef PGXC
- combineTag combine;
+ CombineTag combine;
combine.cmdType = CMD_UNKNOWN;
combine.data[0] = '\0';
diff --git a/src/include/pgxc/execRemote.h b/src/include/pgxc/execRemote.h
index 8f6c10a75c..d50b415aad 100644
--- a/src/include/pgxc/execRemote.h
+++ b/src/include/pgxc/execRemote.h
@@ -46,6 +46,12 @@ typedef enum
REQUEST_TYPE_COPY_OUT /* Copy Out response */
} RequestType;
+/* Combines results of INSERT statements using multiple values */
+typedef struct CombineTag
+{
+ CmdType cmdType; /* DML command type */
+ char data[COMPLETION_TAG_BUFSIZE]; /* execution result combination data */
+} CombineTag;
/*
* Represents a DataRow message received from a remote node.
@@ -142,7 +148,7 @@ extern void ExecRemoteUtility(RemoteQuery *node);
extern int handle_response(PGXCNodeHandle * conn, RemoteQueryState *combiner);
#ifdef PGXC
-extern void HandleCmdComplete(CmdType commandType, combineTag *combine, const char *msg_body,
+extern void HandleCmdComplete(CmdType commandType, CombineTag *combine, const char *msg_body,
size_t len);
#endif
extern bool FetchTuple(RemoteQueryState *combiner, TupleTableSlot *slot);
diff --git a/src/include/tcop/pquery.h b/src/include/tcop/pquery.h
index 11a897467f..ca39fce870 100644
--- a/src/include/tcop/pquery.h
+++ b/src/include/tcop/pquery.h
@@ -17,13 +17,6 @@
#include "nodes/parsenodes.h"
#include "utils/portal.h"
-#ifdef PGXC
-typedef struct combineTag
-{
- CmdType cmdType;
- char data[COMPLETION_TAG_BUFSIZE];
-} combineTag;
-#endif
extern PGDLLIMPORT Portal ActivePortal;