conn_error(ProxyFunction *func, ProxyConnection *conn, const char *desc)
{
plproxy_error(func, "[%s] %s: %s",
- PQdb(conn->db), desc, PQerrorMessage(conn->db));
+ PQdb(conn->cur->db), desc, PQerrorMessage(conn->cur->db));
}
/* Compare if major/minor match. Works on "MAJ.MIN.*" */
int res;
/* flush it down */
- res = PQflush(conn->db);
+ res = PQflush(conn->cur->db);
/* set actual state */
if (res > 0)
- conn->state = C_QUERY_WRITE;
+ conn->cur->state = C_QUERY_WRITE;
else if (res == 0)
- conn->state = C_QUERY_READ;
+ conn->cur->state = C_QUERY_READ;
else
conn_error(func, conn, "PQflush");
}
/*
* check if target server has same backend version.
*/
- dst_ver = PQparameterStatus(conn->db, "server_version");
- conn->same_ver = cmp_branch(dst_ver, PG_VERSION);
+ dst_ver = PQparameterStatus(conn->cur->db, "server_version");
+ conn->cur->same_ver = cmp_branch(dst_ver, PG_VERSION);
/*
* Make sure remote I/O is done using local server_encoding.
*/
this_enc = GetDatabaseEncodingName();
- dst_enc = PQparameterStatus(conn->db, "client_encoding");
+ dst_enc = PQparameterStatus(conn->cur->db, "client_encoding");
if (dst_enc && strcmp(this_enc, dst_enc))
{
if (!sql)
/*
* if second time in this function, they should be active already.
*/
- if (sql && conn->tuning)
+ if (sql && conn->cur->tuning)
{
/* display SET query */
appendStringInfo(sql, "-- does not seem to apply");
*/
if (sql)
{
- conn->tuning = 1;
- conn->state = C_QUERY_WRITE;
- if (!PQsendQuery(conn->db, sql->data))
+ conn->cur->tuning = 1;
+ conn->cur->state = C_QUERY_WRITE;
+ if (!PQsendQuery(conn->cur->db, sql->data))
conn_error(func, conn, "PQsendQuery");
pfree(sql->data);
pfree(sql);
return 1;
}
- conn->tuning = 0;
+ conn->cur->tuning = 0;
return 0;
}
int binary_result = 0;
gettimeofday(&now, NULL);
- conn->query_time = now.tv_sec;
+ conn->cur->query_time = now.tv_sec;
tune_connection(func, conn);
- if (conn->tuning)
+ if (conn->cur->tuning)
return;
/* use binary result only on same backend ver */
- if (cf->disable_binary == 0 && conn->same_ver)
+ if (cf->disable_binary == 0 && conn->cur->same_ver)
{
/* binary recv for non-record types */
if (func->ret_scalar)
}
/* send query */
- conn->state = C_QUERY_WRITE;
- res = PQsendQueryParams(conn->db, q->sql, q->arg_count,
+ conn->cur->state = C_QUERY_WRITE;
+ res = PQsendQueryParams(conn->cur->db, q->sql, q->arg_count,
NULL, /* paramTypes */
values, /* paramValues */
plengths, /* paramLengths */
struct pollfd pfd;
ProxyConfig *cf = &func->cur_cluster->config;
- if (PQstatus(conn->db) != CONNECTION_OK)
+ if (PQstatus(conn->cur->db) != CONNECTION_OK)
return false;
/* check if too old */
if (cf->connection_lifetime > 0)
{
- t = now->tv_sec - conn->connect_time;
+ t = now->tv_sec - conn->cur->connect_time;
if (t >= cf->connection_lifetime)
return false;
}
/* how long ts been idle */
- t = now->tv_sec - conn->query_time;
+ t = now->tv_sec - conn->cur->query_time;
if (t < PLPROXY_IDLE_CONN_CHECK)
return true;
* are events pending. If there are drop the connection.
*/
intr_loop:
- pfd.fd = PQsocket(conn->db);
+ pfd.fd = PQsocket(conn->cur->db);
pfd.events = POLLIN;
pfd.revents = 0;
{
struct sockaddr sa;
socklen_t salen = sizeof(sa);
- int fd = PQsocket(conn->db);
+ int fd = PQsocket(conn->cur->db);
ProxyConfig *config = &conn->cluster->config;
/* turn on keepalive */
gettimeofday(&now, NULL);
/* state should be C_READY or C_NONE */
- switch (conn->state)
+ switch (conn->cur->state)
{
case C_DONE:
- conn->state = C_READY;
+ conn->cur->state = C_READY;
case C_READY:
if (check_old_conn(func, conn, &now))
return;
case C_QUERY_WRITE:
/* close rotten connection */
elog(NOTICE, "PL/Proxy: dropping stale conn");
- PQfinish(conn->db);
- conn->db = NULL;
- conn->state = C_NONE;
- conn->tuning = 0;
+ PQfinish(conn->cur->db);
+ conn->cur->db = NULL;
+ conn->cur->state = C_NONE;
+ conn->cur->tuning = 0;
case C_NONE:
break;
}
- conn->connect_time = now.tv_sec;
+ conn->cur->connect_time = now.tv_sec;
/* launch new connection */
- conn->db = PQconnectStart(conn->connstr);
- if (conn->db == NULL)
+ conn->cur->db = PQconnectStart(conn->connstr);
+ if (conn->cur->db == NULL)
plproxy_error(func, "No memory for PGconn");
/* tag connection dirty */
- conn->state = C_CONNECT_WRITE;
+ conn->cur->state = C_CONNECT_WRITE;
- if (PQstatus(conn->db) == CONNECTION_BAD)
+ if (PQstatus(conn->cur->db) == CONNECTION_BAD)
conn_error(func, conn, "PQconnectStart");
/* override default notice handler */
- PQsetNoticeReceiver(conn->db, handle_notice, conn);
+ PQsetNoticeReceiver(conn->cur->db, handle_notice, conn);
setup_keepalive(conn);
}
PGresult *res;
/* got one */
- res = PQgetResult(conn->db);
+ res = PQgetResult(conn->cur->db);
if (res == NULL)
{
- if (conn->tuning)
- conn->state = C_READY;
+ if (conn->cur->tuning)
+ conn->cur->state = C_READY;
else
- conn->state = C_DONE;
+ conn->cur->state = C_DONE;
return false;
}
int res;
PostgresPollingStatusType poll_res;
- switch (conn->state)
+ switch (conn->cur->state)
{
case C_CONNECT_READ:
case C_CONNECT_WRITE:
- poll_res = PQconnectPoll(conn->db);
+ poll_res = PQconnectPoll(conn->cur->db);
switch (poll_res)
{
case PGRES_POLLING_WRITING:
- conn->state = C_CONNECT_WRITE;
+ conn->cur->state = C_CONNECT_WRITE;
break;
case PGRES_POLLING_READING:
- conn->state = C_CONNECT_READ;
+ conn->cur->state = C_CONNECT_READ;
break;
case PGRES_POLLING_OK:
- conn->state = C_READY;
+ conn->cur->state = C_READY;
break;
case PGRES_POLLING_ACTIVE:
case PGRES_POLLING_FAILED:
flush_connection(func, conn);
break;
case C_QUERY_READ:
- res = PQconsumeInput(conn->db);
+ res = PQconsumeInput(conn->cur->db);
if (res == 0)
conn_error(func, conn, "PQconsumeInput");
while (1)
{
/* if PQisBusy, then incomplete result */
- if (PQisBusy(conn->db))
+ if (PQisBusy(conn->cur->db))
break;
/* got one */
continue;
/* decide what to do */
- switch (conn->state)
+ switch (conn->cur->state)
{
case C_DONE:
case C_READY:
/* add fd to proper set */
pf = pfd_cache + numfds++;
- pf->fd = PQsocket(conn->db);
+ pf->fd = PQsocket(conn->cur->db);
pf->events = ev;
pf->revents = 0;
}
if (!conn->run_tag)
continue;
- switch (conn->state)
+ switch (conn->cur->state)
{
case C_DONE:
case C_READY:
/*
* they should be in same order as called,
*/
- fd = PQsocket(conn->db);
+ fd = PQsocket(conn->cur->db);
if (pf->fd != fd)
elog(WARNING, "fd order from poll() is messed up?");
{
ProxyConfig *cf = &cluster->config;
- switch (conn->state)
+ switch (conn->cur->state)
{
case C_CONNECT_READ:
case C_CONNECT_WRITE:
if (cf->connect_timeout <= 0)
break;
- if (now - conn->connect_time <= cf->connect_timeout)
+ if (now - conn->cur->connect_time <= cf->connect_timeout)
break;
plproxy_error(func, "connect timeout to: %s", conn->connstr);
break;
case C_QUERY_WRITE:
if (cf->query_timeout <= 0)
break;
- if (now - conn->query_time <= cf->query_timeout)
+ if (now - conn->cur->query_time <= cf->query_timeout)
break;
plproxy_error(func, "query timeout");
break;
pending++;
/* if conn is ready, then send query away */
- if (conn->state == C_READY)
+ if (conn->cur->state == C_READY)
send_query(func, conn, conn->param_values, conn->param_lengths, conn->param_formats);
}
continue;
/* login finished, send query */
- if (conn->state == C_READY)
+ if (conn->cur->state == C_READY)
send_query(func, conn, conn->param_values, conn->param_lengths, conn->param_formats);
- if (conn->state != C_DONE)
+ if (conn->cur->state != C_DONE)
pending++;
check_timeouts(func, cluster, conn, now.tv_sec);
if (!conn->run_tag)
continue;
- if (conn->state != C_DONE)
+ if (conn->cur->state != C_DONE)
plproxy_error(func, "Unfinished connection");
if (conn->res == NULL)
plproxy_error(func, "Lost result");
for (i = 0; i < cluster->conn_count; i++)
{
conn = &cluster->conn_list[i];
- if (conn->state == C_NONE ||
- conn->state == C_READY ||
- conn->state == C_DONE)
+ if (conn->cur->state == C_NONE ||
+ conn->cur->state == C_READY ||
+ conn->cur->state == C_DONE)
continue;
- cancel = PQgetCancel(conn->db);
+ cancel = PQgetCancel(conn->cur->db);
if (cancel == NULL)
{
elog(NOTICE, "Invalid connection!");