diff options
Diffstat (limited to 'src/protocol/child.c')
-rw-r--r-- | src/protocol/child.c | 213 |
1 files changed, 125 insertions, 88 deletions
diff --git a/src/protocol/child.c b/src/protocol/child.c index 7aea33540..cf2161806 100644 --- a/src/protocol/child.c +++ b/src/protocol/child.c @@ -68,19 +68,19 @@ #include "auth/pool_passwd.h" #include "auth/pool_hba.h" -static StartupPacket *read_startup_packet(POOL_CONNECTION * cp); -static POOL_CONNECTION_POOL * connect_backend(StartupPacket *sp, POOL_CONNECTION * frontend); +static StartupPacket *read_startup_packet(POOL_CONNECTION *cp); +static POOL_CONNECTION_POOL *connect_backend(StartupPacket *sp, POOL_CONNECTION *frontend); static RETSIGTYPE die(int sig); static RETSIGTYPE close_idle_connection(int sig); static RETSIGTYPE wakeup_handler(int sig); static RETSIGTYPE reload_config_handler(int sig); static RETSIGTYPE authentication_timeout(int sig); -static void send_params(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend); +static void send_params(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend); static int connection_count_up(void); static void connection_count_down(void); -static bool connect_using_existing_connection(POOL_CONNECTION * frontend, - POOL_CONNECTION_POOL * backend, - StartupPacket *sp); +static bool connect_using_existing_connection(POOL_CONNECTION *frontend, + POOL_CONNECTION_POOL *backend, + StartupPacket *sp); static void check_restart_request(void); static void check_exit_request(void); static void enable_authentication_timeout(void); @@ -89,17 +89,17 @@ static int wait_for_new_connections(int *fds, SockAddr *saddr); static void check_config_reload(void); static void get_backends_status(unsigned int *valid_backends, unsigned int *down_backends); static void validate_backend_connectivity(int front_end_fd); -static POOL_CONNECTION * get_connection(int front_end_fd, SockAddr *saddr); -static POOL_CONNECTION_POOL * get_backend_connection(POOL_CONNECTION * frontend); +static POOL_CONNECTION *get_connection(int front_end_fd, SockAddr *saddr); +static POOL_CONNECTION_POOL *get_backend_connection(POOL_CONNECTION *frontend); 
static StartupPacket *StartupPacketCopy(StartupPacket *sp); static void log_disconnections(char *database, char *username); static void print_process_status(char *remote_host, char *remote_port); -static bool backend_cleanup(POOL_CONNECTION * volatile *frontend, POOL_CONNECTION_POOL * volatile backend, bool frontend_invalid); +static bool backend_cleanup(POOL_CONNECTION *volatile *frontend, POOL_CONNECTION_POOL *volatile backend, bool frontend_invalid); static void child_will_go_down(int code, Datum arg); -static int opt_sort(const void *a, const void *b); +static int opt_sort(const void *a, const void *b); -static bool unix_fds_not_isset(int* fds, int num_unix_fds, fd_set* opt); +static bool unix_fds_not_isset(int *fds, int num_unix_fds, fd_set *opt); /* * Non 0 means SIGTERM (smart shutdown) or SIGINT (fast shutdown) has arrived @@ -154,11 +154,12 @@ do_child(int *fds) sigjmp_buf local_sigjmp_buf; POOL_CONNECTION_POOL *volatile backend = NULL; - /* counter for child_max_connections. "volatile" declaration is necessary + /* + * counter for child_max_connections. "volatile" declaration is necessary * so that this is counted up even if long jump is issued due to * ereport(ERROR). */ - volatile int connections_count = 0; + volatile int connections_count = 0; char psbuf[NI_MAXHOST + 128]; @@ -355,7 +356,8 @@ do_child(int *fds) */ if (con_count > (pool_config->num_init_children - pool_config->reserved_connections)) { - POOL_CONNECTION * cp; + POOL_CONNECTION *cp; + cp = pool_open(front_end_fd, false); if (cp == NULL) { @@ -405,8 +407,8 @@ do_child(int *fds) pool_initialize_private_backend_status(); /* - * Connect to backend. Also do authentication between - * frontend <--> pgpool and pgpool <--> backend. + * Connect to backend. Also do authentication between frontend <--> + * pgpool and pgpool <--> backend. 
*/ backend = get_backend_connection(child_frontend); if (!backend) @@ -513,7 +515,7 @@ do_child(int *fds) * return true if backend connection is cached */ static bool -backend_cleanup(POOL_CONNECTION * volatile *frontend, POOL_CONNECTION_POOL * volatile backend, bool frontend_invalid) +backend_cleanup(POOL_CONNECTION *volatile *frontend, POOL_CONNECTION_POOL *volatile backend, bool frontend_invalid) { StartupPacket *sp; bool cache_connection = false; @@ -600,18 +602,18 @@ backend_cleanup(POOL_CONNECTION * volatile *frontend, POOL_CONNECTION_POOL * vol * Read the startup packet and parse the contents. */ static StartupPacket * -read_startup_packet(POOL_CONNECTION * cp) +read_startup_packet(POOL_CONNECTION *cp) { StartupPacket *sp; StartupPacket_v2 *sp2; int protov; int len; char *p; - char **guc_options; - int opt_num = 0; - char *sp_sort; - char *tmpopt; - int i; + char **guc_options; + int opt_num = 0; + char *sp_sort; + char *tmpopt; + int i; sp = (StartupPacket *) palloc0(sizeof(*sp)); @@ -624,14 +626,16 @@ read_startup_packet(POOL_CONNECTION * cp) len = ntohl(len); len -= sizeof(len); - if (len <= 0 || len >= MAX_STARTUP_PACKET_LENGTH) + if (len < 4 || len > MAX_STARTUP_PACKET_LENGTH) ereport(ERROR, (errmsg("failed while reading startup packet"), errdetail("incorrect packet length (%d)", len))); sp->startup_packet = palloc0(len); - /* read startup packet */ + /* + * Read startup packet except the length of the message. 
+ */ pool_read_with_error(cp, sp->startup_packet, len, "startup packet"); @@ -657,37 +661,38 @@ read_startup_packet(POOL_CONNECTION * cp) case PROTO_MAJOR_V3: /* V3 */ /* copy startup_packet */ sp_sort = palloc0(len); - memcpy(sp_sort,sp->startup_packet,len); + memcpy(sp_sort, sp->startup_packet, len); p = sp_sort; - p += sizeof(int); /* skip protocol version info */ + p += sizeof(int); /* skip protocol version info */ /* count the number of options */ while (*p) { - p += (strlen(p) + 1); /* skip option name */ - p += (strlen(p) + 1); /* skip option value */ - opt_num ++; + p += (strlen(p) + 1); /* skip option name */ + p += (strlen(p) + 1); /* skip option value */ + opt_num++; } - guc_options = (char **)palloc0(opt_num * sizeof(char *)); + guc_options = (char **) palloc0(opt_num * sizeof(char *)); /* get guc_option name list */ p = sp_sort + sizeof(int); for (i = 0; i < opt_num; i++) { guc_options[i] = p; - p += (strlen(p) + 1); /* skip option name */ - p += (strlen(p) + 1); /* skip option value */ + p += (strlen(p) + 1); /* skip option name */ + p += (strlen(p) + 1); /* skip option value */ } /* sort option name using quick sort */ - qsort( (void *)guc_options, opt_num, sizeof(char *), opt_sort ); + qsort((void *) guc_options, opt_num, sizeof(char *), opt_sort); - p = sp->startup_packet + sizeof(int); /* skip protocol version info */ + p = sp->startup_packet + sizeof(int); /* skip protocol version + * info */ for (i = 0; i < opt_num; i++) { tmpopt = guc_options[i]; - memcpy(p, tmpopt ,strlen(tmpopt) + 1); /* memcpy option name */ + memcpy(p, tmpopt, strlen(tmpopt) + 1); /* memcpy option name */ p += (strlen(tmpopt) + 1); tmpopt += (strlen(tmpopt) + 1); - memcpy(p, tmpopt ,strlen(tmpopt) + 1); /* memcpy option value */ + memcpy(p, tmpopt, strlen(tmpopt) + 1); /* memcpy option value */ p += (strlen(tmpopt) + 1); } @@ -731,7 +736,7 @@ read_startup_packet(POOL_CONNECTION * cp) { ereport(DEBUG1, (errmsg("reading startup packet"), - errdetail("guc name: %s value: %s", 
p, p+strlen(p)+1))); + errdetail("guc name: %s value: %s", p, p + strlen(p) + 1))); p += (strlen(p) + 1); } @@ -787,8 +792,8 @@ read_startup_packet(POOL_CONNECTION * cp) * Reuse existing connection */ static bool -connect_using_existing_connection(POOL_CONNECTION * frontend, - POOL_CONNECTION_POOL * backend, +connect_using_existing_connection(POOL_CONNECTION *frontend, + POOL_CONNECTION_POOL *backend, StartupPacket *sp) { int i, @@ -822,8 +827,8 @@ connect_using_existing_connection(POOL_CONNECTION * frontend, /* Reuse existing connection to backend */ frontend_auth_cxt = AllocSetContextCreate(CurrentMemoryContext, - "frontend_auth", - ALLOCSET_DEFAULT_SIZES); + "frontend_auth", + ALLOCSET_DEFAULT_SIZES); oldContext = MemoryContextSwitchTo(frontend_auth_cxt); pool_do_reauth(frontend, backend); @@ -861,7 +866,8 @@ connect_using_existing_connection(POOL_CONNECTION * frontend, do_command(frontend, CONNECTION(backend, i), command_buf, MAJOR(backend), MAIN_CONNECTION(backend)->pid, - MAIN_CONNECTION(backend)->key, 0); + MAIN_CONNECTION(backend)->key, + MAIN_CONNECTION(backend)->keylen, 0); } PG_CATCH(); { @@ -902,7 +908,7 @@ connect_using_existing_connection(POOL_CONNECTION * frontend, * process cancel request */ void -cancel_request(CancelPacket * sp) +cancel_request(CancelPacket *sp, int32 splen) { int len; int fd; @@ -911,8 +917,8 @@ cancel_request(CancelPacket * sp) j, k; ConnectionInfo *c = NULL; - CancelPacket cp; bool found = false; + int32 keylen; /* cancel key length */ if (pool_config->log_client_messages) ereport(LOG, @@ -921,7 +927,20 @@ cancel_request(CancelPacket * sp) ereport(DEBUG1, (errmsg("Cancel request received"))); - /* look for cancel key from shmem info */ + /* + * Cancel key length is cancel message length - cancel request code - + * process id. + */ + keylen = splen - sizeof(int32) - sizeof(int32); + + /* + * Look for cancel key from shmem info. 
Frontend should have saved one of the + * cancel keys among backend groups and sent it in the cancel request + * message. We are looking for the backend which has the same cancel key + * and pid. The query we want to cancel should have been running on the + * backend group. So some of query cancel requests may not work but it + * should not be a problem. They are just ignored by the backend. + */ for (i = 0; i < pool_config->num_init_children; i++) { for (j = 0; j < pool_config->max_pool; j++) @@ -931,14 +950,20 @@ cancel_request(CancelPacket * sp) c = pool_coninfo(i, j, k); ereport(DEBUG2, (errmsg("processing cancel request"), - errdetail("connection info: address:%p database:%s user:%s pid:%d key:%d i:%d", - c, c->database, c->user, ntohl(c->pid), ntohl(c->key), i))); - if (c->pid == sp->pid && c->key == sp->key) + errdetail("connection info: address:%p database:%s user:%s pid:%d sp.pid:%d keylen:%d sp.keylen:%d i:%d", + c, c->database, c->user, ntohl(c->pid), ntohl(sp->pid), + c->keylen, keylen, i))); + if (c->pid == sp->pid && c->keylen == keylen && + memcmp(c->key, sp->key, keylen) == 0) { ereport(DEBUG1, (errmsg("processing cancel request"), - errdetail("found pid:%d key:%d i:%d", ntohl(c->pid), ntohl(c->key), i))); + errdetail("found pid:%d keylen:%d i:%d", ntohl(c->pid), c->keylen, i))); + /* + * "c" is a pointer to i th child, j th pool, and 0 th + * backend. + */ c = pool_coninfo(i, j, 0); found = true; goto found; @@ -951,12 +976,19 @@ found: if (!found) { ereport(LOG, - (errmsg("invalid cancel key: pid:%d key:%d", ntohl(sp->pid), ntohl(sp->key)))); + (errmsg("invalid cancel key: pid:%d keylen:%d", ntohl(sp->pid), keylen))); return; /* invalid key */ } + /* + * We are sending cancel request message to all backend groups. So some + * of query cancel requests may not work but it should not be a problem. + * They are just ignored by the backend. 
+ */ for (i = 0; i < NUM_BACKENDS; i++, c++) { + int32 cancel_request_code; + if (!VALID_BACKEND(i)) continue; @@ -978,18 +1010,19 @@ found: pool_set_db_node_id(con, i); - len = htonl(sizeof(len) + sizeof(CancelPacket)); - pool_write(con, &len, sizeof(len)); - - cp.protoVersion = sp->protoVersion; - cp.pid = c->pid; - cp.key = c->key; + len = htonl(splen + sizeof(int32)); /* splen does not include packet + * length field */ + pool_write(con, &len, sizeof(len)); /* send cancel messages length */ + cancel_request_code = htonl(PG_PROTOCOL(1234, 5678)); /* cancel request code */ + pool_write(con, &cancel_request_code, sizeof(int32)); + pool_write(con, &c->pid, sizeof(int32)); /* send pid */ + pool_write(con, c->key, keylen); /* send cancel key */ ereport(LOG, - (errmsg("forwarding cancel request to backend"), - errdetail("canceling backend pid:%d key: %d", ntohl(cp.pid), ntohl(cp.key)))); + (errmsg("forwarding cancel request to backend %d", i), + errdetail("canceling backend pid: %d keylen: %d", ntohl(sp->pid), keylen))); - if (pool_write_and_flush_noerror(con, &cp, sizeof(CancelPacket)) < 0) + if (pool_flush_noerror(con) < 0) ereport(WARNING, (errmsg("failed to send cancel request to backend %d", i))); @@ -1040,11 +1073,12 @@ StartupPacketCopy(StartupPacket *sp) * Create a new connection to backend. * Authentication is performed if requested by backend. 
*/ -static POOL_CONNECTION_POOL * connect_backend(StartupPacket *sp, POOL_CONNECTION * frontend) +static POOL_CONNECTION_POOL * +connect_backend(StartupPacket *sp, POOL_CONNECTION *frontend) { POOL_CONNECTION_POOL *backend; StartupPacket *volatile topmem_sp = NULL; - volatile bool topmem_sp_set = false; + volatile bool topmem_sp_set = false; int i; /* connect to the backend */ @@ -1094,8 +1128,8 @@ static POOL_CONNECTION_POOL * connect_backend(StartupPacket *sp, POOL_CONNECTION * do authentication stuff */ frontend_auth_cxt = AllocSetContextCreate(CurrentMemoryContext, - "frontend_auth", - ALLOCSET_DEFAULT_SIZES); + "frontend_auth", + ALLOCSET_DEFAULT_SIZES); oldContext = MemoryContextSwitchTo(frontend_auth_cxt); /* do authentication against backend */ @@ -1113,7 +1147,8 @@ static POOL_CONNECTION_POOL * connect_backend(StartupPacket *sp, POOL_CONNECTION } PG_END_TRY(); - /* At this point, we need to free previously allocated memory for the + /* + * At this point, we need to free previously allocated memory for the * startup packet if no backend is up. */ if (!topmem_sp_set && topmem_sp != NULL) @@ -1216,7 +1251,7 @@ static RETSIGTYPE close_idle_connection(int sig) if (CONNECTION_SLOT(p, main_node_id)->closetime > 0) /* idle connection? */ { - bool freed = false; + bool freed = false; pool_send_frontend_exits(p); @@ -1285,7 +1320,7 @@ disable_authentication_timeout(void) * Send parameter status message to frontend. 
*/ static void -send_params(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend) +send_params(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend) { int index; char *name, @@ -1335,7 +1370,7 @@ child_will_go_down(int code, Datum arg) if (child_frontend) log_disconnections(child_frontend->database, child_frontend->username); else - log_disconnections("",""); + log_disconnections("", ""); } } @@ -1566,7 +1601,7 @@ wait_for_new_connections(int *fds, SockAddr *saddr) } else { - int sts; + int sts; for (;;) { @@ -1581,10 +1616,10 @@ wait_for_new_connections(int *fds, SockAddr *saddr) if (backend_timer_expired) { /* - * We add 10 seconds to connection_life_time so that there's - * enough margin. + * We add 10 seconds to connection_life_time so that + * there's enough margin. */ - int seconds = pool_config->connection_life_time + 10; + int seconds = pool_config->connection_life_time + 10; while (seconds-- > 0) { @@ -1599,7 +1634,7 @@ wait_for_new_connections(int *fds, SockAddr *saddr) } } } - else /* success or other error */ + else /* success or other error */ break; } } @@ -1633,7 +1668,7 @@ wait_for_new_connections(int *fds, SockAddr *saddr) numfds = select(nsocks, &rmask, NULL, NULL, timeout); - /* not timeout*/ + /* not timeout */ if (numfds != 0) break; @@ -1768,9 +1803,10 @@ retry_accept: } static bool -unix_fds_not_isset(int* fds, int num_unix_fds, fd_set* opt) +unix_fds_not_isset(int *fds, int num_unix_fds, fd_set *opt) { - int i; + int i; + for (i = 0; i < num_unix_fds; i++) { if (!FD_ISSET(fds[i], opt)) @@ -1872,7 +1908,7 @@ validate_backend_connectivity(int front_end_fd) error_hint, __FILE__, __LINE__); - + } PG_CATCH(); { @@ -1898,7 +1934,7 @@ static POOL_CONNECTION * get_connection(int front_end_fd, SockAddr *saddr) { POOL_CONNECTION *cp; - ProcessInfo *pi; + ProcessInfo *pi; ereport(DEBUG1, (errmsg("I am %d accept fd %d", getpid(), front_end_fd))); @@ -1965,7 +2001,7 @@ get_connection(int front_end_fd, SockAddr *saddr) * pgpool <--> backend. 
*/ static POOL_CONNECTION_POOL * -get_backend_connection(POOL_CONNECTION * frontend) +get_backend_connection(POOL_CONNECTION *frontend) { int found = 0; StartupPacket *sp; @@ -1978,7 +2014,7 @@ retry_startup: /* cancel request? */ if (sp->major == 1234 && sp->minor == 5678) { - cancel_request((CancelPacket *) sp->startup_packet); + cancel_request((CancelPacket *) sp->startup_packet, sp->len); pool_free_startup_packet(sp); connection_count_down(); return NULL; @@ -2021,8 +2057,8 @@ retry_startup: * return if frontend was rejected; it simply terminates this process. */ MemoryContext frontend_auth_cxt = AllocSetContextCreate(CurrentMemoryContext, - "frontend_auth", - ALLOCSET_DEFAULT_SIZES); + "frontend_auth", + ALLOCSET_DEFAULT_SIZES); MemoryContext oldContext = MemoryContextSwitchTo(frontend_auth_cxt); /* @@ -2105,8 +2141,8 @@ retry_startup: if (backend == NULL) { /* - * Create a new connection to backend. - * Authentication is performed if requested by backend. + * Create a new connection to backend. Authentication is performed if + * requested by backend. */ backend = connect_backend(sp, frontend); } @@ -2125,7 +2161,7 @@ static void log_disconnections(char *database, char *username) { struct timeval endTime; - long diff; + long diff; long secs; int msecs, hours, @@ -2133,7 +2169,7 @@ log_disconnections(char *database, char *username) seconds; gettimeofday(&endTime, NULL); - diff = (long) ((endTime.tv_sec - startTime.tv_sec) * 1000000 + (endTime.tv_usec - startTime.tv_usec)); + diff = (long) ((endTime.tv_sec - startTime.tv_sec) * 1000000 + (endTime.tv_usec - startTime.tv_usec)); msecs = (int) (diff % 1000000) / 1000; secs = (long) (diff / 1000000); @@ -2205,9 +2241,10 @@ pg_frontend_exists(void) return 0; } -static int opt_sort(const void *a, const void *b) +static int +opt_sort(const void *a, const void *b) { - return strcmp( *(char **)a, *(char **)b); + return strcmp(*(char **) a, *(char **) b); } void |