#include "poll_compat.h"
+#if PG_VERSION_NUM < 80400
+static int geterrcode(void)
+{
+ ErrorData *edata = CopyErrorData();
+ int code = edata->sqlerrcode;
+ FreeErrorData(edata); /* unnecessary? */
+ return code;
+}
+#endif
+
/* some error happened */
static void
conn_error(ProxyFunction *func, ProxyConnection *conn, const char *desc)
}
}
+static void
+remote_cancel(ProxyFunction *func)
+{
+ ProxyConnection *conn;
+ ProxyCluster *cluster = func->cur_cluster;
+ PGcancel *cancel;
+ char errbuf[256];
+ int ret;
+ int i;
+
+ if (cluster == NULL)
+ return;
+
+ for (i = 0; i < cluster->conn_count; i++)
+ {
+ conn = &cluster->conn_list[i];
+ if (conn->state == C_NONE ||
+ conn->state == C_READY ||
+ conn->state == C_DONE)
+ continue;
+
+ cancel = PQgetCancel(conn->db);
+ if (cancel == NULL)
+ {
+ elog(NOTICE, "Invalid connection!");
+ continue;
+ }
+ ret = PQcancel(cancel, errbuf, sizeof(errbuf));
+ PQfreeCancel(cancel);
+ if (ret == 0)
+ elog(NOTICE, "Cancel query failed!");
+ }
+}
+
/* Run hash function and tag connections */
static void
tag_hash_partitions(ProxyFunction *func, FunctionCallInfo fcinfo)
}
}
- if (gotbin)
- remote_execute(func, values, plengths, pformats);
- else
- remote_execute(func, values, NULL, NULL);
+ /*
+ * Run query. On cancel, send cancel request to partitions too.
+ */
+ PG_TRY();
+ {
+ if (gotbin)
+ remote_execute(func, values, plengths, pformats);
+ else
+ remote_execute(func, values, NULL, NULL);
+ }
+ PG_CATCH();
+ {
+ if (geterrcode() == ERRCODE_QUERY_CANCELED)
+ remote_cancel(func);
+ PG_RE_THROW();
+ }
+ PG_END_TRY();
}