summaryrefslogtreecommitdiff
path: root/src/protocol/child.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/protocol/child.c')
-rw-r--r--src/protocol/child.c213
1 files changed, 125 insertions, 88 deletions
diff --git a/src/protocol/child.c b/src/protocol/child.c
index 7aea33540..cf2161806 100644
--- a/src/protocol/child.c
+++ b/src/protocol/child.c
@@ -68,19 +68,19 @@
#include "auth/pool_passwd.h"
#include "auth/pool_hba.h"
-static StartupPacket *read_startup_packet(POOL_CONNECTION * cp);
-static POOL_CONNECTION_POOL * connect_backend(StartupPacket *sp, POOL_CONNECTION * frontend);
+static StartupPacket *read_startup_packet(POOL_CONNECTION *cp);
+static POOL_CONNECTION_POOL *connect_backend(StartupPacket *sp, POOL_CONNECTION *frontend);
static RETSIGTYPE die(int sig);
static RETSIGTYPE close_idle_connection(int sig);
static RETSIGTYPE wakeup_handler(int sig);
static RETSIGTYPE reload_config_handler(int sig);
static RETSIGTYPE authentication_timeout(int sig);
-static void send_params(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend);
+static void send_params(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend);
static int connection_count_up(void);
static void connection_count_down(void);
-static bool connect_using_existing_connection(POOL_CONNECTION * frontend,
- POOL_CONNECTION_POOL * backend,
- StartupPacket *sp);
+static bool connect_using_existing_connection(POOL_CONNECTION *frontend,
+ POOL_CONNECTION_POOL *backend,
+ StartupPacket *sp);
static void check_restart_request(void);
static void check_exit_request(void);
static void enable_authentication_timeout(void);
@@ -89,17 +89,17 @@ static int wait_for_new_connections(int *fds, SockAddr *saddr);
static void check_config_reload(void);
static void get_backends_status(unsigned int *valid_backends, unsigned int *down_backends);
static void validate_backend_connectivity(int front_end_fd);
-static POOL_CONNECTION * get_connection(int front_end_fd, SockAddr *saddr);
-static POOL_CONNECTION_POOL * get_backend_connection(POOL_CONNECTION * frontend);
+static POOL_CONNECTION *get_connection(int front_end_fd, SockAddr *saddr);
+static POOL_CONNECTION_POOL *get_backend_connection(POOL_CONNECTION *frontend);
static StartupPacket *StartupPacketCopy(StartupPacket *sp);
static void log_disconnections(char *database, char *username);
static void print_process_status(char *remote_host, char *remote_port);
-static bool backend_cleanup(POOL_CONNECTION * volatile *frontend, POOL_CONNECTION_POOL * volatile backend, bool frontend_invalid);
+static bool backend_cleanup(POOL_CONNECTION *volatile *frontend, POOL_CONNECTION_POOL *volatile backend, bool frontend_invalid);
static void child_will_go_down(int code, Datum arg);
-static int opt_sort(const void *a, const void *b);
+static int opt_sort(const void *a, const void *b);
-static bool unix_fds_not_isset(int* fds, int num_unix_fds, fd_set* opt);
+static bool unix_fds_not_isset(int *fds, int num_unix_fds, fd_set *opt);
/*
* Non 0 means SIGTERM (smart shutdown) or SIGINT (fast shutdown) has arrived
@@ -154,11 +154,12 @@ do_child(int *fds)
sigjmp_buf local_sigjmp_buf;
POOL_CONNECTION_POOL *volatile backend = NULL;
- /* counter for child_max_connections. "volatile" declaration is necessary
+ /*
+ * counter for child_max_connections. "volatile" declaration is necessary
* so that this is counted up even if long jump is issued due to
* ereport(ERROR).
*/
- volatile int connections_count = 0;
+ volatile int connections_count = 0;
char psbuf[NI_MAXHOST + 128];
@@ -355,7 +356,8 @@ do_child(int *fds)
*/
if (con_count > (pool_config->num_init_children - pool_config->reserved_connections))
{
- POOL_CONNECTION * cp;
+ POOL_CONNECTION *cp;
+
cp = pool_open(front_end_fd, false);
if (cp == NULL)
{
@@ -405,8 +407,8 @@ do_child(int *fds)
pool_initialize_private_backend_status();
/*
- * Connect to backend. Also do authentication between
- * frontend <--> pgpool and pgpool <--> backend.
+ * Connect to backend. Also do authentication between frontend <-->
+ * pgpool and pgpool <--> backend.
*/
backend = get_backend_connection(child_frontend);
if (!backend)
@@ -513,7 +515,7 @@ do_child(int *fds)
* return true if backend connection is cached
*/
static bool
-backend_cleanup(POOL_CONNECTION * volatile *frontend, POOL_CONNECTION_POOL * volatile backend, bool frontend_invalid)
+backend_cleanup(POOL_CONNECTION *volatile *frontend, POOL_CONNECTION_POOL *volatile backend, bool frontend_invalid)
{
StartupPacket *sp;
bool cache_connection = false;
@@ -600,18 +602,18 @@ backend_cleanup(POOL_CONNECTION * volatile *frontend, POOL_CONNECTION_POOL * vol
* Read the startup packet and parse the contents.
*/
static StartupPacket *
-read_startup_packet(POOL_CONNECTION * cp)
+read_startup_packet(POOL_CONNECTION *cp)
{
StartupPacket *sp;
StartupPacket_v2 *sp2;
int protov;
int len;
char *p;
- char **guc_options;
- int opt_num = 0;
- char *sp_sort;
- char *tmpopt;
- int i;
+ char **guc_options;
+ int opt_num = 0;
+ char *sp_sort;
+ char *tmpopt;
+ int i;
sp = (StartupPacket *) palloc0(sizeof(*sp));
@@ -624,14 +626,16 @@ read_startup_packet(POOL_CONNECTION * cp)
len = ntohl(len);
len -= sizeof(len);
- if (len <= 0 || len >= MAX_STARTUP_PACKET_LENGTH)
+ if (len < 4 || len > MAX_STARTUP_PACKET_LENGTH)
ereport(ERROR,
(errmsg("failed while reading startup packet"),
errdetail("incorrect packet length (%d)", len)));
sp->startup_packet = palloc0(len);
- /* read startup packet */
+ /*
+ * Read startup packet except the length of the message.
+ */
pool_read_with_error(cp, sp->startup_packet, len,
"startup packet");
@@ -657,37 +661,38 @@ read_startup_packet(POOL_CONNECTION * cp)
case PROTO_MAJOR_V3: /* V3 */
/* copy startup_packet */
sp_sort = palloc0(len);
- memcpy(sp_sort,sp->startup_packet,len);
+ memcpy(sp_sort, sp->startup_packet, len);
p = sp_sort;
- p += sizeof(int); /* skip protocol version info */
+ p += sizeof(int); /* skip protocol version info */
/* count the number of options */
while (*p)
{
- p += (strlen(p) + 1); /* skip option name */
- p += (strlen(p) + 1); /* skip option value */
- opt_num ++;
+ p += (strlen(p) + 1); /* skip option name */
+ p += (strlen(p) + 1); /* skip option value */
+ opt_num++;
}
- guc_options = (char **)palloc0(opt_num * sizeof(char *));
+ guc_options = (char **) palloc0(opt_num * sizeof(char *));
/* get guc_option name list */
p = sp_sort + sizeof(int);
for (i = 0; i < opt_num; i++)
{
guc_options[i] = p;
- p += (strlen(p) + 1); /* skip option name */
- p += (strlen(p) + 1); /* skip option value */
+ p += (strlen(p) + 1); /* skip option name */
+ p += (strlen(p) + 1); /* skip option value */
}
/* sort option name using quick sort */
- qsort( (void *)guc_options, opt_num, sizeof(char *), opt_sort );
+ qsort((void *) guc_options, opt_num, sizeof(char *), opt_sort);
- p = sp->startup_packet + sizeof(int); /* skip protocol version info */
+ p = sp->startup_packet + sizeof(int); /* skip protocol version
+ * info */
for (i = 0; i < opt_num; i++)
{
tmpopt = guc_options[i];
- memcpy(p, tmpopt ,strlen(tmpopt) + 1); /* memcpy option name */
+ memcpy(p, tmpopt, strlen(tmpopt) + 1); /* memcpy option name */
p += (strlen(tmpopt) + 1);
tmpopt += (strlen(tmpopt) + 1);
- memcpy(p, tmpopt ,strlen(tmpopt) + 1); /* memcpy option value */
+ memcpy(p, tmpopt, strlen(tmpopt) + 1); /* memcpy option value */
p += (strlen(tmpopt) + 1);
}
@@ -731,7 +736,7 @@ read_startup_packet(POOL_CONNECTION * cp)
{
ereport(DEBUG1,
(errmsg("reading startup packet"),
- errdetail("guc name: %s value: %s", p, p+strlen(p)+1)));
+ errdetail("guc name: %s value: %s", p, p + strlen(p) + 1)));
p += (strlen(p) + 1);
}
@@ -787,8 +792,8 @@ read_startup_packet(POOL_CONNECTION * cp)
* Reuse existing connection
*/
static bool
-connect_using_existing_connection(POOL_CONNECTION * frontend,
- POOL_CONNECTION_POOL * backend,
+connect_using_existing_connection(POOL_CONNECTION *frontend,
+ POOL_CONNECTION_POOL *backend,
StartupPacket *sp)
{
int i,
@@ -822,8 +827,8 @@ connect_using_existing_connection(POOL_CONNECTION * frontend,
/* Reuse existing connection to backend */
frontend_auth_cxt = AllocSetContextCreate(CurrentMemoryContext,
- "frontend_auth",
- ALLOCSET_DEFAULT_SIZES);
+ "frontend_auth",
+ ALLOCSET_DEFAULT_SIZES);
oldContext = MemoryContextSwitchTo(frontend_auth_cxt);
pool_do_reauth(frontend, backend);
@@ -861,7 +866,8 @@ connect_using_existing_connection(POOL_CONNECTION * frontend,
do_command(frontend, CONNECTION(backend, i),
command_buf, MAJOR(backend),
MAIN_CONNECTION(backend)->pid,
- MAIN_CONNECTION(backend)->key, 0);
+ MAIN_CONNECTION(backend)->key,
+ MAIN_CONNECTION(backend)->keylen, 0);
}
PG_CATCH();
{
@@ -902,7 +908,7 @@ connect_using_existing_connection(POOL_CONNECTION * frontend,
* process cancel request
*/
void
-cancel_request(CancelPacket * sp)
+cancel_request(CancelPacket *sp, int32 splen)
{
int len;
int fd;
@@ -911,8 +917,8 @@ cancel_request(CancelPacket * sp)
j,
k;
ConnectionInfo *c = NULL;
- CancelPacket cp;
bool found = false;
+ int32 keylen; /* cancel key length */
if (pool_config->log_client_messages)
ereport(LOG,
@@ -921,7 +927,20 @@ cancel_request(CancelPacket * sp)
ereport(DEBUG1,
(errmsg("Cancel request received")));
- /* look for cancel key from shmem info */
+ /*
+ * Cancel key length is cancel message length - cancel request code -
+ * process id.
+ */
+ keylen = splen - sizeof(int32) - sizeof(int32);
+
+ /*
+ * Look for cancel key from shmem info. Frontend should have saved one of
+ * cancel key among backend groups and sent it in the cancel request
+ * message. We are looking for the backend which has the same cancel key
+ * and pid. The query we want to cancel should have been running one the
+ * backend group. So some of query cancel requests may not work but it
+ * should not be a problem. They are just ignored by the backend.
+ */
for (i = 0; i < pool_config->num_init_children; i++)
{
for (j = 0; j < pool_config->max_pool; j++)
@@ -931,14 +950,20 @@ cancel_request(CancelPacket * sp)
c = pool_coninfo(i, j, k);
ereport(DEBUG2,
(errmsg("processing cancel request"),
- errdetail("connection info: address:%p database:%s user:%s pid:%d key:%d i:%d",
- c, c->database, c->user, ntohl(c->pid), ntohl(c->key), i)));
- if (c->pid == sp->pid && c->key == sp->key)
+ errdetail("connection info: address:%p database:%s user:%s pid:%d sp.pid:%d keylen:%d sp.keylen:%d i:%d",
+ c, c->database, c->user, ntohl(c->pid), ntohl(sp->pid),
+ c->keylen, keylen, i)));
+ if (c->pid == sp->pid && c->keylen == keylen &&
+ memcmp(c->key, sp->key, keylen) == 0)
{
ereport(DEBUG1,
(errmsg("processing cancel request"),
- errdetail("found pid:%d key:%d i:%d", ntohl(c->pid), ntohl(c->key), i)));
+ errdetail("found pid:%d keylen:%d i:%d", ntohl(c->pid), c->keylen, i)));
+ /*
+ * "c" is a pointer to i th child, j th pool, and 0 th
+ * backend.
+ */
c = pool_coninfo(i, j, 0);
found = true;
goto found;
@@ -951,12 +976,19 @@ found:
if (!found)
{
ereport(LOG,
- (errmsg("invalid cancel key: pid:%d key:%d", ntohl(sp->pid), ntohl(sp->key))));
+ (errmsg("invalid cancel key: pid:%d keylen:%d", ntohl(sp->pid), keylen)));
return; /* invalid key */
}
+ /*
+ * We are sending cancel request message to all backend groups. So some
+ * of query cancel requests may not work but it should not be a problem.
+ * They are just ignored by the backend.
+ */
for (i = 0; i < NUM_BACKENDS; i++, c++)
{
+ int32 cancel_request_code;
+
if (!VALID_BACKEND(i))
continue;
@@ -978,18 +1010,19 @@ found:
pool_set_db_node_id(con, i);
- len = htonl(sizeof(len) + sizeof(CancelPacket));
- pool_write(con, &len, sizeof(len));
-
- cp.protoVersion = sp->protoVersion;
- cp.pid = c->pid;
- cp.key = c->key;
+ len = htonl(splen + sizeof(int32)); /* splen does not include packet
+ * length field */
+ pool_write(con, &len, sizeof(len)); /* send cancel messages length */
+ cancel_request_code = htonl(PG_PROTOCOL(1234, 5678)); /* cancel request code */
+ pool_write(con, &cancel_request_code, sizeof(int32));
+ pool_write(con, &c->pid, sizeof(int32)); /* send pid */
+ pool_write(con, c->key, keylen); /* send cancel key */
ereport(LOG,
- (errmsg("forwarding cancel request to backend"),
- errdetail("canceling backend pid:%d key: %d", ntohl(cp.pid), ntohl(cp.key))));
+ (errmsg("forwarding cancel request to backend %d", i),
+ errdetail("canceling backend pid: %d keylen: %d", ntohl(sp->pid), keylen)));
- if (pool_write_and_flush_noerror(con, &cp, sizeof(CancelPacket)) < 0)
+ if (pool_flush_noerror(con) < 0)
ereport(WARNING,
(errmsg("failed to send cancel request to backend %d", i)));
@@ -1040,11 +1073,12 @@ StartupPacketCopy(StartupPacket *sp)
* Create a new connection to backend.
* Authentication is performed if requested by backend.
*/
-static POOL_CONNECTION_POOL * connect_backend(StartupPacket *sp, POOL_CONNECTION * frontend)
+static POOL_CONNECTION_POOL *
+connect_backend(StartupPacket *sp, POOL_CONNECTION *frontend)
{
POOL_CONNECTION_POOL *backend;
StartupPacket *volatile topmem_sp = NULL;
- volatile bool topmem_sp_set = false;
+ volatile bool topmem_sp_set = false;
int i;
/* connect to the backend */
@@ -1094,8 +1128,8 @@ static POOL_CONNECTION_POOL * connect_backend(StartupPacket *sp, POOL_CONNECTION
* do authentication stuff
*/
frontend_auth_cxt = AllocSetContextCreate(CurrentMemoryContext,
- "frontend_auth",
- ALLOCSET_DEFAULT_SIZES);
+ "frontend_auth",
+ ALLOCSET_DEFAULT_SIZES);
oldContext = MemoryContextSwitchTo(frontend_auth_cxt);
/* do authentication against backend */
@@ -1113,7 +1147,8 @@ static POOL_CONNECTION_POOL * connect_backend(StartupPacket *sp, POOL_CONNECTION
}
PG_END_TRY();
- /* At this point, we need to free previously allocated memory for the
+ /*
+ * At this point, we need to free previously allocated memory for the
* startup packet if no backend is up.
*/
if (!topmem_sp_set && topmem_sp != NULL)
@@ -1216,7 +1251,7 @@ static RETSIGTYPE close_idle_connection(int sig)
if (CONNECTION_SLOT(p, main_node_id)->closetime > 0) /* idle connection? */
{
- bool freed = false;
+ bool freed = false;
pool_send_frontend_exits(p);
@@ -1285,7 +1320,7 @@ disable_authentication_timeout(void)
* Send parameter status message to frontend.
*/
static void
-send_params(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend)
+send_params(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend)
{
int index;
char *name,
@@ -1335,7 +1370,7 @@ child_will_go_down(int code, Datum arg)
if (child_frontend)
log_disconnections(child_frontend->database, child_frontend->username);
else
- log_disconnections("","");
+ log_disconnections("", "");
}
}
@@ -1566,7 +1601,7 @@ wait_for_new_connections(int *fds, SockAddr *saddr)
}
else
{
- int sts;
+ int sts;
for (;;)
{
@@ -1581,10 +1616,10 @@ wait_for_new_connections(int *fds, SockAddr *saddr)
if (backend_timer_expired)
{
/*
- * We add 10 seconds to connection_life_time so that there's
- * enough margin.
+ * We add 10 seconds to connection_life_time so that
+ * there's enough margin.
*/
- int seconds = pool_config->connection_life_time + 10;
+ int seconds = pool_config->connection_life_time + 10;
while (seconds-- > 0)
{
@@ -1599,7 +1634,7 @@ wait_for_new_connections(int *fds, SockAddr *saddr)
}
}
}
- else /* success or other error */
+ else /* success or other error */
break;
}
}
@@ -1633,7 +1668,7 @@ wait_for_new_connections(int *fds, SockAddr *saddr)
numfds = select(nsocks, &rmask, NULL, NULL, timeout);
- /* not timeout*/
+ /* not timeout */
if (numfds != 0)
break;
@@ -1768,9 +1803,10 @@ retry_accept:
}
static bool
-unix_fds_not_isset(int* fds, int num_unix_fds, fd_set* opt)
+unix_fds_not_isset(int *fds, int num_unix_fds, fd_set *opt)
{
- int i;
+ int i;
+
for (i = 0; i < num_unix_fds; i++)
{
if (!FD_ISSET(fds[i], opt))
@@ -1872,7 +1908,7 @@ validate_backend_connectivity(int front_end_fd)
error_hint,
__FILE__,
__LINE__);
-
+
}
PG_CATCH();
{
@@ -1898,7 +1934,7 @@ static POOL_CONNECTION *
get_connection(int front_end_fd, SockAddr *saddr)
{
POOL_CONNECTION *cp;
- ProcessInfo *pi;
+ ProcessInfo *pi;
ereport(DEBUG1,
(errmsg("I am %d accept fd %d", getpid(), front_end_fd)));
@@ -1965,7 +2001,7 @@ get_connection(int front_end_fd, SockAddr *saddr)
* pgpool <--> backend.
*/
static POOL_CONNECTION_POOL *
-get_backend_connection(POOL_CONNECTION * frontend)
+get_backend_connection(POOL_CONNECTION *frontend)
{
int found = 0;
StartupPacket *sp;
@@ -1978,7 +2014,7 @@ retry_startup:
/* cancel request? */
if (sp->major == 1234 && sp->minor == 5678)
{
- cancel_request((CancelPacket *) sp->startup_packet);
+ cancel_request((CancelPacket *) sp->startup_packet, sp->len);
pool_free_startup_packet(sp);
connection_count_down();
return NULL;
@@ -2021,8 +2057,8 @@ retry_startup:
* return if frontend was rejected; it simply terminates this process.
*/
MemoryContext frontend_auth_cxt = AllocSetContextCreate(CurrentMemoryContext,
- "frontend_auth",
- ALLOCSET_DEFAULT_SIZES);
+ "frontend_auth",
+ ALLOCSET_DEFAULT_SIZES);
MemoryContext oldContext = MemoryContextSwitchTo(frontend_auth_cxt);
/*
@@ -2105,8 +2141,8 @@ retry_startup:
if (backend == NULL)
{
/*
- * Create a new connection to backend.
- * Authentication is performed if requested by backend.
+ * Create a new connection to backend. Authentication is performed if
+ * requested by backend.
*/
backend = connect_backend(sp, frontend);
}
@@ -2125,7 +2161,7 @@ static void
log_disconnections(char *database, char *username)
{
struct timeval endTime;
- long diff;
+ long diff;
long secs;
int msecs,
hours,
@@ -2133,7 +2169,7 @@ log_disconnections(char *database, char *username)
seconds;
gettimeofday(&endTime, NULL);
- diff = (long) ((endTime.tv_sec - startTime.tv_sec) * 1000000 + (endTime.tv_usec - startTime.tv_usec));
+ diff = (long) ((endTime.tv_sec - startTime.tv_sec) * 1000000 + (endTime.tv_usec - startTime.tv_usec));
msecs = (int) (diff % 1000000) / 1000;
secs = (long) (diff / 1000000);
@@ -2205,9 +2241,10 @@ pg_frontend_exists(void)
return 0;
}
-static int opt_sort(const void *a, const void *b)
+static int
+opt_sort(const void *a, const void *b)
{
- return strcmp( *(char **)a, *(char **)b);
+ return strcmp(*(char **) a, *(char **) b);
}
void