*** a/contrib/postgres_fdw/postgres_fdw.c
--- b/contrib/postgres_fdw/postgres_fdw.c
***************
*** 375,386 **** static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
--- 375,398 ----
  static void create_cursor(ForeignScanState *node);
  static void fetch_more_data(ForeignScanState *node);
  static void close_cursor(PGconn *conn, unsigned int cursor_number);
+ static PgFdwModifyState *create_fdw_modify_state(ModifyTableState *mtstate,
+ 						ResultRelInfo *resultRelInfo,
+ 						CmdType operation,
+ 						int subplan_index,
+ 						char *query,
+ 						List *target_attrs,
+ 						bool has_returning,
+ 						List *retrieved_attrs);
  static void prepare_foreign_modify(PgFdwModifyState *fmstate);
  static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate,
  						 ItemPointer tupleid,
  						 TupleTableSlot *slot);
+ static int execute_prep_stmt(PgFdwModifyState *fmstate,
+ 				  const char **p_values,
+ 				  TupleTableSlot *slot);
  static void store_returning_result(PgFdwModifyState *fmstate,
  					   TupleTableSlot *slot, PGresult *res);
+ static void finish_foreign_modify(PgFdwModifyState *fmstate);
  static List *build_remote_returning(Index rtindex, Relation rel,
  					   List *returningList);
  static void rebuild_fdw_scan_tlist(ForeignScan *fscan, List *tlist);
***************
*** 1678,1695 **** postgresBeginForeignModify(ModifyTableState *mtstate,
  						   int eflags)
  {
  	PgFdwModifyState *fmstate;
! 	EState	   *estate = mtstate->ps.state;
! 	CmdType		operation = mtstate->operation;
! 	Relation	rel = resultRelInfo->ri_RelationDesc;
! 	RangeTblEntry *rte;
! 	Oid			userid;
! 	ForeignTable *table;
! 	UserMapping *user;
! 	AttrNumber	n_params;
! 	Oid			typefnoid;
! 	bool		isvarlena;
! 	ListCell   *lc;
! 	TupleDesc	tupdesc = RelationGetDescr(rel);
  
  	/*
  	 * Do nothing in EXPLAIN (no ANALYZE) case.  resultRelInfo->ri_FdwState
--- 1690,1699 ----
  						   int eflags)
  {
  	PgFdwModifyState *fmstate;
! 	char	   *query;
! 	List	   *target_attrs;
! 	bool		has_returning;
! 	List	   *retrieved_attrs;
  
  	/*
  	 * Do nothing in EXPLAIN (no ANALYZE) case.  resultRelInfo->ri_FdwState
***************
*** 1698,1779 **** postgresBeginForeignModify(ModifyTableState *mtstate,
  	if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
  		return;
  
- 	/* Begin constructing PgFdwModifyState. */
- 	fmstate = (PgFdwModifyState *) palloc0(sizeof(PgFdwModifyState));
- 	fmstate->rel = rel;
- 
- 	/*
- 	 * Identify which user to do the remote access as.  This should match what
- 	 * ExecCheckRTEPerms() does.
- 	 */
- 	rte = rt_fetch(resultRelInfo->ri_RangeTableIndex, estate->es_range_table);
- 	userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
- 
- 	/* Get info about foreign table. */
- 	table = GetForeignTable(RelationGetRelid(rel));
- 	user = GetUserMapping(userid, table->serverid);
- 
- 	/* Open connection; report that we'll create a prepared statement. */
- 	fmstate->conn = GetConnection(user, true);
- 	fmstate->p_name = NULL;		/* prepared statement not made yet */
- 
  	/* Deconstruct fdw_private data. */
! 	fmstate->query = strVal(list_nth(fdw_private,
! 									 FdwModifyPrivateUpdateSql));
! 	fmstate->target_attrs = (List *) list_nth(fdw_private,
! 											  FdwModifyPrivateTargetAttnums);
! 	fmstate->has_returning = intVal(list_nth(fdw_private,
! 											 FdwModifyPrivateHasReturning));
! 	fmstate->retrieved_attrs = (List *) list_nth(fdw_private,
! 												 FdwModifyPrivateRetrievedAttrs);
! 
! 	/* Create context for per-tuple temp workspace. */
! 	fmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
! 											  "postgres_fdw temporary data",
! 											  ALLOCSET_SMALL_SIZES);
! 
! 	/* Prepare for input conversion of RETURNING results. */
! 	if (fmstate->has_returning)
! 		fmstate->attinmeta = TupleDescGetAttInMetadata(tupdesc);
! 
! 	/* Prepare for output conversion of parameters used in prepared stmt. */
! 	n_params = list_length(fmstate->target_attrs) + 1;
! 	fmstate->p_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * n_params);
! 	fmstate->p_nums = 0;
! 
! 	if (operation == CMD_UPDATE || operation == CMD_DELETE)
! 	{
! 		/* Find the ctid resjunk column in the subplan's result */
! 		Plan	   *subplan = mtstate->mt_plans[subplan_index]->plan;
! 
! 		fmstate->ctidAttno = ExecFindJunkAttributeInTlist(subplan->targetlist,
! 														  "ctid");
! 		if (!AttributeNumberIsValid(fmstate->ctidAttno))
! 			elog(ERROR, "could not find junk ctid column");
! 
! 		/* First transmittable parameter will be ctid */
! 		getTypeOutputInfo(TIDOID, &typefnoid, &isvarlena);
! 		fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
! 		fmstate->p_nums++;
! 	}
! 
! 	if (operation == CMD_INSERT || operation == CMD_UPDATE)
! 	{
! 		/* Set up for remaining transmittable parameters */
! 		foreach(lc, fmstate->target_attrs)
! 		{
! 			int			attnum = lfirst_int(lc);
! 			Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
  
! 			Assert(!attr->attisdropped);
! 
! 			getTypeOutputInfo(attr->atttypid, &typefnoid, &isvarlena);
! 			fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
! 			fmstate->p_nums++;
! 		}
! 	}
! 
! 	Assert(fmstate->p_nums <= n_params);
  
  	resultRelInfo->ri_FdwState = fmstate;
  }
--- 1702,1726 ----
  	if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
  		return;
  
  	/* Deconstruct fdw_private data. */
! 	query = strVal(list_nth(fdw_private,
! 							FdwModifyPrivateUpdateSql));
! 	target_attrs = (List *) list_nth(fdw_private,
! 									 FdwModifyPrivateTargetAttnums);
! 	has_returning = intVal(list_nth(fdw_private,
! 									FdwModifyPrivateHasReturning));
! 	retrieved_attrs = (List *) list_nth(fdw_private,
! 										FdwModifyPrivateRetrievedAttrs);
  
! 	/* Construct an execution state. */
! 	fmstate = create_fdw_modify_state(mtstate,
! 									  resultRelInfo,
! 									  mtstate->operation,
! 									  subplan_index,
! 									  query,
! 									  target_attrs,
! 									  has_returning,
! 									  retrieved_attrs);
  
  	resultRelInfo->ri_FdwState = fmstate;
  }
***************
*** 1790,1796 **** postgresExecForeignInsert(EState *estate,
  {
  	PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
  	const char **p_values;
- 	PGresult   *res;
  	int			n_rows;
  
  	/* Set up the prepared statement on the remote server, if we didn't yet */
--- 1737,1742 ----
***************
*** 1800,1840 **** postgresExecForeignInsert(EState *estate,
  	/* Convert parameters needed by prepared statement to text form */
  	p_values = convert_prep_stmt_params(fmstate, NULL, slot);
  
! 	/*
! 	 * Execute the prepared statement.
! 	 */
! 	if (!PQsendQueryPrepared(fmstate->conn,
! 							 fmstate->p_name,
! 							 fmstate->p_nums,
! 							 p_values,
! 							 NULL,
! 							 NULL,
! 							 0))
! 		pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
! 
! 	/*
! 	 * Get the result, and check for success.
! 	 *
! 	 * We don't use a PG_TRY block here, so be careful not to throw error
! 	 * without releasing the PGresult.
! 	 */
! 	res = pgfdw_get_result(fmstate->conn, fmstate->query);
! 	if (PQresultStatus(res) !=
! 		(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
! 		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
! 
! 	/* Check number of rows affected, and fetch RETURNING tuple if any */
! 	if (fmstate->has_returning)
! 	{
! 		n_rows = PQntuples(res);
! 		if (n_rows > 0)
! 			store_returning_result(fmstate, slot, res);
! 	}
! 	else
! 		n_rows = atoi(PQcmdTuples(res));
! 
! 	/* And clean up */
! 	PQclear(res);
  
  	MemoryContextReset(fmstate->temp_cxt);
  
--- 1746,1753 ----
  	/* Convert parameters needed by prepared statement to text form */
  	p_values = convert_prep_stmt_params(fmstate, NULL, slot);
  
! 	/* Execute the prepared statement and fetch RETURNING tuple if any */
! 	n_rows = execute_prep_stmt(fmstate, p_values, slot);
  
  	MemoryContextReset(fmstate->temp_cxt);
  
***************
*** 1856,1862 **** postgresExecForeignUpdate(EState *estate,
  	Datum		datum;
  	bool		isNull;
  	const char **p_values;
- 	PGresult   *res;
  	int			n_rows;
  
  	/* Set up the prepared statement on the remote server, if we didn't yet */
--- 1769,1774 ----
***************
*** 1876,1916 **** postgresExecForeignUpdate(EState *estate,
  										(ItemPointer) DatumGetPointer(datum),
  										slot);
  
! 	/*
! 	 * Execute the prepared statement.
! 	 */
! 	if (!PQsendQueryPrepared(fmstate->conn,
! 							 fmstate->p_name,
! 							 fmstate->p_nums,
! 							 p_values,
! 							 NULL,
! 							 NULL,
! 							 0))
! 		pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
! 
! 	/*
! 	 * Get the result, and check for success.
! 	 *
! 	 * We don't use a PG_TRY block here, so be careful not to throw error
! 	 * without releasing the PGresult.
! 	 */
! 	res = pgfdw_get_result(fmstate->conn, fmstate->query);
! 	if (PQresultStatus(res) !=
! 		(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
! 		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
! 
! 	/* Check number of rows affected, and fetch RETURNING tuple if any */
! 	if (fmstate->has_returning)
! 	{
! 		n_rows = PQntuples(res);
! 		if (n_rows > 0)
! 			store_returning_result(fmstate, slot, res);
! 	}
! 	else
! 		n_rows = atoi(PQcmdTuples(res));
! 
! 	/* And clean up */
! 	PQclear(res);
  
  	MemoryContextReset(fmstate->temp_cxt);
  
--- 1788,1795 ----
  										(ItemPointer) DatumGetPointer(datum),
  										slot);
  
! 	/* Execute the prepared statement and fetch RETURNING tuple if any */
! 	n_rows = execute_prep_stmt(fmstate, p_values, slot);
  
  	MemoryContextReset(fmstate->temp_cxt);
  
***************
*** 1932,1938 **** postgresExecForeignDelete(EState *estate,
  	Datum		datum;
  	bool		isNull;
  	const char **p_values;
- 	PGresult   *res;
  	int			n_rows;
  
  	/* Set up the prepared statement on the remote server, if we didn't yet */
--- 1811,1816 ----
***************
*** 1952,1992 **** postgresExecForeignDelete(EState *estate,
  										(ItemPointer) DatumGetPointer(datum),
  										NULL);
  
! 	/*
! 	 * Execute the prepared statement.
! 	 */
! 	if (!PQsendQueryPrepared(fmstate->conn,
! 							 fmstate->p_name,
! 							 fmstate->p_nums,
! 							 p_values,
! 							 NULL,
! 							 NULL,
! 							 0))
! 		pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
! 
! 	/*
! 	 * Get the result, and check for success.
! 	 *
! 	 * We don't use a PG_TRY block here, so be careful not to throw error
! 	 * without releasing the PGresult.
! 	 */
! 	res = pgfdw_get_result(fmstate->conn, fmstate->query);
! 	if (PQresultStatus(res) !=
! 		(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
! 		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
! 
! 	/* Check number of rows affected, and fetch RETURNING tuple if any */
! 	if (fmstate->has_returning)
! 	{
! 		n_rows = PQntuples(res);
! 		if (n_rows > 0)
! 			store_returning_result(fmstate, slot, res);
! 	}
! 	else
! 		n_rows = atoi(PQcmdTuples(res));
! 
! 	/* And clean up */
! 	PQclear(res);
  
  	MemoryContextReset(fmstate->temp_cxt);
  
--- 1830,1837 ----
  										(ItemPointer) DatumGetPointer(datum),
  										NULL);
  
! 	/* Execute the prepared statement and fetch RETURNING tuple if any */
! 	n_rows = execute_prep_stmt(fmstate, p_values, slot);
  
  	MemoryContextReset(fmstate->temp_cxt);
  
***************
*** 2008,2035 **** postgresEndForeignModify(EState *estate,
  	if (fmstate == NULL)
  		return;
  
! 	/* If we created a prepared statement, destroy it */
! 	if (fmstate->p_name)
! 	{
! 		char		sql[64];
! 		PGresult   *res;
! 
! 		snprintf(sql, sizeof(sql), "DEALLOCATE %s", fmstate->p_name);
! 
! 		/*
! 		 * We don't use a PG_TRY block here, so be careful not to throw error
! 		 * without releasing the PGresult.
! 		 */
! 		res = pgfdw_exec_query(fmstate->conn, sql);
! 		if (PQresultStatus(res) != PGRES_COMMAND_OK)
! 			pgfdw_report_error(ERROR, res, fmstate->conn, true, sql);
! 		PQclear(res);
! 		fmstate->p_name = NULL;
! 	}
! 
! 	/* Release remote connection */
! 	ReleaseConnection(fmstate->conn);
! 	fmstate->conn = NULL;
  }
  
  /*
--- 1853,1860 ----
  	if (fmstate == NULL)
  		return;
  
! 	/* Destroy the execution state. */
! 	finish_foreign_modify(fmstate);
  }
  
  /*
***************
*** 3217,3222 **** close_cursor(PGconn *conn, unsigned int cursor_number)
--- 3042,3151 ----
  }
  
  /*
+  * create_fdw_modify_state
+  *		Construct an execution state of a foreign insert/update/delete
+  *		operation.
+  */
+ static PgFdwModifyState *
+ create_fdw_modify_state(ModifyTableState *mtstate,
+ 						ResultRelInfo *resultRelInfo,
+ 						CmdType operation,
+ 						int subplan_index,
+ 						char *query,
+ 						List *target_attrs,
+ 						bool has_returning,
+ 						List *retrieved_attrs)
+ {
+ 	PgFdwModifyState *fmstate;
+ 	EState	   *estate = mtstate->ps.state;
+ 	Relation	rel = resultRelInfo->ri_RelationDesc;
+ 	RangeTblEntry *rte;
+ 	Oid			userid;
+ 	ForeignTable *table;
+ 	UserMapping *user;
+ 	AttrNumber	n_params;
+ 	Oid			typefnoid;
+ 	bool		isvarlena;
+ 	ListCell   *lc;
+ 	TupleDesc	tupdesc = RelationGetDescr(rel);
+ 
+ 	/* Begin constructing PgFdwModifyState. */
+ 	fmstate = (PgFdwModifyState *) palloc0(sizeof(PgFdwModifyState));
+ 	fmstate->rel = rel;
+ 
+ 	/*
+ 	 * Identify which user to do the remote access as.  This should match what
+ 	 * ExecCheckRTEPerms() does.
+ 	 */
+ 	rte = rt_fetch(resultRelInfo->ri_RangeTableIndex, estate->es_range_table);
+ 	userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
+ 
+ 	/* Get info about foreign table. */
+ 	table = GetForeignTable(RelationGetRelid(rel));
+ 	user = GetUserMapping(userid, table->serverid);
+ 
+ 	/* Open connection; report that we'll create a prepared statement. */
+ 	fmstate->conn = GetConnection(user, true);
+ 	fmstate->p_name = NULL;		/* prepared statement not made yet */
+ 
+ 	/* Set remote query information. */
+ 	fmstate->query = query;
+ 	fmstate->target_attrs = target_attrs;
+ 	fmstate->has_returning = has_returning;
+ 	fmstate->retrieved_attrs = retrieved_attrs;
+ 
+ 	/* Create context for per-tuple temp workspace. */
+ 	fmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
+ 											  "postgres_fdw temporary data",
+ 											  ALLOCSET_SMALL_SIZES);
+ 
+ 	/* Prepare for input conversion of RETURNING results. */
+ 	if (fmstate->has_returning)
+ 		fmstate->attinmeta = TupleDescGetAttInMetadata(tupdesc);
+ 
+ 	/* Prepare for output conversion of parameters used in prepared stmt. */
+ 	n_params = list_length(fmstate->target_attrs) + 1;
+ 	fmstate->p_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * n_params);
+ 	fmstate->p_nums = 0;
+ 
+ 	if (operation == CMD_UPDATE || operation == CMD_DELETE)
+ 	{
+ 		/* Find the ctid resjunk column in the subplan's result */
+ 		Plan	   *subplan = mtstate->mt_plans[subplan_index]->plan;
+ 
+ 		fmstate->ctidAttno = ExecFindJunkAttributeInTlist(subplan->targetlist,
+ 														  "ctid");
+ 		if (!AttributeNumberIsValid(fmstate->ctidAttno))
+ 			elog(ERROR, "could not find junk ctid column");
+ 
+ 		/* First transmittable parameter will be ctid */
+ 		getTypeOutputInfo(TIDOID, &typefnoid, &isvarlena);
+ 		fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
+ 		fmstate->p_nums++;
+ 	}
+ 
+ 	if (operation == CMD_INSERT || operation == CMD_UPDATE)
+ 	{
+ 		/* Set up for remaining transmittable parameters */
+ 		foreach(lc, fmstate->target_attrs)
+ 		{
+ 			int			attnum = lfirst_int(lc);
+ 			Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
+ 
+ 			Assert(!attr->attisdropped);
+ 
+ 			getTypeOutputInfo(attr->atttypid, &typefnoid, &isvarlena);
+ 			fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
+ 			fmstate->p_nums++;
+ 		}
+ 	}
+ 
+ 	Assert(fmstate->p_nums <= n_params);
+ 
+ 	return fmstate;
+ }
+ 
+ /*
   * prepare_foreign_modify
   *		Establish a prepared statement for execution of INSERT/UPDATE/DELETE
   */
***************
*** 3326,3331 **** convert_prep_stmt_params(PgFdwModifyState *fmstate,
--- 3255,3311 ----
  }
  
  /*
+  * execute_prep_stmt
+  *		Execute the prepared statement and fetch RETURNING tuple if any
+  */
+ static int
+ execute_prep_stmt(PgFdwModifyState *fmstate,
+ 				  const char **p_values,
+ 				  TupleTableSlot *slot)
+ {
+ 	PGresult   *res;
+ 	int			n_rows;
+ 
+ 	/*
+ 	 * Execute the prepared statement.
+ 	 */
+ 	if (!PQsendQueryPrepared(fmstate->conn,
+ 							 fmstate->p_name,
+ 							 fmstate->p_nums,
+ 							 p_values,
+ 							 NULL,
+ 							 NULL,
+ 							 0))
+ 		pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
+ 
+ 	/*
+ 	 * Get the result, and check for success.
+ 	 *
+ 	 * We don't use a PG_TRY block here, so be careful not to throw error
+ 	 * without releasing the PGresult.
+ 	 */
+ 	res = pgfdw_get_result(fmstate->conn, fmstate->query);
+ 	if (PQresultStatus(res) !=
+ 		(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
+ 		pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
+ 
+ 	/* Check number of rows affected, and fetch RETURNING tuple if any */
+ 	if (fmstate->has_returning)
+ 	{
+ 		n_rows = PQntuples(res);
+ 		if (n_rows > 0)
+ 			store_returning_result(fmstate, slot, res);
+ 	}
+ 	else
+ 		n_rows = atoi(PQcmdTuples(res));
+ 
+ 	/* And clean up */
+ 	PQclear(res);
+ 
+ 	return n_rows;
+ }
+ 
+ /*
   * store_returning_result
   *		Store the result of a RETURNING clause
   *
***************
*** 3359,3364 **** store_returning_result(PgFdwModifyState *fmstate,
--- 3339,3377 ----
  }
  
  /*
+  * finish_foreign_modify
+  *		Release resources for a foreign insert/update/delete operation.
+  */
+ static void
+ finish_foreign_modify(PgFdwModifyState *fmstate)
+ {
+ 	Assert(fmstate != NULL);
+ 
+ 	/* If we created a prepared statement, destroy it */
+ 	if (fmstate->p_name)
+ 	{
+ 		char		sql[64];
+ 		PGresult   *res;
+ 
+ 		snprintf(sql, sizeof(sql), "DEALLOCATE %s", fmstate->p_name);
+ 
+ 		/*
+ 		 * We don't use a PG_TRY block here, so be careful not to throw error
+ 		 * without releasing the PGresult.
+ 		 */
+ 		res = pgfdw_exec_query(fmstate->conn, sql);
+ 		if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ 			pgfdw_report_error(ERROR, res, fmstate->conn, true, sql);
+ 		PQclear(res);
+ 		fmstate->p_name = NULL;
+ 	}
+ 
+ 	/* Release remote connection */
+ 	ReleaseConnection(fmstate->conn);
+ 	fmstate->conn = NULL;
+ }
+ 
+ /*
   * build_remote_returning
   *		Build a RETURNING targetlist of a remote query for performing an
   *		UPDATE/DELETE .. RETURNING on a join directly
