From 65dbbe7a0fda8d2b5cc2d41b071e2ffeab751db9 Mon Sep 17 00:00:00 2001 From: Tatsuo Ishii Date: Tue, 27 Aug 2024 10:59:37 +0900 Subject: [PATCH] Add IPv6 support for hostname and heartbeat_hostname parameter. Now these watchdog configuration parameters accept IPv6 IP address. Author: Kwangwon Seo Reviewed-by: Muhammad Usama, Tatsuo Ishii Discussion: [pgpool-hackers: 4476] Watchdog and IPv6 https://www.pgpool.net/pipermail/pgpool-hackers/2024-July/004477.html --- doc.ja/src/sgml/watchdog.sgml | 8 +- doc/src/sgml/watchdog.sgml | 4 +- src/watchdog/watchdog.c | 322 ++++++++++++++++++---------- src/watchdog/wd_heartbeat.c | 388 +++++++++++++++++++++++++--------- 4 files changed, 504 insertions(+), 218 deletions(-) diff --git a/doc.ja/src/sgml/watchdog.sgml b/doc.ja/src/sgml/watchdog.sgml index b989385c0..ceb546079 100644 --- a/doc.ja/src/sgml/watchdog.sgml +++ b/doc.ja/src/sgml/watchdog.sgml @@ -121,14 +121,14 @@ - Pgpool-IIサーバのホスト名またはIPアドレスを指定します。 + Pgpool-IIサーバのホスト名またはIPアドレス(IPv4 あるいは IPv6)を指定します。 クエリやパケットの送受信の他、watchdogの識別子としても用います。 パラメータ名の最後にある数字は「pgpool ノードID」で、0から始まります(たとえばhostname0)。 @@ -1582,13 +1582,13 @@ - ハートビート信号を送受信するためのIPアドレスまたは ホスト名を指定します。 + ハートビート信号を送受信するためのIPアドレス(IPv4 あるいは IPv6)、または ホスト名を指定します。 パラメータ名の最後にある数字は「pgpool ノードID」で、0から始まります(たとえばheartbeat_hostname0)。 複数のハートビート信号の送り先をセミコロン(;)で区切って指定することができます。 diff --git a/doc/src/sgml/watchdog.sgml b/doc/src/sgml/watchdog.sgml index 5ee8d12af..0a238f302 100644 --- a/doc/src/sgml/watchdog.sgml +++ b/doc/src/sgml/watchdog.sgml @@ -96,7 +96,7 @@ - Specifies the hostname or IP address of + Specifies the hostname or IP (IPv4 or IPv6) address of Pgpool-II server. This is used for sending/receiving queries and packets, and also as an identifier of the watchdog node. @@ -1117,7 +1117,7 @@ pgpool_port2 = 9999 - Specifies the IP address or hostname + Specifies the IP (IPv4 or IPv6) address or hostname for sending and receiving the heartbeat signals. 
The number at the end of the parameter name is referred as "pgpool node id", and it starts from 0 (e.g. heartbeat_hostname0). diff --git a/src/watchdog/watchdog.c b/src/watchdog/watchdog.c index 41b53d26d..f88b9795c 100644 --- a/src/watchdog/watchdog.c +++ b/src/watchdog/watchdog.c @@ -585,7 +585,7 @@ static bool get_authhash_for_node(WatchdogNode * wdNode, char *authhash); static bool verify_authhash_for_node(WatchdogNode * wdNode, char *authhash); static void print_watchdog_node_info(WatchdogNode * wdNode); -static int wd_create_recv_socket(int port); +static List *wd_create_recv_socket(int port); static void wd_check_config(void); static pid_t watchdog_main(void); static pid_t fork_watchdog_child(void); @@ -614,6 +614,7 @@ static void update_failover_timeout(WatchdogNode * wdNode, POOL_CONFIG *pool_con wd_cluster g_cluster; struct timeval g_tm_set_time; int g_timeout_sec = 0; +List *g_wd_recv_socks = NIL; static unsigned int get_next_commandID(void) @@ -823,7 +824,8 @@ wd_cluster_initialize(void) g_cluster.ipc_auth_needed = strlen(pool_config->wd_authkey) ? 
true : false; g_cluster.localNode->escalated = get_watchdog_node_escalation_state(); - + g_cluster.localNode->server_socket.sock = 0; + g_cluster.localNode->server_socket.sock_state = WD_SOCK_CLOSED; wd_initialize_monitoring_interfaces(); if (g_cluster.ipc_auth_needed) { @@ -855,82 +857,132 @@ clear_command_node_result(WDCommandNodeResult * nodeResult) nodeResult->cmdState = COMMAND_STATE_INIT; } -static int +static List * wd_create_recv_socket(int port) { - size_t len = 0; - struct sockaddr_in addr; int one = 1; int sock = -1; int saved_errno; + int gai_ret, + n = 0, + target_n = n; + char *portstr = NULL; + struct addrinfo hints, + *walk, + *res = NULL; + List *socks = NIL; + + portstr = psprintf("%d", port); - if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) + memset(&hints, 0x00, sizeof(hints)); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + hints.ai_protocol = 0; + hints.ai_flags = AI_NUMERICSERV | AI_PASSIVE; + + if ((gai_ret = getaddrinfo(NULL, portstr, &hints, &res)) != 0) { - /* socket create failed */ ereport(ERROR, - (errmsg("failed to create watchdog receive socket"), - errdetail("create socket failed with reason: \"%m\""))); + (errmsg("getaddrinfo failed with error \"%s\"", gai_strerror(gai_ret)))); + pfree(portstr); + return NIL; } + pfree(portstr); - socket_set_nonblock(sock); + for (walk = res; walk != NULL; walk = walk->ai_next) + n++; - if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (char *) &one, sizeof(one)) == -1) + if (n == 0) { - /* setsockopt(SO_REUSEADDR) failed */ - saved_errno = errno; - close(sock); - ereport(ERROR, - (errmsg("failed to create watchdog receive socket"), - errdetail("setsockopt(SO_REUSEADDR) failed with reason: \"%s\"", strerror(saved_errno)))); + ereport(ERROR, (errmsg("failed to create watchdog receive socket"), + errdetail("getaddrinfo() result is empty: no sockets can be created because no available local address with port:%d", port))); + return NIL; } - if (setsockopt(sock, IPPROTO_TCP, 
TCP_NODELAY, (char *) &one, sizeof(one)) == -1) + else { - /* setsockopt(TCP_NODELAY) failed */ - saved_errno = errno; - close(sock); - ereport(ERROR, - (errmsg("failed to create watchdog receive socket"), - errdetail("setsockopt(TCP_NODELAY) failed with reason: \"%s\"", strerror(saved_errno)))); + target_n = n; + n = 0; } - if (setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, (char *) &one, sizeof(one)) == -1) + + for (walk = res; walk != NULL; walk = walk->ai_next) { - /* setsockopt(SO_KEEPALIVE) failed */ - saved_errno = errno; - close(sock); - ereport(ERROR, - (errmsg("failed to create watchdog receive socket"), - errdetail("setsockopt(SO_KEEPALIVE) failed with reason: \"%s\"", strerror(saved_errno)))); - } + if ((sock = socket(walk->ai_family, walk->ai_socktype, walk->ai_protocol)) < 0) + { + /* socket create failed */ + ereport(ERROR, + (errmsg("failed to create watchdog receive socket"), + errdetail("create socket failed with reason: \"%m\""))); + } - addr.sin_family = AF_INET; - addr.sin_addr.s_addr = htonl(INADDR_ANY); - addr.sin_port = htons(port); - len = sizeof(struct sockaddr_in); + socket_set_nonblock(sock); - if (bind(sock, (struct sockaddr *) &addr, len) < 0) - { - /* bind failed */ - saved_errno = errno; - close(sock); - ereport(ERROR, - (errmsg("failed to create watchdog receive socket"), - errdetail("bind on \"TCP:%d\" failed with reason: \"%s\"", port, strerror(saved_errno)))); - } + if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (char *) &one, sizeof(one)) == -1) + { + /* setsockopt(SO_REUSEADDR) failed */ + saved_errno = errno; + close(sock); + ereport(ERROR, + (errmsg("failed to create watchdog receive socket"), + errdetail("setsockopt(SO_REUSEADDR) failed with reason: \"%s\"", strerror(saved_errno)))); + } + if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (char *) &one, sizeof(one)) == -1) + { + /* setsockopt(TCP_NODELAY) failed */ + saved_errno = errno; + close(sock); + ereport(ERROR, + (errmsg("failed to create watchdog receive socket"), + 
errdetail("setsockopt(TCP_NODELAY) failed with reason: \"%s\"", strerror(saved_errno)))); + } + if (setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, (char *) &one, sizeof(one)) == -1) + { + /* setsockopt(SO_KEEPALIVE) failed */ + saved_errno = errno; + close(sock); + ereport(ERROR, + (errmsg("failed to create watchdog receive socket"), + errdetail("setsockopt(SO_KEEPALIVE) failed with reason: \"%s\"", strerror(saved_errno)))); + } + if (walk->ai_family == AF_INET6) + { + if (setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY, &one, sizeof(one)) == -1) + { + ereport(ERROR, + (errmsg("failed to set IPPROTO_IPV6 option to watchdog receive socket"), + errdetail("setsockopt(IPV6_V6ONLY) failed with reason: \"%m\""))); + } + } + if (bind(sock, walk->ai_addr, walk->ai_addrlen) < 0) + { + /* bind failed */ + saved_errno = errno; + close(sock); + ereport(ERROR, + (errmsg("failed to create watchdog receive socket"), + errdetail("bind on \"TCP:%d\" failed with reason: \"%s\"", port, strerror(saved_errno)))); + } - if (listen(sock, MAX_WATCHDOG_NUM * 2) < 0) - { - /* listen failed */ - saved_errno = errno; - close(sock); - ereport(ERROR, - (errmsg("failed to create watchdog receive socket"), - errdetail("listen failed with reason: \"%s\"", strerror(saved_errno)))); - } + if (listen(sock, MAX_WATCHDOG_NUM * 2) < 0) + { + /* listen failed */ + saved_errno = errno; + close(sock); + ereport(ERROR, + (errmsg("failed to create watchdog receive socket"), + errdetail("listen failed with reason: \"%s\"", strerror(saved_errno)))); + } - return sock; -} + socks = lappend_int(socks, sock); + n++; + } + if (target_n != n) + ereport(WARNING, + (errmsg("failed to create watchdog receive socket as much intended"), + errdetail("only %d out of %d socket(s) had been created", n, target_n))); + return socks; +} /* * creates a socket in non blocking mode and connects it to the hostname and port @@ -939,15 +991,33 @@ wd_create_recv_socket(int port) static int wd_create_client_socket(char *hostname, int port, 
bool *connected) { - int sock; + int sock, + gai_ret = -1; int one = 1; - size_t len = 0; - struct sockaddr_in addr; - struct hostent *hp; + char *portstr = NULL; + struct addrinfo hints, + *res = NULL; + portstr = psprintf("%d", port); + + memset(&hints, 0x00, sizeof(hints)); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + hints.ai_protocol = 0; + hints.ai_flags = AI_NUMERICSERV; + + if ((gai_ret = getaddrinfo(hostname, portstr, &hints, &res)) != 0) + { + ereport(ERROR, + (errmsg("getaddrinfo failed with error \"%s\"", gai_strerror(gai_ret)))); + + pfree(portstr); + return -1; + } + pfree(portstr); *connected = false; /* create socket */ - if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) + if ((sock = socket(res->ai_family, res->ai_socktype, res->ai_protocol)) < 0) { /* socket create failed */ ereport(LOG, @@ -972,31 +1042,13 @@ wd_create_client_socket(char *hostname, int port, bool *connected) close(sock); return -1; } - /* set sockaddr_in */ - memset(&addr, 0, sizeof(addr)); - addr.sin_family = AF_INET; - hp = gethostbyname(hostname); - if ((hp == NULL) || (hp->h_addrtype != AF_INET)) - { - hp = gethostbyaddr(hostname, strlen(hostname), AF_INET); - if ((hp == NULL) || (hp->h_addrtype != AF_INET)) - { - ereport(LOG, - (errmsg("failed to get host address for \"%s\"", hostname), - errdetail("gethostbyaddr failed with error: \"%s\"", hstrerror(h_errno)))); - close(sock); - return -1; - } - } - memmove((char *) &(addr.sin_addr), (char *) hp->h_addr, hp->h_length); - addr.sin_port = htons(port); - len = sizeof(struct sockaddr_in); /* set socket to non blocking */ socket_set_nonblock(sock); - if (connect(sock, (struct sockaddr *) &addr, len) < 0) + if (connect(sock, res->ai_addr, res->ai_addrlen) < 0) { + freeaddrinfo(res); if (errno == EINPROGRESS) { return sock; @@ -1013,6 +1065,7 @@ wd_create_client_socket(char *hostname, int port, bool *connected) close(sock); return -1; } + freeaddrinfo(res); /* set socket to blocking again */ 
socket_unset_nonblock(sock); *connected = true; @@ -1191,8 +1244,8 @@ watchdog_main(void) /* initialize all the local structures for watchdog */ wd_cluster_initialize(); /* create a server socket for incoming watchdog connections */ - g_cluster.localNode->server_socket.sock = wd_create_recv_socket(g_cluster.localNode->wd_port); - g_cluster.localNode->server_socket.sock_state = WD_SOCK_CONNECTED; + g_wd_recv_socks = wd_create_recv_socket(g_cluster.localNode->wd_port); + /* open the command server */ g_cluster.command_server_sock = wd_create_command_server_socket(); @@ -1406,9 +1459,16 @@ prepare_fds(fd_set *rmask, fd_set *wmask, fd_set *emask) FD_ZERO(wmask); FD_ZERO(emask); - /* local node server socket will set the read and exception fds */ - FD_SET(g_cluster.localNode->server_socket.sock, rmask); - FD_SET(g_cluster.localNode->server_socket.sock, emask); + foreach(lc, g_wd_recv_socks) + { + i = lfirst_int(lc); + if (fd_max < i) + fd_max = i; + + /* local node server socket will set the read and exception fds */ + FD_SET(i, rmask); + FD_SET(i, emask); + } /* command server socket will set the read and exception fds */ FD_SET(g_cluster.command_server_sock, rmask); @@ -3299,6 +3359,7 @@ static void wd_system_will_go_down(int code, Datum arg) { int i; + ListCell *lc; ereport(LOG, (errmsg("Watchdog is shutting down"))); @@ -3307,8 +3368,16 @@ wd_system_will_go_down(int code, Datum arg) if (get_local_node_state() == WD_COORDINATOR) resign_from_escalated_node(); - /* close server socket */ - close_socket_connection(&g_cluster.localNode->server_socket); + + /* close watchdog receive sockets */ + foreach(lc, g_wd_recv_socks) + { + i = lfirst_int(lc); + close(i); + } + list_free(g_wd_recv_socks); + g_wd_recv_socks = NIL; + /* close all node sockets */ for (i = 0; i < g_cluster.remoteNodeCount; i++) { @@ -3382,39 +3451,62 @@ accept_incoming_connections(fd_set *rmask, int pending_fds_count) { int processed_fds = 0; int fd; + ListCell *lc; + int sock; - if 
(FD_ISSET(g_cluster.localNode->server_socket.sock, rmask)) + foreach(lc, g_wd_recv_socks) { - struct sockaddr_in addr; - socklen_t addrlen = sizeof(struct sockaddr_in); - - processed_fds++; - fd = accept(g_cluster.localNode->server_socket.sock, (struct sockaddr *) &addr, &addrlen); - if (fd < 0) + sock = lfirst_int(lc); + if (FD_ISSET(sock, rmask)) { - if (errno == EINTR || errno == 0 || errno == EAGAIN || errno == EWOULDBLOCK) + struct sockaddr_storage ss; + socklen_t addrlen = sizeof(ss); + int port; + + processed_fds++; + fd = accept(sock, (struct sockaddr *) &ss, &addrlen); + if (fd < 0) { - /* nothing to accept now */ - ereport(DEBUG2, - (errmsg("Failed to accept incoming watchdog connection, Nothing to accept"))); + if (errno == EINTR || errno == 0 || errno == EAGAIN || errno == EWOULDBLOCK) + { + /* nothing to accept now */ + ereport(DEBUG2, + (errmsg("Failed to accept incoming watchdog connection, Nothing to accept"))); + } + /* accept failed */ + ereport(DEBUG1, + (errmsg("Failed to accept incoming watchdog connection"))); } - /* accept failed */ - ereport(DEBUG1, - (errmsg("Failed to accept incoming watchdog connection"))); - } - else - { - MemoryContext oldCxt = MemoryContextSwitchTo(TopMemoryContext); - SocketConnection *conn = palloc(sizeof(SocketConnection)); + else + { + MemoryContext oldCxt = MemoryContextSwitchTo(TopMemoryContext); + SocketConnection *conn = palloc(sizeof(SocketConnection)); - conn->sock = fd; - conn->sock_state = WD_SOCK_CONNECTED; - gettimeofday(&conn->tv, NULL); - strncpy(conn->addr, inet_ntoa(addr.sin_addr), sizeof(conn->addr) - 1); - ereport(LOG, - (errmsg("new watchdog node connection is received from \"%s:%d\"", inet_ntoa(addr.sin_addr), addr.sin_port))); - g_cluster.unidentified_socks = lappend(g_cluster.unidentified_socks, conn); - MemoryContextSwitchTo(oldCxt); + conn->sock = fd; + conn->sock_state = WD_SOCK_CONNECTED; + gettimeofday(&conn->tv, NULL); + + switch (ss.ss_family) + { + case AF_INET: + inet_ntop(AF_INET, 
&((struct sockaddr_in *) &ss)->sin_addr, conn->addr, sizeof(conn->addr)); /* inet_ntop() takes the size of the destination buffer, not the sockaddr length */ + port = ntohs(((struct sockaddr_in *) (&ss))->sin_port); + break; + case AF_INET6: + inet_ntop(AF_INET6, &((struct sockaddr_in6 *) &ss)->sin6_addr, conn->addr, sizeof(conn->addr)); /* addrlen (sockaddr length) is shorter than a full IPv6 text address */ + port = ntohs(((struct sockaddr_in6 *) (&ss))->sin6_port); + break; + default: + ereport(ERROR, (errmsg("invalid incoming socket family data"))); + break; + } + + ereport(LOG, + (errmsg("new watchdog node connection is received from \"%s:%d\"", conn->addr, port))); + + g_cluster.unidentified_socks = lappend(g_cluster.unidentified_socks, conn); + MemoryContextSwitchTo(oldCxt); + } } } diff --git a/src/watchdog/wd_heartbeat.c b/src/watchdog/wd_heartbeat.c index d9ace72ec..67dcf505d 100644 --- a/src/watchdog/wd_heartbeat.c +++ b/src/watchdog/wd_heartbeat.c @@ -75,28 +75,117 @@ static int hton_wd_hb_packet(WdHbPacket * to, WdHbPacket * from); static int ntoh_wd_hb_packet(WdHbPacket * to, WdHbPacket * from); static int packet_to_string_hb(WdHbPacket * pkt, char *str, int maxlen); static void wd_set_reuseport(int sock); +static int select_socket_from_list(List *socks, int timeout_sec); static int wd_create_hb_send_socket(WdHbIf * hb_if); -static int wd_create_hb_recv_socket(WdHbIf * hb_if); +static List *wd_create_hb_recv_socket(WdHbIf * hb_if); static void wd_hb_send(int sock, WdHbPacket * pkt, int len, const char *destination, const int dest_port); static void wd_hb_recv(int sock, WdHbPacket * pkt, char *from_addr); + /* + * Readable socket will be returned among the listening socket list. 
+ */ +static int +select_socket_from_list(List *socks, int timeout_sec) +{ + int select_ret; + int maxsfd = 0; + int sock; + fd_set rfds; + fd_set efds; + ListCell *lc; + struct timeval tv; + + tv.tv_sec = timeout_sec; + tv.tv_usec = 0; + + FD_ZERO(&rfds); + FD_ZERO(&efds); + + foreach(lc, socks) + { + sock = lfirst_int(lc); + FD_SET(sock, &rfds); + FD_SET(sock, &efds); + if (maxsfd < sock) + maxsfd = sock; + } + + select_ret = select(maxsfd + 1, &rfds, NULL, &efds, &tv); + + if (select_ret > 0) + { + foreach(lc, socks) + { + sock = lfirst_int(lc); + if (FD_ISSET(sock, &rfds)) + { + ereport(DEBUG2, + (errmsg("wd_hb_recv_socket fd:%d is readable", sock))); + return sock; + } + if (FD_ISSET(sock, &efds)) + { + ereport(WARNING, + (errmsg("exception detected on heartbeat receive socket fd:%d", sock))); + break; + } + } + } + else if (select_ret == -1) + { + ereport(ERROR, + (errmsg("failed to get socket data from heartbeat receive socket list"), + errdetail("select() failed with reason %m"))); + } + else + { + ereport(ERROR, + (errmsg("failed to get socket data from heartbeat receive socket list"), + errdetail("select() got timeout, exceed %d sec(s)", timeout_sec))); + } + + return -1; +} + /* create socket for sending heartbeat */ static int wd_create_hb_send_socket(WdHbIf * hb_if) { - int sock; + int sock = -1; int tos; + int ret; + char *portstr = NULL; + struct addrinfo hints, + *res = NULL; + + portstr = psprintf("%d", hb_if->dest_port); + + memset(&hints, 0x00, sizeof(hints)); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_DGRAM; + hints.ai_protocol = 0; + hints.ai_flags = AI_NUMERICSERV; + + if ((ret = getaddrinfo(hb_if->addr, portstr, &hints, &res)) != 0) + { + ereport(ERROR, + (errmsg("getaddrinfo() failed with error \"%s\"", gai_strerror(ret)))); + pfree(portstr); + return -1; + } + pfree(portstr); /* create socket */ - if ((sock = socket(AF_INET, SOCK_DGRAM, 0)) < 0) + if ((sock = socket(res->ai_family, res->ai_socktype, res->ai_protocol)) < 0) { 
/* socket create failed */ ereport(ERROR, (errmsg("failed to create watchdog heartbeat sender socket"), errdetail("create socket failed with reason: \"%m\""))); } + freeaddrinfo(res); /* set socket option */ tos = IPTOS_LOWDELAY; @@ -159,112 +248,182 @@ wd_create_hb_send_socket(WdHbIf * hb_if) } /* create socket for receiving heartbeat */ -static int +static List * wd_create_hb_recv_socket(WdHbIf * hb_if) { - struct sockaddr_in addr; - int sock; + int sock = -1, + gai_ret, + n = 0, + target_n = n; + List *socks = NIL; const int one = 1; int bind_tries; int bind_is_done; + char buf[INET6_ADDRSTRLEN] = {'\0'}; + char *portstr = NULL; + struct addrinfo hints, + *walk, + *res = NULL; - memset(&(addr), 0, sizeof(addr)); - addr.sin_family = AF_INET; - addr.sin_port = htons(pool_config->wd_heartbeat_port); - addr.sin_addr.s_addr = INADDR_ANY; + portstr = psprintf("%d", pool_config->wd_heartbeat_port); - /* create socket */ - if ((sock = socket(AF_INET, SOCK_DGRAM, 0)) < 0) + memset(&hints, 0x00, sizeof(hints)); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_DGRAM; + hints.ai_protocol = 0; + hints.ai_flags = AI_NUMERICSERV | AI_PASSIVE; /* node is NULL: AI_PASSIVE yields the wildcard address (as INADDR_ANY did); without it only loopback addresses are returned */ + + if ((gai_ret = getaddrinfo(NULL, portstr, &hints, &res)) != 0) { - /* socket create failed */ ereport(ERROR, - (errmsg("failed to create watchdog heartbeat receive socket"), - errdetail("create socket failed with reason: \"%m\""))); + (errmsg("getaddrinfo() failed with error \"%s\"", gai_strerror(gai_ret)))); + pfree(portstr); + return NIL; } + pfree(portstr); + + for (walk = res; walk != NULL; walk = walk->ai_next) + n++; - if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (char *) &one, sizeof(one)) == -1) + if (n == 0) { - close(sock); - ereport(ERROR, - (errmsg("failed to create watchdog heartbeat receive socket"), - errdetail("setsockopt(SO_REUSEADDR) failed with reason: \"%m\""))); + ereport(ERROR, (errmsg("failed to create watchdog heartbeat receive socket"), + errdetail("getaddrinfo() result is empty: no sockets can be created 
because no available local address with port:%d", pool_config->wd_heartbeat_port))); + return NULL; + } + else + { + target_n = n; + n = 0; } - if (hb_if->if_name[0] != '\0') + + for (walk = res; walk != NULL; walk = walk->ai_next) { -#if defined(SO_BINDTODEVICE) + /* create socket */ + if ((sock = socket(walk->ai_family, walk->ai_socktype, walk->ai_protocol)) < 0) { - if (geteuid() == 0) /* check root privileges */ - { - struct ifreq i; + /* socket create failed */ + ereport(ERROR, + (errmsg("failed to create watchdog heartbeat receive socket"), + errdetail("create socket failed with reason: \"%m\""))); + } - strlcpy(i.ifr_name, hb_if->if_name, sizeof(i.ifr_name)); + if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (char *) &one, sizeof(one)) == -1) + { + close(sock); + ereport(ERROR, + (errmsg("failed to create watchdog heartbeat receive socket"), + errdetail("setsockopt(SO_REUSEADDR) failed with reason: \"%m\""))); + } + if (walk->ai_family == AF_INET6) + { + if (setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY, &one, sizeof(one)) == -1) + { + ereport(ERROR, + (errmsg("failed to set IPPROTO_IPV6 option to watchdog heartbeat recv socket"), + errdetail("setsockopt(IPV6_V6ONLY) failed with reason: \"%m\""))); + } + } - if (setsockopt(sock, SOL_SOCKET, SO_BINDTODEVICE, &i, sizeof(i)) == -1) + if (hb_if->if_name[0] != '\0') + { +#if defined(SO_BINDTODEVICE) + { + if (geteuid() == 0) /* check root privileges */ { - close(sock); - ereport(ERROR, - (errmsg("failed to create watchdog heartbeat receive socket"), - errdetail("setsockopt(SO_BINDTODEVICE) failed with reason: \"%m\""))); - } - ereport(LOG, - (errmsg("creating watchdog heartbeat receive socket."), - errdetail("bind receive socket to device: \"%s\"", i.ifr_name))); + struct ifreq i; + + strlcpy(i.ifr_name, hb_if->if_name, sizeof(i.ifr_name)); + + if (setsockopt(sock, SOL_SOCKET, SO_BINDTODEVICE, &i, sizeof(i)) == -1) + { + close(sock); + ereport(ERROR, + (errmsg("failed to create watchdog heartbeat receive socket"), + 
errdetail("setsockopt(SO_BINDTODEVICE) failed with reason: \"%m\""))); + } + ereport(LOG, + (errmsg("creating watchdog heartbeat receive socket."), + errdetail("bind receive socket to device: \"%s\"", i.ifr_name))); + } + else + ereport(LOG, + (errmsg("failed to create watchdog heartbeat receive socket."), + errdetail("setsockopt(SO_BINDTODEVICE) requires root privilege"))); } - else - ereport(LOG, - (errmsg("failed to create watchdog heartbeat receive socket."), - errdetail("setsockopt(SO_BINDTODEVICE) requires root privilege"))); - } #else - ereport(LOG, - (errmsg("failed to create watchdog heartbeat receive socket"), - errdetail("setsockopt(SO_BINDTODEVICE) is not available on this platform"))); + ereport(LOG, + (errmsg("failed to create watchdog heartbeat receive socket"), + errdetail("setsockopt(SO_BINDTODEVICE) is not available on this platform"))); #endif - } + } - wd_set_reuseport(sock); + wd_set_reuseport(sock); - ereport(LOG, - (errmsg("creating watchdog heartbeat receive socket."), - errdetail("set SO_REUSEPORT"))); + switch (walk->ai_family) + { + case AF_INET: + inet_ntop(AF_INET, &((struct sockaddr_in *) walk->ai_addr)->sin_addr, buf, sizeof(buf)); /* size of the text buffer, not of the sockaddr */ + break; + case AF_INET6: + inet_ntop(AF_INET6, &((struct sockaddr_in6 *) walk->ai_addr)->sin6_addr, buf, sizeof(buf)); + break; + default: + ereport(ERROR, (errmsg("invalid incoming socket family data"))); + break; + } + ereport(LOG, + (errmsg("creating watchdog heartbeat receive socket."), + errdetail("creating socket on %s:%d", buf, pool_config->wd_heartbeat_port))); - bind_is_done = 0; - for (bind_tries = 0; !bind_is_done && bind_tries < MAX_BIND_TRIES; bind_tries++) - { - if (bind(sock, (struct sockaddr *) &addr, sizeof(struct sockaddr)) < 0) + bind_is_done = 0; + for (bind_tries = 0; !bind_is_done && bind_tries < MAX_BIND_TRIES; bind_tries++) { - ereport(LOG, - (errmsg("failed to create watchdog heartbeat receive socket. 
retrying..."), - errdetail("bind failed with reason: \"%m\""))); + if (bind(sock, walk->ai_addr, walk->ai_addrlen) < 0) + { + ereport(LOG, + (errmsg("failed to create watchdog heartbeat receive socket. retrying..."), + errdetail("bind failed with reason: \"%m\""))); - sleep(1); + sleep(1); + } + else + { + bind_is_done = 1; + } } - else + /* bind failed finally */ + if (!bind_is_done) { - bind_is_done = 1; + close(sock); + ereport(ERROR, + (errmsg("failed to create watchdog heartbeat receive socket"), + errdetail("bind socket failed with reason: \"%m\""))); + continue; } - } - /* bind failed finally */ - if (!bind_is_done) - { - close(sock); - ereport(ERROR, - (errmsg("failed to create watchdog heartbeat receive socket"), - errdetail("bind socket failed with reason: \"%m\""))); - } + if (fcntl(sock, F_SETFD, FD_CLOEXEC) < 0) + { + close(sock); + ereport(ERROR, + (errmsg("failed to create watchdog heartbeat receive socket"), + errdetail("setting close-on-exec flag failed with reason: \"%m\""))); + continue; + } - if (fcntl(sock, F_SETFD, FD_CLOEXEC) < 0) - { - close(sock); - ereport(ERROR, - (errmsg("failed to create watchdog heartbeat receive socket"), - errdetail("setting close-on-exec flag failed with reason: \"%m\""))); + socks = lappend_int(socks, sock); + n++; } - return sock; + if (target_n != n) + ereport(WARNING, + (errmsg("failed to create watchdog heartbeat receive socket as much intended"), + errdetail("only %d out of %d socket(s) had been created", n, target_n))); + + freeaddrinfo(res); + return socks; } /* send heartbeat signal */ @@ -272,29 +431,36 @@ static void wd_hb_send(int sock, WdHbPacket * pkt, int len, const char *host, const int port) { int rtn; - struct sockaddr_in addr; - struct hostent *hp; WdHbPacket buf; + char *portstr; + struct addrinfo hints, + *res = NULL; + + portstr = psprintf("%d", port); + + memset(&hints, 0x00, sizeof(hints)); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_DGRAM; + hints.ai_protocol = 0; + 
hints.ai_flags = AI_NUMERICSERV; if (!host || !strlen(host)) ereport(ERROR, (errmsg("failed to send watchdog heartbeat. host name is empty"))); - hp = gethostbyname(host); - if ((hp == NULL) || (hp->h_addrtype != AF_INET)) - ereport(ERROR, - (errmsg("failed to send watchdog heartbeat, gethostbyname() failed"), - errdetail("gethostbyname on \"%s\" failed with reason: \"%s\"", host, hstrerror(h_errno)))); - - memmove((char *) &(addr.sin_addr), (char *) hp->h_addr, hp->h_length); - - addr.sin_family = AF_INET; - addr.sin_port = htons(port); + if ((rtn = getaddrinfo(host, portstr, &hints, &res)) != 0) + { + /* Perhaps wrong hostname or IP: res is not usable here, so raise ERROR (as the gethostbyname() path did) rather than fall through to sendto() with a NULL res */ + ereport(ERROR, + (errmsg("failed to get address from watchdog heartbeat hostname"), + errdetail("getaddrinfo() failed: %s", gai_strerror(rtn)))); + } + pfree(portstr); hton_wd_hb_packet(&buf, pkt); if ((rtn = sendto(sock, &buf, sizeof(WdHbPacket), 0, - (struct sockaddr *) &addr, sizeof(addr))) != len) + res->ai_addr, res->ai_addrlen)) != len) { ereport(ERROR, (errmsg("failed to send watchdog heartbeat, sendto failed"), @@ -304,6 +470,7 @@ wd_hb_send(int sock, WdHbPacket * pkt, int len, const char *host, const int port ereport(DEBUG2, (errmsg("watchdog heartbeat: send %d byte packet", rtn))); + freeaddrinfo(res); } /* @@ -315,17 +482,14 @@ static wd_hb_recv(int sock, WdHbPacket * pkt, char *from_addr) { int rtn; - struct sockaddr_in senderinfo; - socklen_t addrlen; WdHbPacket buf; + struct sockaddr_storage ss; + socklen_t addrlen = sizeof(ss); - addrlen = sizeof(senderinfo); - - rtn = recvfrom(sock, &buf, sizeof(WdHbPacket), 0, - (struct sockaddr *) &senderinfo, &addrlen); + rtn = recvfrom(sock, &buf, sizeof(WdHbPacket), 0, (struct sockaddr *) &ss, &addrlen); if (rtn < 0) ereport(ERROR, - (errmsg("failed to receive heartbeat packet"))); + (errmsg("failed to receive heartbeat packet with reason %m"))); else if (rtn == 0) ereport(ERROR, (errmsg("failed to receive heartbeat received zero length packet"))); @@ -333,7 +497,23 @@ 
wd_hb_recv(int sock, WdHbPacket * pkt, char *from_addr) ereport(DEBUG2, (errmsg("watchdog heartbeat: received %d byte packet", rtn))); - strncpy(from_addr, inet_ntoa(senderinfo.sin_addr), WD_MAX_HOST_NAMELEN - 1); + + switch (ss.ss_family) + { + case AF_INET: + inet_ntop(AF_INET, &((struct sockaddr_in *) &ss)->sin_addr, from_addr, WD_MAX_HOST_NAMELEN); /* NOTE(review): assumes from_addr holds WD_MAX_HOST_NAMELEN bytes, the bound the replaced strncpy() used -- confirm against caller */ + break; + case AF_INET6: + inet_ntop(AF_INET6, &((struct sockaddr_in6 *) &ss)->sin6_addr, from_addr, WD_MAX_HOST_NAMELEN); /* addrlen (sockaddr length) is shorter than a full IPv6 text address */ + break; + default: + ereport(ERROR, + (errmsg("invalid socket address family:%d", ss.ss_family))); + break; + } + + ereport(DEBUG2, + (errmsg("watchdog heartbeat: received %d byte packet", rtn))); ntoh_wd_hb_packet(pkt, &buf); } @@ -352,6 +532,7 @@ wd_hb_receiver(int fork_wait_time, WdHbIf * hb_if) char pack_str[WD_MAX_PACKET_STRING]; int pack_str_len; sigjmp_buf local_sigjmp_buf; + List *wd_hb_recv_socks = NIL; pid = fork(); if (pid != 0) @@ -392,7 +573,7 @@ wd_hb_receiver(int fork_wait_time, WdHbIf * hb_if) MemoryContextSwitchTo(TopMemoryContext); - sock = wd_create_hb_recv_socket(hb_if); + wd_hb_recv_socks = wd_create_hb_recv_socket(hb_if); set_ps_display("heartbeat receiver", false); @@ -416,6 +597,14 @@ wd_hb_receiver(int fork_wait_time, WdHbIf * hb_if) MemoryContextSwitchTo(ProcessLoopContext); MemoryContextResetAndDeleteChildren(ProcessLoopContext); + /* get readable socket from heartbeat socket list */ + if ((sock = select_socket_from_list(wd_hb_recv_socks, pool_config->wd_heartbeat_deadtime)) < 0) + { + ereport(ERROR, (errmsg("failed to get heartbeat from heartbeat receiver"), + errdetail("select() failed on heartbeat receiver"))); + continue; + } + /* receive heartbeat signal */ wd_hb_recv(sock, &pkt, from); /* authentication */ @@ -525,7 +714,12 @@ wd_hb_sender(int fork_wait_time, WdHbIf * hb_if) MemoryContextSwitchTo(TopMemoryContext); - sock = wd_create_hb_send_socket(hb_if); + if ((sock = wd_create_hb_send_socket(hb_if)) < 0) + { + ereport(ERROR, + (errmsg("error on creating heartbeat send socket"))); + 
+ } set_ps_display("heartbeat sender", false); -- 2.39.5