diff options
author | Tatsuo Ishii | 2024-08-27 01:59:37 +0000 |
---|---|---|
committer | Tatsuo Ishii | 2024-08-27 01:59:37 +0000 |
commit | 65dbbe7a0fda8d2b5cc2d41b071e2ffeab751db9 (patch) | |
tree | be18e2cea24e758071027e82e4c0e8090b3a775c /src | |
parent | 72707f07a65a63178a161ff42d25ba31c202e05a (diff) |
Add IPv6 support for the hostname and heartbeat_hostname parameters.
These watchdog configuration parameters now accept IPv6 addresses.
Author: Kwangwon Seo
Reviewed-by: Muhammad Usama, Tatsuo Ishii
Discussion: [pgpool-hackers: 4476] Watchdog and IPv6
https://www.pgpool.net/pipermail/pgpool-hackers/2024-July/004477.html
Diffstat (limited to 'src')
-rw-r--r-- | src/watchdog/watchdog.c | 322 | ||||
-rw-r--r-- | src/watchdog/wd_heartbeat.c | 388 |
2 files changed, 498 insertions, 212 deletions
diff --git a/src/watchdog/watchdog.c b/src/watchdog/watchdog.c index 41b53d26d..f88b9795c 100644 --- a/src/watchdog/watchdog.c +++ b/src/watchdog/watchdog.c @@ -585,7 +585,7 @@ static bool get_authhash_for_node(WatchdogNode * wdNode, char *authhash); static bool verify_authhash_for_node(WatchdogNode * wdNode, char *authhash); static void print_watchdog_node_info(WatchdogNode * wdNode); -static int wd_create_recv_socket(int port); +static List *wd_create_recv_socket(int port); static void wd_check_config(void); static pid_t watchdog_main(void); static pid_t fork_watchdog_child(void); @@ -614,6 +614,7 @@ static void update_failover_timeout(WatchdogNode * wdNode, POOL_CONFIG *pool_con wd_cluster g_cluster; struct timeval g_tm_set_time; int g_timeout_sec = 0; +List *g_wd_recv_socks = NIL; static unsigned int get_next_commandID(void) @@ -823,7 +824,8 @@ wd_cluster_initialize(void) g_cluster.ipc_auth_needed = strlen(pool_config->wd_authkey) ? true : false; g_cluster.localNode->escalated = get_watchdog_node_escalation_state(); - + g_cluster.localNode->server_socket.sock = 0; + g_cluster.localNode->server_socket.sock_state = WD_SOCK_CLOSED; wd_initialize_monitoring_interfaces(); if (g_cluster.ipc_auth_needed) { @@ -855,82 +857,132 @@ clear_command_node_result(WDCommandNodeResult * nodeResult) nodeResult->cmdState = COMMAND_STATE_INIT; } -static int +static List * wd_create_recv_socket(int port) { - size_t len = 0; - struct sockaddr_in addr; int one = 1; int sock = -1; int saved_errno; + int gai_ret, + n = 0, + target_n = n; + char *portstr = NULL; + struct addrinfo hints, + *walk, + *res = NULL; + List *socks = NIL; + + portstr = psprintf("%d", port); - if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) + memset(&hints, 0x00, sizeof(hints)); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + hints.ai_protocol = 0; + hints.ai_flags = AI_NUMERICSERV | AI_PASSIVE; + + if ((gai_ret = getaddrinfo(NULL, portstr, &hints, &res)) != 0) { - /* socket create failed */ 
ereport(ERROR, - (errmsg("failed to create watchdog receive socket"), - errdetail("create socket failed with reason: \"%m\""))); + (errmsg("getaddrinfo failed with error \"%s\"", gai_strerror(gai_ret)))); + pfree(portstr); + return NIL; } + pfree(portstr); - socket_set_nonblock(sock); + for (walk = res; walk != NULL; walk = walk->ai_next) + n++; - if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (char *) &one, sizeof(one)) == -1) + if (n == 0) { - /* setsockopt(SO_REUSEADDR) failed */ - saved_errno = errno; - close(sock); - ereport(ERROR, - (errmsg("failed to create watchdog receive socket"), - errdetail("setsockopt(SO_REUSEADDR) failed with reason: \"%s\"", strerror(saved_errno)))); + ereport(ERROR, (errmsg("failed to create watchdog receive socket"), + errdetail("getaddrinfo() result is empty: no sockets can be created because no available local address with port:%d", port))); + return NIL; } - if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (char *) &one, sizeof(one)) == -1) + else { - /* setsockopt(TCP_NODELAY) failed */ - saved_errno = errno; - close(sock); - ereport(ERROR, - (errmsg("failed to create watchdog receive socket"), - errdetail("setsockopt(TCP_NODELAY) failed with reason: \"%s\"", strerror(saved_errno)))); + target_n = n; + n = 0; } - if (setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, (char *) &one, sizeof(one)) == -1) + + for (walk = res; walk != NULL; walk = walk->ai_next) { - /* setsockopt(SO_KEEPALIVE) failed */ - saved_errno = errno; - close(sock); - ereport(ERROR, - (errmsg("failed to create watchdog receive socket"), - errdetail("setsockopt(SO_KEEPALIVE) failed with reason: \"%s\"", strerror(saved_errno)))); - } + if ((sock = socket(walk->ai_family, walk->ai_socktype, walk->ai_protocol)) < 0) + { + /* socket create failed */ + ereport(ERROR, + (errmsg("failed to create watchdog receive socket"), + errdetail("create socket failed with reason: \"%m\""))); + } - addr.sin_family = AF_INET; - addr.sin_addr.s_addr = htonl(INADDR_ANY); - addr.sin_port = 
htons(port); - len = sizeof(struct sockaddr_in); + socket_set_nonblock(sock); - if (bind(sock, (struct sockaddr *) &addr, len) < 0) - { - /* bind failed */ - saved_errno = errno; - close(sock); - ereport(ERROR, - (errmsg("failed to create watchdog receive socket"), - errdetail("bind on \"TCP:%d\" failed with reason: \"%s\"", port, strerror(saved_errno)))); - } + if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (char *) &one, sizeof(one)) == -1) + { + /* setsockopt(SO_REUSEADDR) failed */ + saved_errno = errno; + close(sock); + ereport(ERROR, + (errmsg("failed to create watchdog receive socket"), + errdetail("setsockopt(SO_REUSEADDR) failed with reason: \"%s\"", strerror(saved_errno)))); + } + if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (char *) &one, sizeof(one)) == -1) + { + /* setsockopt(TCP_NODELAY) failed */ + saved_errno = errno; + close(sock); + ereport(ERROR, + (errmsg("failed to create watchdog receive socket"), + errdetail("setsockopt(TCP_NODELAY) failed with reason: \"%s\"", strerror(saved_errno)))); + } + if (setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, (char *) &one, sizeof(one)) == -1) + { + /* setsockopt(SO_KEEPALIVE) failed */ + saved_errno = errno; + close(sock); + ereport(ERROR, + (errmsg("failed to create watchdog receive socket"), + errdetail("setsockopt(SO_KEEPALIVE) failed with reason: \"%s\"", strerror(saved_errno)))); + } + if (walk->ai_family == AF_INET6) + { + if (setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY, &one, sizeof(one)) == -1) + { + ereport(ERROR, + (errmsg("failed to set IPPROTO_IPV6 option to watchdog receive socket"), + errdetail("setsockopt(IPV6_V6ONLY) failed with reason: \"%m\""))); + } + } + if (bind(sock, walk->ai_addr, walk->ai_addrlen) < 0) + { + /* bind failed */ + saved_errno = errno; + close(sock); + ereport(ERROR, + (errmsg("failed to create watchdog receive socket"), + errdetail("bind on \"TCP:%d\" failed with reason: \"%s\"", port, strerror(saved_errno)))); + } - if (listen(sock, MAX_WATCHDOG_NUM * 2) < 0) - { - /* 
listen failed */ - saved_errno = errno; - close(sock); - ereport(ERROR, - (errmsg("failed to create watchdog receive socket"), - errdetail("listen failed with reason: \"%s\"", strerror(saved_errno)))); - } + if (listen(sock, MAX_WATCHDOG_NUM * 2) < 0) + { + /* listen failed */ + saved_errno = errno; + close(sock); + ereport(ERROR, + (errmsg("failed to create watchdog receive socket"), + errdetail("listen failed with reason: \"%s\"", strerror(saved_errno)))); + } - return sock; -} + socks = lappend_int(socks, sock); + n++; + } + if (target_n != n) + ereport(WARNING, + (errmsg("failed to create watchdog receive socket as much intended"), + errdetail("only %d out of %d socket(s) had been created", n, target_n))); + return socks; +} /* * creates a socket in non blocking mode and connects it to the hostname and port @@ -939,15 +991,33 @@ wd_create_recv_socket(int port) static int wd_create_client_socket(char *hostname, int port, bool *connected) { - int sock; + int sock, + gai_ret = -1; int one = 1; - size_t len = 0; - struct sockaddr_in addr; - struct hostent *hp; + char *portstr = NULL; + struct addrinfo hints, + *res = NULL; + portstr = psprintf("%d", port); + + memset(&hints, 0x00, sizeof(hints)); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + hints.ai_protocol = 0; + hints.ai_flags = AI_NUMERICSERV; + + if ((gai_ret = getaddrinfo(hostname, portstr, &hints, &res)) != 0) + { + ereport(ERROR, + (errmsg("getaddrinfo failed with error \"%s\"", gai_strerror(gai_ret)))); + + pfree(portstr); + return -1; + } + pfree(portstr); *connected = false; /* create socket */ - if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) + if ((sock = socket(res->ai_family, res->ai_socktype, res->ai_protocol)) < 0) { /* socket create failed */ ereport(LOG, @@ -972,31 +1042,13 @@ wd_create_client_socket(char *hostname, int port, bool *connected) close(sock); return -1; } - /* set sockaddr_in */ - memset(&addr, 0, sizeof(addr)); - addr.sin_family = AF_INET; - hp = 
gethostbyname(hostname); - if ((hp == NULL) || (hp->h_addrtype != AF_INET)) - { - hp = gethostbyaddr(hostname, strlen(hostname), AF_INET); - if ((hp == NULL) || (hp->h_addrtype != AF_INET)) - { - ereport(LOG, - (errmsg("failed to get host address for \"%s\"", hostname), - errdetail("gethostbyaddr failed with error: \"%s\"", hstrerror(h_errno)))); - close(sock); - return -1; - } - } - memmove((char *) &(addr.sin_addr), (char *) hp->h_addr, hp->h_length); - addr.sin_port = htons(port); - len = sizeof(struct sockaddr_in); /* set socket to non blocking */ socket_set_nonblock(sock); - if (connect(sock, (struct sockaddr *) &addr, len) < 0) + if (connect(sock, res->ai_addr, res->ai_addrlen) < 0) { + freeaddrinfo(res); if (errno == EINPROGRESS) { return sock; @@ -1013,6 +1065,7 @@ wd_create_client_socket(char *hostname, int port, bool *connected) close(sock); return -1; } + freeaddrinfo(res); /* set socket to blocking again */ socket_unset_nonblock(sock); *connected = true; @@ -1191,8 +1244,8 @@ watchdog_main(void) /* initialize all the local structures for watchdog */ wd_cluster_initialize(); /* create a server socket for incoming watchdog connections */ - g_cluster.localNode->server_socket.sock = wd_create_recv_socket(g_cluster.localNode->wd_port); - g_cluster.localNode->server_socket.sock_state = WD_SOCK_CONNECTED; + g_wd_recv_socks = wd_create_recv_socket(g_cluster.localNode->wd_port); + /* open the command server */ g_cluster.command_server_sock = wd_create_command_server_socket(); @@ -1406,9 +1459,16 @@ prepare_fds(fd_set *rmask, fd_set *wmask, fd_set *emask) FD_ZERO(wmask); FD_ZERO(emask); - /* local node server socket will set the read and exception fds */ - FD_SET(g_cluster.localNode->server_socket.sock, rmask); - FD_SET(g_cluster.localNode->server_socket.sock, emask); + foreach(lc, g_wd_recv_socks) + { + i = lfirst_int(lc); + if (fd_max < i) + fd_max = i; + + /* local node server socket will set the read and exception fds */ + FD_SET(i, rmask); + FD_SET(i, 
emask); + } /* command server socket will set the read and exception fds */ FD_SET(g_cluster.command_server_sock, rmask); @@ -3299,6 +3359,7 @@ static void wd_system_will_go_down(int code, Datum arg) { int i; + ListCell *lc; ereport(LOG, (errmsg("Watchdog is shutting down"))); @@ -3307,8 +3368,16 @@ wd_system_will_go_down(int code, Datum arg) if (get_local_node_state() == WD_COORDINATOR) resign_from_escalated_node(); - /* close server socket */ - close_socket_connection(&g_cluster.localNode->server_socket); + + /* close watchdog receive sockets */ + foreach(lc, g_wd_recv_socks) + { + i = lfirst_int(lc); + close(i); + } + list_free(g_wd_recv_socks); + g_wd_recv_socks = NIL; + /* close all node sockets */ for (i = 0; i < g_cluster.remoteNodeCount; i++) { @@ -3382,39 +3451,62 @@ accept_incoming_connections(fd_set *rmask, int pending_fds_count) { int processed_fds = 0; int fd; + ListCell *lc; + int sock; - if (FD_ISSET(g_cluster.localNode->server_socket.sock, rmask)) + foreach(lc, g_wd_recv_socks) { - struct sockaddr_in addr; - socklen_t addrlen = sizeof(struct sockaddr_in); - - processed_fds++; - fd = accept(g_cluster.localNode->server_socket.sock, (struct sockaddr *) &addr, &addrlen); - if (fd < 0) + sock = lfirst_int(lc); + if (FD_ISSET(sock, rmask)) { - if (errno == EINTR || errno == 0 || errno == EAGAIN || errno == EWOULDBLOCK) + struct sockaddr_storage ss; + socklen_t addrlen = sizeof(ss); + int port; + + processed_fds++; + fd = accept(sock, (struct sockaddr *) &ss, &addrlen); + if (fd < 0) { - /* nothing to accept now */ - ereport(DEBUG2, - (errmsg("Failed to accept incoming watchdog connection, Nothing to accept"))); + if (errno == EINTR || errno == 0 || errno == EAGAIN || errno == EWOULDBLOCK) + { + /* nothing to accept now */ + ereport(DEBUG2, + (errmsg("Failed to accept incoming watchdog connection, Nothing to accept"))); + } + /* accept failed */ + ereport(DEBUG1, + (errmsg("Failed to accept incoming watchdog connection"))); } - /* accept failed */ - 
ereport(DEBUG1, - (errmsg("Failed to accept incoming watchdog connection"))); - } - else - { - MemoryContext oldCxt = MemoryContextSwitchTo(TopMemoryContext); - SocketConnection *conn = palloc(sizeof(SocketConnection)); + else + { + MemoryContext oldCxt = MemoryContextSwitchTo(TopMemoryContext); + SocketConnection *conn = palloc(sizeof(SocketConnection)); - conn->sock = fd; - conn->sock_state = WD_SOCK_CONNECTED; - gettimeofday(&conn->tv, NULL); - strncpy(conn->addr, inet_ntoa(addr.sin_addr), sizeof(conn->addr) - 1); - ereport(LOG, - (errmsg("new watchdog node connection is received from \"%s:%d\"", inet_ntoa(addr.sin_addr), addr.sin_port))); - g_cluster.unidentified_socks = lappend(g_cluster.unidentified_socks, conn); - MemoryContextSwitchTo(oldCxt); + conn->sock = fd; + conn->sock_state = WD_SOCK_CONNECTED; + gettimeofday(&conn->tv, NULL); + + switch (ss.ss_family) + { + case AF_INET: + inet_ntop(AF_INET, &((struct sockaddr_in *) &ss)->sin_addr, conn->addr, addrlen); + port = ntohs(((struct sockaddr_in *) (&ss))->sin_port); + break; + case AF_INET6: + inet_ntop(AF_INET6, &((struct sockaddr_in6 *) &ss)->sin6_addr, conn->addr, addrlen); + port = ntohs(((struct sockaddr_in6 *) (&ss))->sin6_port); + break; + default: + ereport(ERROR, (errmsg("invalid incoming socket family data"))); + break; + } + + ereport(LOG, + (errmsg("new watchdog node connection is received from \"%s:%d\"", conn->addr, port))); + + g_cluster.unidentified_socks = lappend(g_cluster.unidentified_socks, conn); + MemoryContextSwitchTo(oldCxt); + } } } diff --git a/src/watchdog/wd_heartbeat.c b/src/watchdog/wd_heartbeat.c index d9ace72ec..67dcf505d 100644 --- a/src/watchdog/wd_heartbeat.c +++ b/src/watchdog/wd_heartbeat.c @@ -75,28 +75,117 @@ static int hton_wd_hb_packet(WdHbPacket * to, WdHbPacket * from); static int ntoh_wd_hb_packet(WdHbPacket * to, WdHbPacket * from); static int packet_to_string_hb(WdHbPacket * pkt, char *str, int maxlen); static void wd_set_reuseport(int sock); +static int 
select_socket_from_list(List *socks, int timeout_sec); static int wd_create_hb_send_socket(WdHbIf * hb_if); -static int wd_create_hb_recv_socket(WdHbIf * hb_if); +static List *wd_create_hb_recv_socket(WdHbIf * hb_if); static void wd_hb_send(int sock, WdHbPacket * pkt, int len, const char *destination, const int dest_port); static void wd_hb_recv(int sock, WdHbPacket * pkt, char *from_addr); + /* + * Readable socket will be returned among the listening socket list. + */ +static int +select_socket_from_list(List *socks, int timeout_sec) +{ + int select_ret; + int maxsfd = 0; + int sock; + fd_set rfds; + fd_set efds; + ListCell *lc; + struct timeval tv; + + tv.tv_sec = timeout_sec; + tv.tv_usec = 0; + + FD_ZERO(&rfds); + FD_ZERO(&efds); + + foreach(lc, socks) + { + sock = lfirst_int(lc); + FD_SET(sock, &rfds); + FD_SET(sock, &efds); + if (maxsfd < sock) + maxsfd = sock; + } + + select_ret = select(maxsfd + 1, &rfds, NULL, &efds, &tv); + + if (select_ret > 0) + { + foreach(lc, socks) + { + sock = lfirst_int(lc); + if (FD_ISSET(sock, &rfds)) + { + ereport(DEBUG2, + (errmsg("wd_hb_recv_socket fd:%d is readable", sock))); + return sock; + } + if (FD_ISSET(sock, &efds)) + { + ereport(WARNING, + (errmsg("exception detected on heartbeat receive socket fd:%d", sock))); + break; + } + } + } + else if (select_ret == -1) + { + ereport(ERROR, + (errmsg("failed to get socket data from heartbeat receive socket list"), + errdetail("select() failed with reason %m"))); + } + else + { + ereport(ERROR, + (errmsg("failed to get socket data from heartbeat receive socket list"), + errdetail("select() got timeout, exceed %d sec(s)", timeout_sec))); + } + + return -1; +} + /* create socket for sending heartbeat */ static int wd_create_hb_send_socket(WdHbIf * hb_if) { - int sock; + int sock = -1; int tos; + int ret; + char *portstr = NULL; + struct addrinfo hints, + *res = NULL; + + portstr = psprintf("%d", hb_if->dest_port); + + memset(&hints, 0x00, sizeof(hints)); + hints.ai_family = 
AF_UNSPEC; + hints.ai_socktype = SOCK_DGRAM; + hints.ai_protocol = 0; + hints.ai_flags = AI_NUMERICSERV; + + if ((ret = getaddrinfo(hb_if->addr, portstr, &hints, &res)) != 0) + { + ereport(ERROR, + (errmsg("getaddrinfo() failed with error \"%s\"", gai_strerror(ret)))); + pfree(portstr); + return -1; + } + pfree(portstr); /* create socket */ - if ((sock = socket(AF_INET, SOCK_DGRAM, 0)) < 0) + if ((sock = socket(res->ai_family, res->ai_socktype, res->ai_protocol)) < 0) { /* socket create failed */ ereport(ERROR, (errmsg("failed to create watchdog heartbeat sender socket"), errdetail("create socket failed with reason: \"%m\""))); } + freeaddrinfo(res); /* set socket option */ tos = IPTOS_LOWDELAY; @@ -159,112 +248,182 @@ wd_create_hb_send_socket(WdHbIf * hb_if) } /* create socket for receiving heartbeat */ -static int +static List * wd_create_hb_recv_socket(WdHbIf * hb_if) { - struct sockaddr_in addr; - int sock; + int sock = -1, + gai_ret, + n = 0, + target_n = n; + List *socks = NIL; const int one = 1; int bind_tries; int bind_is_done; + char buf[INET6_ADDRSTRLEN] = {'\0'}; + char *portstr = NULL; + struct addrinfo hints, + *walk, + *res = NULL; - memset(&(addr), 0, sizeof(addr)); - addr.sin_family = AF_INET; - addr.sin_port = htons(pool_config->wd_heartbeat_port); - addr.sin_addr.s_addr = INADDR_ANY; + portstr = psprintf("%d", pool_config->wd_heartbeat_port); - /* create socket */ - if ((sock = socket(AF_INET, SOCK_DGRAM, 0)) < 0) + memset(&hints, 0x00, sizeof(hints)); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_DGRAM; + hints.ai_protocol = 0; + hints.ai_flags = AI_NUMERICSERV; + + if ((gai_ret = getaddrinfo(NULL, portstr, &hints, &res)) != 0) { - /* socket create failed */ ereport(ERROR, - (errmsg("failed to create watchdog heartbeat receive socket"), - errdetail("create socket failed with reason: \"%m\""))); + (errmsg("getaddrinfo() failed with error \"%s\"", gai_strerror(gai_ret)))); + pfree(portstr); + return NIL; } + pfree(portstr); + + for 
(walk = res; walk != NULL; walk = walk->ai_next) + n++; - if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (char *) &one, sizeof(one)) == -1) + if (n == 0) { - close(sock); - ereport(ERROR, - (errmsg("failed to create watchdog heartbeat receive socket"), - errdetail("setsockopt(SO_REUSEADDR) failed with reason: \"%m\""))); + ereport(ERROR, (errmsg("failed to create watchdog heartbeat receive socket"), + errdetail("getaddrinfo() result is empty: no sockets can be created because no available local address with port:%d", pool_config->wd_heartbeat_port))); + return NULL; + } + else + { + target_n = n; + n = 0; } - if (hb_if->if_name[0] != '\0') + + for (walk = res; walk != NULL; walk = walk->ai_next) { -#if defined(SO_BINDTODEVICE) + /* create socket */ + if ((sock = socket(walk->ai_family, walk->ai_socktype, walk->ai_protocol)) < 0) { - if (geteuid() == 0) /* check root privileges */ - { - struct ifreq i; + /* socket create failed */ + ereport(ERROR, + (errmsg("failed to create watchdog heartbeat receive socket"), + errdetail("create socket failed with reason: \"%m\""))); + } - strlcpy(i.ifr_name, hb_if->if_name, sizeof(i.ifr_name)); + if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (char *) &one, sizeof(one)) == -1) + { + close(sock); + ereport(ERROR, + (errmsg("failed to create watchdog heartbeat receive socket"), + errdetail("setsockopt(SO_REUSEADDR) failed with reason: \"%m\""))); + } + if (walk->ai_family == AF_INET6) + { + if (setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY, &one, sizeof(one)) == -1) + { + ereport(ERROR, + (errmsg("failed to set IPPROTO_IPV6 option to watchdog heartbeat recv socket"), + errdetail("setsockopt(IPV6_V6ONLY) failed with reason: \"%m\""))); + } + } - if (setsockopt(sock, SOL_SOCKET, SO_BINDTODEVICE, &i, sizeof(i)) == -1) + if (hb_if->if_name[0] != '\0') + { +#if defined(SO_BINDTODEVICE) + { + if (geteuid() == 0) /* check root privileges */ { - close(sock); - ereport(ERROR, - (errmsg("failed to create watchdog heartbeat receive socket"), - 
errdetail("setsockopt(SO_BINDTODEVICE) failed with reason: \"%m\""))); - } - ereport(LOG, - (errmsg("creating watchdog heartbeat receive socket."), - errdetail("bind receive socket to device: \"%s\"", i.ifr_name))); + struct ifreq i; + + strlcpy(i.ifr_name, hb_if->if_name, sizeof(i.ifr_name)); + + if (setsockopt(sock, SOL_SOCKET, SO_BINDTODEVICE, &i, sizeof(i)) == -1) + { + close(sock); + ereport(ERROR, + (errmsg("failed to create watchdog heartbeat receive socket"), + errdetail("setsockopt(SO_BINDTODEVICE) failed with reason: \"%m\""))); + } + ereport(LOG, + (errmsg("creating watchdog heartbeat receive socket."), + errdetail("bind receive socket to device: \"%s\"", i.ifr_name))); + } + else + ereport(LOG, + (errmsg("failed to create watchdog heartbeat receive socket."), + errdetail("setsockopt(SO_BINDTODEVICE) requires root privilege"))); } - else - ereport(LOG, - (errmsg("failed to create watchdog heartbeat receive socket."), - errdetail("setsockopt(SO_BINDTODEVICE) requires root privilege"))); - } #else - ereport(LOG, - (errmsg("failed to create watchdog heartbeat receive socket"), - errdetail("setsockopt(SO_BINDTODEVICE) is not available on this platform"))); + ereport(LOG, + (errmsg("failed to create watchdog heartbeat receive socket"), + errdetail("setsockopt(SO_BINDTODEVICE) is not available on this platform"))); #endif - } + } - wd_set_reuseport(sock); + wd_set_reuseport(sock); - ereport(LOG, - (errmsg("creating watchdog heartbeat receive socket."), - errdetail("set SO_REUSEPORT"))); + switch (walk->ai_family) + { + case AF_INET: + inet_ntop(AF_INET, &((struct sockaddr_in *) walk->ai_addr)->sin_addr, buf, walk->ai_addrlen); + break; + case AF_INET6: + inet_ntop(AF_INET6, &((struct sockaddr_in6 *) walk->ai_addr)->sin6_addr, buf, walk->ai_addrlen); + break; + default: + ereport(ERROR, (errmsg("invalid incoming socket family data"))); + break; + } + ereport(LOG, + (errmsg("creating watchdog heartbeat receive socket."), + errdetail("creating socket on %s:%d", 
buf, pool_config->wd_heartbeat_port))); - bind_is_done = 0; - for (bind_tries = 0; !bind_is_done && bind_tries < MAX_BIND_TRIES; bind_tries++) - { - if (bind(sock, (struct sockaddr *) &addr, sizeof(struct sockaddr)) < 0) + bind_is_done = 0; + for (bind_tries = 0; !bind_is_done && bind_tries < MAX_BIND_TRIES; bind_tries++) { - ereport(LOG, - (errmsg("failed to create watchdog heartbeat receive socket. retrying..."), - errdetail("bind failed with reason: \"%m\""))); + if (bind(sock, walk->ai_addr, walk->ai_addrlen) < 0) + { + ereport(LOG, + (errmsg("failed to create watchdog heartbeat receive socket. retrying..."), + errdetail("bind failed with reason: \"%m\""))); - sleep(1); + sleep(1); + } + else + { + bind_is_done = 1; + } } - else + /* bind failed finally */ + if (!bind_is_done) { - bind_is_done = 1; + close(sock); + ereport(ERROR, + (errmsg("failed to create watchdog heartbeat receive socket"), + errdetail("bind socket failed with reason: \"%m\""))); + continue; } - } - /* bind failed finally */ - if (!bind_is_done) - { - close(sock); - ereport(ERROR, - (errmsg("failed to create watchdog heartbeat receive socket"), - errdetail("bind socket failed with reason: \"%m\""))); - } + if (fcntl(sock, F_SETFD, FD_CLOEXEC) < 0) + { + close(sock); + ereport(ERROR, + (errmsg("failed to create watchdog heartbeat receive socket"), + errdetail("setting close-on-exec flag failed with reason: \"%m\""))); + continue; + } - if (fcntl(sock, F_SETFD, FD_CLOEXEC) < 0) - { - close(sock); - ereport(ERROR, - (errmsg("failed to create watchdog heartbeat receive socket"), - errdetail("setting close-on-exec flag failed with reason: \"%m\""))); + socks = lappend_int(socks, sock); + n++; } - return sock; + if (target_n != n) + ereport(WARNING, + (errmsg("failed to create watchdog heartbeat receive socket as much intended"), + errdetail("only %d out of %d socket(s) had been created", n, target_n))); + + freeaddrinfo(res); + return socks; } /* send heartbeat signal */ @@ -272,29 +431,36 @@ 
static void wd_hb_send(int sock, WdHbPacket * pkt, int len, const char *host, const int port) { int rtn; - struct sockaddr_in addr; - struct hostent *hp; WdHbPacket buf; + char *portstr; + struct addrinfo hints, + *res = NULL; + + portstr = psprintf("%d", port); + + memset(&hints, 0x00, sizeof(hints)); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_DGRAM; + hints.ai_protocol = 0; + hints.ai_flags = AI_NUMERICSERV; if (!host || !strlen(host)) ereport(ERROR, (errmsg("failed to send watchdog heartbeat. host name is empty"))); - hp = gethostbyname(host); - if ((hp == NULL) || (hp->h_addrtype != AF_INET)) - ereport(ERROR, - (errmsg("failed to send watchdog heartbeat, gethostbyname() failed"), - errdetail("gethostbyname on \"%s\" failed with reason: \"%s\"", host, hstrerror(h_errno)))); - - memmove((char *) &(addr.sin_addr), (char *) hp->h_addr, hp->h_length); - - addr.sin_family = AF_INET; - addr.sin_port = htons(port); + if ((rtn = getaddrinfo(host, portstr, &hints, &res)) != 0) + { + /* Perhaps wrong hostname or IP */ + ereport(WARNING, + (errmsg("failed to get address from watchdog heartbeat hostname"), + errdetail("getaddrinfo() failed: %s", gai_strerror(rtn)))); + } + pfree(portstr); hton_wd_hb_packet(&buf, pkt); if ((rtn = sendto(sock, &buf, sizeof(WdHbPacket), 0, - (struct sockaddr *) &addr, sizeof(addr))) != len) + res->ai_addr, res->ai_addrlen)) != len) { ereport(ERROR, (errmsg("failed to send watchdog heartbeat, sendto failed"), @@ -304,6 +470,7 @@ wd_hb_send(int sock, WdHbPacket * pkt, int len, const char *host, const int port ereport(DEBUG2, (errmsg("watchdog heartbeat: send %d byte packet", rtn))); + freeaddrinfo(res); } /* @@ -315,17 +482,14 @@ static wd_hb_recv(int sock, WdHbPacket * pkt, char *from_addr) { int rtn; - struct sockaddr_in senderinfo; - socklen_t addrlen; WdHbPacket buf; + struct sockaddr_storage ss; + socklen_t addrlen = sizeof(ss); - addrlen = sizeof(senderinfo); - - rtn = recvfrom(sock, &buf, sizeof(WdHbPacket), 0, - (struct 
sockaddr *) &senderinfo, &addrlen); + rtn = recvfrom(sock, &buf, sizeof(WdHbPacket), 0, (struct sockaddr *) &ss, &addrlen); if (rtn < 0) ereport(ERROR, - (errmsg("failed to receive heartbeat packet"))); + (errmsg("failed to receive heartbeat packet with reason %m"))); else if (rtn == 0) ereport(ERROR, (errmsg("failed to receive heartbeat received zero length packet"))); @@ -333,7 +497,23 @@ wd_hb_recv(int sock, WdHbPacket * pkt, char *from_addr) ereport(DEBUG2, (errmsg("watchdog heartbeat: received %d byte packet", rtn))); - strncpy(from_addr, inet_ntoa(senderinfo.sin_addr), WD_MAX_HOST_NAMELEN - 1); + + switch (ss.ss_family) + { + case AF_INET: + inet_ntop(AF_INET, &((struct sockaddr_in *) &ss)->sin_addr, from_addr, addrlen); + break; + case AF_INET6: + inet_ntop(AF_INET6, &((struct sockaddr_in6 *) &ss)->sin6_addr, from_addr, addrlen); + break; + default: + ereport(ERROR, + (errmsg("invalid socket address family:%d", ss.ss_family))); + break; + } + + ereport(DEBUG2, + (errmsg("watchdog heartbeat: received %d byte packet", rtn))); ntoh_wd_hb_packet(pkt, &buf); } @@ -352,6 +532,7 @@ wd_hb_receiver(int fork_wait_time, WdHbIf * hb_if) char pack_str[WD_MAX_PACKET_STRING]; int pack_str_len; sigjmp_buf local_sigjmp_buf; + List *wd_hb_recv_socks = NIL; pid = fork(); if (pid != 0) @@ -392,7 +573,7 @@ wd_hb_receiver(int fork_wait_time, WdHbIf * hb_if) MemoryContextSwitchTo(TopMemoryContext); - sock = wd_create_hb_recv_socket(hb_if); + wd_hb_recv_socks = wd_create_hb_recv_socket(hb_if); set_ps_display("heartbeat receiver", false); @@ -416,6 +597,14 @@ wd_hb_receiver(int fork_wait_time, WdHbIf * hb_if) MemoryContextSwitchTo(ProcessLoopContext); MemoryContextResetAndDeleteChildren(ProcessLoopContext); + /* get readable socket from heartbeat socket list */ + if ((sock = select_socket_from_list(wd_hb_recv_socks, pool_config->wd_heartbeat_deadtime)) < 0) + { + ereport(ERROR, (errmsg("failed to get heartbeat from heartbeat receiver"), + errdetail("select() failed on heartbeat 
receiver"))); + continue; + } + /* receive heartbeat signal */ wd_hb_recv(sock, &pkt, from); /* authentication */ @@ -525,7 +714,12 @@ wd_hb_sender(int fork_wait_time, WdHbIf * hb_if) MemoryContextSwitchTo(TopMemoryContext); - sock = wd_create_hb_send_socket(hb_if); + if ((sock = wd_create_hb_send_socket(hb_if)) < 0) + { + ereport(ERROR, + (errmsg("error on creating heartbeat send socket"))); + + } set_ps_display("heartbeat sender", false); |