diff options
author | Michael P | 2011-01-28 01:21:14 +0000 |
---|---|---|
committer | Pavan Deolasee | 2011-05-19 17:49:35 +0000 |
commit | e0617d4a90e08a0774a480a2a650b500f2ff8ceb (patch) | |
tree | 98965d39a9e8d4644540f6c3e0c80649f0011e96 | |
parent | 8dfc3fa3d39ae4bd8a068aca3334ed3d47005cf1 (diff) |
Fix for replicated tables using multi-INSERT queries.
Comments have also been added on functions created for this support.
A structure used to combine results has been moved from pquery.c to ExecRemote.c
to limit dependencies.
Patch by Benny Wang, with some editorialization from me
-rw-r--r-- | src/backend/pgxc/plan/planner.c | 38 | ||||
-rw-r--r-- | src/backend/pgxc/pool/execRemote.c | 13 | ||||
-rw-r--r-- | src/backend/rewrite/rewriteHandler.c | 100 | ||||
-rw-r--r-- | src/backend/tcop/pquery.c | 2 | ||||
-rw-r--r-- | src/include/pgxc/execRemote.h | 8 | ||||
-rw-r--r-- | src/include/tcop/pquery.h | 7 |
6 files changed, 117 insertions, 51 deletions
diff --git a/src/backend/pgxc/plan/planner.c b/src/backend/pgxc/plan/planner.c index 1cbf587f88..ff6ce81fb0 100644 --- a/src/backend/pgxc/plan/planner.c +++ b/src/backend/pgxc/plan/planner.c @@ -488,23 +488,19 @@ get_plan_nodes_insert(PlannerInfo *root, RemoteQuery *step) /* Bad relation type */ return; - /* Get result relation info */ rel_loc_info = GetRelationLocInfo(rte->relid); if (!rel_loc_info) ereport(ERROR, (errcode(ERRCODE_STATEMENT_TOO_COMPLEX), - (errmsg("Could not find relation for oid = %d", rte->relid)))); + (errmsg("Could not find relation for oid = %d", rte->relid)))); - if (query->jointree != NULL && query->jointree->fromlist != NULL) + /* Optimization is only done for distributed tables */ + if (query->jointree != NULL + && query->jointree->fromlist != NULL + && rel_loc_info->locatorType == LOCATOR_TYPE_HASH) { - /* INSERT SELECT suspected */ - - /* We only optimize for when the destination is partitioned */ - if (rel_loc_info->locatorType != LOCATOR_TYPE_HASH) - return; - /* * See if it is "single-step" * Optimize for just known common case with 2 RTE entries @@ -546,7 +542,7 @@ get_plan_nodes_insert(PlannerInfo *root, RemoteQuery *step) if (rel_loc_info->locatorType == LOCATOR_TYPE_HASH && rel_loc_info->partAttrName != NULL) { - Expr *checkexpr; + Expr *checkexpr; TargetEntry *tle = NULL; /* It is a partitioned table, get value by looking in targetList */ @@ -602,7 +598,7 @@ get_plan_nodes_insert(PlannerInfo *root, RemoteQuery *step) if (!source_rel_loc_info) ereport(ERROR, (errcode(ERRCODE_STATEMENT_TOO_COMPLEX), - (errmsg("Could not find relation for oid = %d", rte->relid)))); + (errmsg("Could not find relation for oid = %d", rte->relid)))); if (source_rel_loc_info->locatorType == LOCATOR_TYPE_HASH && strcmp(col_base->colname, source_rel_loc_info->partAttrName) == 0) @@ -3247,16 +3243,26 @@ is_pgxc_safe_func(Oid funcid) return ret_val; } -/* code is borrowed from get_plan_nodes_insert */ +/* + * GetHashExecNodes - + * Get hash key of execution nodes according to the expression value + * + * Input parameters: + * rel_loc_info is a locator function. It contains distribution information. + * exec_nodes is the list of nodes to be executed + * expr is the partition column value + * + * code is borrowed from get_plan_nodes_insert + */ void GetHashExecNodes(RelationLocInfo *rel_loc_info, ExecNodes **exec_nodes, const Expr *expr) { /* We may have a cast, try and handle it */ Expr *checkexpr; Expr *eval_expr = NULL; - Const *constant; - long part_value; - long *part_value_ptr = NULL; + Const *constant; + long part_value; + long *part_value_ptr = NULL; eval_expr = (Expr *) eval_const_expressions(NULL, (Node *)expr); checkexpr = get_numeric_constant(eval_expr); @@ -3276,7 +3282,7 @@ GetHashExecNodes(RelationLocInfo *rel_loc_info, ExecNodes **exec_nodes, const Ex /* single call handles both replicated and partitioned types */ *exec_nodes = GetRelationNodes(rel_loc_info, part_value_ptr, - RELATION_ACCESS_INSERT); + RELATION_ACCESS_INSERT); if (eval_expr) pfree(eval_expr); diff --git a/src/backend/pgxc/pool/execRemote.c b/src/backend/pgxc/pool/execRemote.c index 7621c153be..5cd60b7e62 100644 --- a/src/backend/pgxc/pool/execRemote.c +++ b/src/backend/pgxc/pool/execRemote.c @@ -815,9 +815,18 @@ HandleError(RemoteQueryState *combiner, char *msg_body, size_t len) combiner->command_complete_count++; } -/* combine deparsed sql statements execution results */ +/* + * HandleCmdComplete - + * combine deparsed sql statements execution results + * + * Input parameters: + * commandType is dml command type + * combineTag is used to combine the completion result + * msg_body is execution result needed to combine + * len is msg_body size + */ void -HandleCmdComplete(CmdType commandType, combineTag *combine, +HandleCmdComplete(CmdType commandType, CombineTag *combine, const char *msg_body, size_t len) { int digits = 0; diff --git a/src/backend/rewrite/rewriteHandler.c b/src/backend/rewrite/rewriteHandler.c index df16c1e134..83b45afddf 100644 --- a/src/backend/rewrite/rewriteHandler.c +++ b/src/backend/rewrite/rewriteHandler.c @@ -2181,12 +2181,18 @@ QueryRewrite(Query *parsetree) } #ifdef PGXC +/* + * Part of handling INSERT queries with multiple values + * + * GetRelPartColPos - + * Get the partition column position in targetList + */ static int GetRelPartColPos(const Query *query, const char *partColName) { ListCell *lc; int rescol = -1; - + foreach(lc, query->targetList) { TargetEntry *tle = (TargetEntry *) lfirst(lc); @@ -2212,18 +2218,40 @@ GetRelPartColPos(const Query *query, const char *partColName) return rescol; } +/* + * Part of handling INSERT queries with multiple values + * + * ProcessHashValue - + * associates the inserted row to the specified datanode + * + * Input parameters: + * subList is the inserted row + * node is the node number + */ static void ProcessHashValue(List **valuesList, const List *subList, const int node) { valuesList[node - 1] = lappend(valuesList[node - 1], subList); } +/* + * Part of handling INSERT queries with multiple values + * + * InitValuesList - + * Allocate and initialize the list of values + */ static void InitValuesList(List **valuesList[], int size) { *valuesList = palloc0(size * sizeof(List *)); } +/* + * Part of handling INSERT queries with multiple values + * + * InitValuesList - + * free all the list of values + */ static void DestroyValuesList(List **valuesList[]) { @@ -2231,9 +2259,20 @@ DestroyValuesList(List **valuesList[]) *valuesList = NIL; } +/* + * Part of handling INSERT queries with multiple values + * + * ProcessRobinValue - + * assign insert values list to each node averagely + * + * Input parameters: + * valuesList is an array of lists used to assign value list to specified nodes + * size is number of assigned nodes + * values_rte is the values list + */ static void ProcessRobinValue(Oid relid, List **valuesList, - int size, const RangeTblEntry *values_rte) + int size, const RangeTblEntry *values_rte) { List *values = values_rte->values_lists; int length = values->length; @@ -2242,9 +2281,9 @@ ProcessRobinValue(Oid relid, List **valuesList, int processNum = 0; int node; - /* get average insert value number of each node */ - if (length > NumDataNodes) - dist = length/NumDataNodes; + /* Get average insert value number of each node */ + if (length > size) + dist = length/size; else dist = 1; @@ -2252,7 +2291,7 @@ ProcessRobinValue(Oid relid, List **valuesList, { node = GetRoundRobinNode(relid); - /* assign insert value */ + /* Assign insert value */ for(j = 0; j < dist; j++) { processNum += 1; @@ -2260,17 +2299,30 @@ ProcessRobinValue(Oid relid, List **valuesList, list_nth(values, processNum - 1)); } } - - /* assign remained value */ + + /* Assign remained value */ while(processNum < length) { processNum += 1; node = GetRoundRobinNode(relid); valuesList[node - 1] = lappend(valuesList[node - 1], - list_nth(values, processNum - 1)); + list_nth(values, processNum - 1)); } } +/* + * Part of handling INSERT queries with multiple values + * + * RewriteInsertStmt - + * Rewrite INSERT statement. + * Split the INSERT statement with mutiple values into mutiple insert statements + * according to its distribution key. Distribution rule is as follows: + * 1.LOCATOR_TYPE_HASH: associates correct node with its distribution key + * 2.LOCATOR_TYPE_RROBIN: assign value lists to each datanodes averagely + * 3.DEFAULT: no need to process (replicate case) + * + * values_rte is the values list range table. + */ static List * RewriteInsertStmt(Query *query, RangeTblEntry *values_rte) { @@ -2285,7 +2337,7 @@ RewriteInsertStmt(Query *query, RangeTblEntry *values_rte) char *partColName; List **valuesList; int i; - + rte = (RangeTblEntry *) list_nth(query->rtable, query->resultRelation - 1); rte_loc_info = GetRelationLocInfo(rte->relid); locatorType = rte_loc_info->locatorType; @@ -2309,44 +2361,45 @@ RewriteInsertStmt(Query *query, RangeTblEntry *values_rte) foreach(values_lc, values_rte->values_lists) { List *sublist = (List *)lfirst(values_lc); - + if (first) { + /* Get the partition column number in the targetList */ partColno = GetRelPartColPos(query, partColName); first = false; } - /* get the exec node according to partition column value */ + /* Get the exec node according to partition column value */ GetHashExecNodes(rte_loc_info, &exec_nodes, (Expr *)list_nth(sublist, partColno)); Assert(exec_nodes->nodelist->length == 1); - /* assign valueList to specified exec node */ + /* Assign valueList to specified execution node */ ProcessHashValue(valuesList, sublist, list_nth_int(exec_nodes->nodelist, 0)); } } - + goto collect; case LOCATOR_TYPE_RROBIN: - + InitValuesList(&valuesList, NumDataNodes); - /* assign valueList to specified exec node */ + /* Assign valueList to specified execution node */ ProcessRobinValue(rte->relid, valuesList, NumDataNodes, values_rte); -collect: - /* produce query for relative datanodes */ - for(i = 0; i < NumDataNodes; i++) +collect: + /* Produce query for relative Datanodes */ + for (i = 0; i < NumDataNodes; i++) { if (valuesList[i] != NIL) { ExecNodes *execNodes = makeNode(ExecNodes); execNodes->baselocatortype = rte_loc_info->locatorType; execNodes->nodelist = lappend_int(execNodes->nodelist, i + 1); - + element = copyObject(query); - + rte = (RangeTblEntry *)list_nth(element->rtable, rtr->rtindex - 1); rte->values_lists = valuesList[i]; @@ -2360,16 +2413,15 @@ collect: rwInsertList = lappend(rwInsertList, element); } } - + DestroyValuesList(&valuesList); break; - + default: /* distribute by replication: just do it as usual */ rwInsertList = lappend(rwInsertList, query); break; } - return rwInsertList; } #endif diff --git a/src/backend/tcop/pquery.c b/src/backend/tcop/pquery.c index 175dcf0e84..4c0d35c9e2 100644 --- a/src/backend/tcop/pquery.c +++ b/src/backend/tcop/pquery.c @@ -1241,7 +1241,7 @@ PortalRunMulti(Portal portal, bool isTopLevel, { ListCell *stmtlist_item; #ifdef PGXC - combineTag combine; + CombineTag combine; combine.cmdType = CMD_UNKNOWN; combine.data[0] = '\0'; diff --git a/src/include/pgxc/execRemote.h b/src/include/pgxc/execRemote.h index 8f6c10a75c..d50b415aad 100644 --- a/src/include/pgxc/execRemote.h +++ b/src/include/pgxc/execRemote.h @@ -46,6 +46,12 @@ typedef enum REQUEST_TYPE_COPY_OUT /* Copy Out response */ } RequestType; +/* Combines results of INSERT statements using multiple values */ +typedef struct CombineTag +{ + CmdType cmdType; /* DML command type */ + char data[COMPLETION_TAG_BUFSIZE]; /* execution result combination data */ +} CombineTag; /* * Represents a DataRow message received from a remote node. @@ -142,7 +148,7 @@ extern void ExecRemoteUtility(RemoteQuery *node); extern int handle_response(PGXCNodeHandle * conn, RemoteQueryState *combiner); #ifdef PGXC -extern void HandleCmdComplete(CmdType commandType, combineTag *combine, const char *msg_body, +extern void HandleCmdComplete(CmdType commandType, CombineTag *combine, const char *msg_body, size_t len); #endif extern bool FetchTuple(RemoteQueryState *combiner, TupleTableSlot *slot); diff --git a/src/include/tcop/pquery.h b/src/include/tcop/pquery.h index 11a897467f..ca39fce870 100644 --- a/src/include/tcop/pquery.h +++ b/src/include/tcop/pquery.h @@ -17,13 +17,6 @@ #include "nodes/parsenodes.h" #include "utils/portal.h" -#ifdef PGXC -typedef struct combineTag -{ - CmdType cmdType; - char data[COMPLETION_TAG_BUFSIZE]; -} combineTag; -#endif extern PGDLLIMPORT Portal ActivePortal; |