static bool verify_authhash_for_node(WatchdogNode * wdNode, char *authhash);
static void print_watchdog_node_info(WatchdogNode * wdNode);
-static int wd_create_recv_socket(int port);
+static List *wd_create_recv_socket(int port);
static void wd_check_config(void);
static pid_t watchdog_main(void);
static pid_t fork_watchdog_child(void);
wd_cluster g_cluster;
struct timeval g_tm_set_time;
int g_timeout_sec = 0;
+List *g_wd_recv_socks = NIL;
static unsigned int
get_next_commandID(void)
g_cluster.ipc_auth_needed = strlen(pool_config->wd_authkey) ? true : false;
g_cluster.localNode->escalated = get_watchdog_node_escalation_state();
-
+ g_cluster.localNode->server_socket.sock = 0;
+ g_cluster.localNode->server_socket.sock_state = WD_SOCK_CLOSED;
wd_initialize_monitoring_interfaces();
if (g_cluster.ipc_auth_needed)
{
nodeResult->cmdState = COMMAND_STATE_INIT;
}
-static int
+static List *
wd_create_recv_socket(int port)
{
- size_t len = 0;
- struct sockaddr_in addr;
int one = 1;
int sock = -1;
int saved_errno;
+ int gai_ret,
+ n = 0,
+ target_n = n;
+ char *portstr = NULL;
+ struct addrinfo hints,
+ *walk,
+ *res = NULL;
+ List *socks = NIL;
+
+ portstr = psprintf("%d", port);
- if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0)
+ memset(&hints, 0x00, sizeof(hints));
+ hints.ai_family = AF_UNSPEC;
+ hints.ai_socktype = SOCK_STREAM;
+ hints.ai_protocol = 0;
+ hints.ai_flags = AI_NUMERICSERV | AI_PASSIVE;
+
+ if ((gai_ret = getaddrinfo(NULL, portstr, &hints, &res)) != 0)
{
- /* socket create failed */
ereport(ERROR,
- (errmsg("failed to create watchdog receive socket"),
- errdetail("create socket failed with reason: \"%m\"")));
+ (errmsg("getaddrinfo failed with error \"%s\"", gai_strerror(gai_ret))));
+ pfree(portstr);
+ return NIL;
}
+ pfree(portstr);
- socket_set_nonblock(sock);
+ for (walk = res; walk != NULL; walk = walk->ai_next)
+ n++;
- if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (char *) &one, sizeof(one)) == -1)
+ if (n == 0)
{
- /* setsockopt(SO_REUSEADDR) failed */
- saved_errno = errno;
- close(sock);
- ereport(ERROR,
- (errmsg("failed to create watchdog receive socket"),
- errdetail("setsockopt(SO_REUSEADDR) failed with reason: \"%s\"", strerror(saved_errno))));
+ ereport(ERROR, (errmsg("failed to create watchdog receive socket"),
+ errdetail("getaddrinfo() result is empty: no sockets can be created because no available local address with port:%d", port)));
+ return NIL;
}
- if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (char *) &one, sizeof(one)) == -1)
+ else
{
- /* setsockopt(TCP_NODELAY) failed */
- saved_errno = errno;
- close(sock);
- ereport(ERROR,
- (errmsg("failed to create watchdog receive socket"),
- errdetail("setsockopt(TCP_NODELAY) failed with reason: \"%s\"", strerror(saved_errno))));
+ target_n = n;
+ n = 0;
}
- if (setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, (char *) &one, sizeof(one)) == -1)
+
+ for (walk = res; walk != NULL; walk = walk->ai_next)
{
- /* setsockopt(SO_KEEPALIVE) failed */
- saved_errno = errno;
- close(sock);
- ereport(ERROR,
- (errmsg("failed to create watchdog receive socket"),
- errdetail("setsockopt(SO_KEEPALIVE) failed with reason: \"%s\"", strerror(saved_errno))));
- }
+ if ((sock = socket(walk->ai_family, walk->ai_socktype, walk->ai_protocol)) < 0)
+ {
+ /* socket create failed */
+ ereport(ERROR,
+ (errmsg("failed to create watchdog receive socket"),
+ errdetail("create socket failed with reason: \"%m\"")));
+ }
- addr.sin_family = AF_INET;
- addr.sin_addr.s_addr = htonl(INADDR_ANY);
- addr.sin_port = htons(port);
- len = sizeof(struct sockaddr_in);
+ socket_set_nonblock(sock);
- if (bind(sock, (struct sockaddr *) &addr, len) < 0)
- {
- /* bind failed */
- saved_errno = errno;
- close(sock);
- ereport(ERROR,
- (errmsg("failed to create watchdog receive socket"),
- errdetail("bind on \"TCP:%d\" failed with reason: \"%s\"", port, strerror(saved_errno))));
- }
+ if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (char *) &one, sizeof(one)) == -1)
+ {
+ /* setsockopt(SO_REUSEADDR) failed */
+ saved_errno = errno;
+ close(sock);
+ ereport(ERROR,
+ (errmsg("failed to create watchdog receive socket"),
+ errdetail("setsockopt(SO_REUSEADDR) failed with reason: \"%s\"", strerror(saved_errno))));
+ }
+ if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (char *) &one, sizeof(one)) == -1)
+ {
+ /* setsockopt(TCP_NODELAY) failed */
+ saved_errno = errno;
+ close(sock);
+ ereport(ERROR,
+ (errmsg("failed to create watchdog receive socket"),
+ errdetail("setsockopt(TCP_NODELAY) failed with reason: \"%s\"", strerror(saved_errno))));
+ }
+ if (setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, (char *) &one, sizeof(one)) == -1)
+ {
+ /* setsockopt(SO_KEEPALIVE) failed */
+ saved_errno = errno;
+ close(sock);
+ ereport(ERROR,
+ (errmsg("failed to create watchdog receive socket"),
+ errdetail("setsockopt(SO_KEEPALIVE) failed with reason: \"%s\"", strerror(saved_errno))));
+ }
+ if (walk->ai_family == AF_INET6)
+ {
+ if (setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY, &one, sizeof(one)) == -1)
+ {
+ ereport(ERROR,
+ (errmsg("failed to set IPPROTO_IPV6 option to watchdog receive socket"),
+ errdetail("setsockopt(IPV6_V6ONLY) failed with reason: \"%m\"")));
+ }
+ }
+ if (bind(sock, walk->ai_addr, walk->ai_addrlen) < 0)
+ {
+ /* bind failed */
+ saved_errno = errno;
+ close(sock);
+ ereport(ERROR,
+ (errmsg("failed to create watchdog receive socket"),
+ errdetail("bind on \"TCP:%d\" failed with reason: \"%s\"", port, strerror(saved_errno))));
+ }
- if (listen(sock, MAX_WATCHDOG_NUM * 2) < 0)
- {
- /* listen failed */
- saved_errno = errno;
- close(sock);
- ereport(ERROR,
- (errmsg("failed to create watchdog receive socket"),
- errdetail("listen failed with reason: \"%s\"", strerror(saved_errno))));
- }
+ if (listen(sock, MAX_WATCHDOG_NUM * 2) < 0)
+ {
+ /* listen failed */
+ saved_errno = errno;
+ close(sock);
+ ereport(ERROR,
+ (errmsg("failed to create watchdog receive socket"),
+ errdetail("listen failed with reason: \"%s\"", strerror(saved_errno))));
+ }
- return sock;
-}
+ socks = lappend_int(socks, sock);
+ n++;
+ }
+ if (target_n != n)
+ ereport(WARNING,
+ (errmsg("failed to create watchdog receive socket as much intended"),
+ errdetail("only %d out of %d socket(s) had been created", n, target_n)));
+ return socks;
+}
/*
* creates a socket in non blocking mode and connects it to the hostname and port
static int
wd_create_client_socket(char *hostname, int port, bool *connected)
{
- int sock;
+ int sock,
+ gai_ret = -1;
int one = 1;
- size_t len = 0;
- struct sockaddr_in addr;
- struct hostent *hp;
+ char *portstr = NULL;
+ struct addrinfo hints,
+ *res = NULL;
+ portstr = psprintf("%d", port);
+
+ memset(&hints, 0x00, sizeof(hints));
+ hints.ai_family = AF_UNSPEC;
+ hints.ai_socktype = SOCK_STREAM;
+ hints.ai_protocol = 0;
+ hints.ai_flags = AI_NUMERICSERV;
+
+ if ((gai_ret = getaddrinfo(hostname, portstr, &hints, &res)) != 0)
+ {
+ ereport(ERROR,
+ (errmsg("getaddrinfo failed with error \"%s\"", gai_strerror(gai_ret))));
+
+ pfree(portstr);
+ return -1;
+ }
+ pfree(portstr);
*connected = false;
/* create socket */
- if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0)
+ if ((sock = socket(res->ai_family, res->ai_socktype, res->ai_protocol)) < 0)
{
/* socket create failed */
ereport(LOG,
close(sock);
return -1;
}
- /* set sockaddr_in */
- memset(&addr, 0, sizeof(addr));
- addr.sin_family = AF_INET;
- hp = gethostbyname(hostname);
- if ((hp == NULL) || (hp->h_addrtype != AF_INET))
- {
- hp = gethostbyaddr(hostname, strlen(hostname), AF_INET);
- if ((hp == NULL) || (hp->h_addrtype != AF_INET))
- {
- ereport(LOG,
- (errmsg("failed to get host address for \"%s\"", hostname),
- errdetail("gethostbyaddr failed with error: \"%s\"", hstrerror(h_errno))));
- close(sock);
- return -1;
- }
- }
- memmove((char *) &(addr.sin_addr), (char *) hp->h_addr, hp->h_length);
- addr.sin_port = htons(port);
- len = sizeof(struct sockaddr_in);
/* set socket to non blocking */
socket_set_nonblock(sock);
- if (connect(sock, (struct sockaddr *) &addr, len) < 0)
+ if (connect(sock, res->ai_addr, res->ai_addrlen) < 0)
{
+ freeaddrinfo(res);
if (errno == EINPROGRESS)
{
return sock;
close(sock);
return -1;
}
+ freeaddrinfo(res);
/* set socket to blocking again */
socket_unset_nonblock(sock);
*connected = true;
/* initialize all the local structures for watchdog */
wd_cluster_initialize();
/* create a server socket for incoming watchdog connections */
- g_cluster.localNode->server_socket.sock = wd_create_recv_socket(g_cluster.localNode->wd_port);
- g_cluster.localNode->server_socket.sock_state = WD_SOCK_CONNECTED;
+ g_wd_recv_socks = wd_create_recv_socket(g_cluster.localNode->wd_port);
+
/* open the command server */
g_cluster.command_server_sock = wd_create_command_server_socket();
FD_ZERO(wmask);
FD_ZERO(emask);
- /* local node server socket will set the read and exception fds */
- FD_SET(g_cluster.localNode->server_socket.sock, rmask);
- FD_SET(g_cluster.localNode->server_socket.sock, emask);
+ foreach(lc, g_wd_recv_socks)
+ {
+ i = lfirst_int(lc);
+ if (fd_max < i)
+ fd_max = i;
+
+ /* local node server socket will set the read and exception fds */
+ FD_SET(i, rmask);
+ FD_SET(i, emask);
+ }
/* command server socket will set the read and exception fds */
FD_SET(g_cluster.command_server_sock, rmask);
wd_system_will_go_down(int code, Datum arg)
{
int i;
+ ListCell *lc;
ereport(LOG,
(errmsg("Watchdog is shutting down")));
if (get_local_node_state() == WD_COORDINATOR)
resign_from_escalated_node();
- /* close server socket */
- close_socket_connection(&g_cluster.localNode->server_socket);
+
+ /* close watchdog receive sockets */
+ foreach(lc, g_wd_recv_socks)
+ {
+ i = lfirst_int(lc);
+ close(i);
+ }
+ list_free(g_wd_recv_socks);
+ g_wd_recv_socks = NIL;
+
/* close all node sockets */
for (i = 0; i < g_cluster.remoteNodeCount; i++)
{
{
int processed_fds = 0;
int fd;
+ ListCell *lc;
+ int sock;
- if (FD_ISSET(g_cluster.localNode->server_socket.sock, rmask))
+ foreach(lc, g_wd_recv_socks)
{
- struct sockaddr_in addr;
- socklen_t addrlen = sizeof(struct sockaddr_in);
-
- processed_fds++;
- fd = accept(g_cluster.localNode->server_socket.sock, (struct sockaddr *) &addr, &addrlen);
- if (fd < 0)
+ sock = lfirst_int(lc);
+ if (FD_ISSET(sock, rmask))
{
- if (errno == EINTR || errno == 0 || errno == EAGAIN || errno == EWOULDBLOCK)
+ struct sockaddr_storage ss;
+ socklen_t addrlen = sizeof(ss);
+ int port;
+
+ processed_fds++;
+ fd = accept(sock, (struct sockaddr *) &ss, &addrlen);
+ if (fd < 0)
{
- /* nothing to accept now */
- ereport(DEBUG2,
- (errmsg("Failed to accept incoming watchdog connection, Nothing to accept")));
+ if (errno == EINTR || errno == 0 || errno == EAGAIN || errno == EWOULDBLOCK)
+ {
+ /* nothing to accept now */
+ ereport(DEBUG2,
+ (errmsg("Failed to accept incoming watchdog connection, Nothing to accept")));
+ }
+ /* accept failed */
+ ereport(DEBUG1,
+ (errmsg("Failed to accept incoming watchdog connection")));
}
- /* accept failed */
- ereport(DEBUG1,
- (errmsg("Failed to accept incoming watchdog connection")));
- }
- else
- {
- MemoryContext oldCxt = MemoryContextSwitchTo(TopMemoryContext);
- SocketConnection *conn = palloc(sizeof(SocketConnection));
+ else
+ {
+ MemoryContext oldCxt = MemoryContextSwitchTo(TopMemoryContext);
+ SocketConnection *conn = palloc(sizeof(SocketConnection));
- conn->sock = fd;
- conn->sock_state = WD_SOCK_CONNECTED;
- gettimeofday(&conn->tv, NULL);
- strncpy(conn->addr, inet_ntoa(addr.sin_addr), sizeof(conn->addr) - 1);
- ereport(LOG,
- (errmsg("new watchdog node connection is received from \"%s:%d\"", inet_ntoa(addr.sin_addr), addr.sin_port)));
- g_cluster.unidentified_socks = lappend(g_cluster.unidentified_socks, conn);
- MemoryContextSwitchTo(oldCxt);
+ conn->sock = fd;
+ conn->sock_state = WD_SOCK_CONNECTED;
+ gettimeofday(&conn->tv, NULL);
+
+ switch (ss.ss_family)
+ {
+ case AF_INET:
+ inet_ntop(AF_INET, &((struct sockaddr_in *) &ss)->sin_addr, conn->addr, addrlen);
+ port = ntohs(((struct sockaddr_in *) (&ss))->sin_port);
+ break;
+ case AF_INET6:
+ inet_ntop(AF_INET6, &((struct sockaddr_in6 *) &ss)->sin6_addr, conn->addr, addrlen);
+ port = ntohs(((struct sockaddr_in6 *) (&ss))->sin6_port);
+ break;
+ default:
+ ereport(ERROR, (errmsg("invalid incoming socket family data")));
+ break;
+ }
+
+ ereport(LOG,
+ (errmsg("new watchdog node connection is received from \"%s:%d\"", conn->addr, port)));
+
+ g_cluster.unidentified_socks = lappend(g_cluster.unidentified_socks, conn);
+ MemoryContextSwitchTo(oldCxt);
+ }
}
}
static int ntoh_wd_hb_packet(WdHbPacket * to, WdHbPacket * from);
static int packet_to_string_hb(WdHbPacket * pkt, char *str, int maxlen);
static void wd_set_reuseport(int sock);
+static int select_socket_from_list(List *socks, int timeout_sec);
static int wd_create_hb_send_socket(WdHbIf * hb_if);
-static int wd_create_hb_recv_socket(WdHbIf * hb_if);
+static List *wd_create_hb_recv_socket(WdHbIf * hb_if);
static void wd_hb_send(int sock, WdHbPacket * pkt, int len, const char *destination, const int dest_port);
static void wd_hb_recv(int sock, WdHbPacket * pkt, char *from_addr);
+ /*
+ * Readable socket will be returned among the listening socket list.
+ */
+static int
+select_socket_from_list(List *socks, int timeout_sec)
+{
+ int select_ret;
+ int maxsfd = 0;
+ int sock;
+ fd_set rfds;
+ fd_set efds;
+ ListCell *lc;
+ struct timeval tv;
+
+ tv.tv_sec = timeout_sec;
+ tv.tv_usec = 0;
+
+ FD_ZERO(&rfds);
+ FD_ZERO(&efds);
+
+ foreach(lc, socks)
+ {
+ sock = lfirst_int(lc);
+ FD_SET(sock, &rfds);
+ FD_SET(sock, &efds);
+ if (maxsfd < sock)
+ maxsfd = sock;
+ }
+
+ select_ret = select(maxsfd + 1, &rfds, NULL, &efds, &tv);
+
+ if (select_ret > 0)
+ {
+ foreach(lc, socks)
+ {
+ sock = lfirst_int(lc);
+ if (FD_ISSET(sock, &rfds))
+ {
+ ereport(DEBUG2,
+ (errmsg("wd_hb_recv_socket fd:%d is readable", sock)));
+ return sock;
+ }
+ if (FD_ISSET(sock, &efds))
+ {
+ ereport(WARNING,
+ (errmsg("exception detected on heartbeat receive socket fd:%d", sock)));
+ break;
+ }
+ }
+ }
+ else if (select_ret == -1)
+ {
+ ereport(ERROR,
+ (errmsg("failed to get socket data from heartbeat receive socket list"),
+ errdetail("select() failed with reason %m")));
+ }
+ else
+ {
+ ereport(ERROR,
+ (errmsg("failed to get socket data from heartbeat receive socket list"),
+ errdetail("select() got timeout, exceed %d sec(s)", timeout_sec)));
+ }
+
+ return -1;
+}
+
/* create socket for sending heartbeat */
static int
wd_create_hb_send_socket(WdHbIf * hb_if)
{
- int sock;
+ int sock = -1;
int tos;
+ int ret;
+ char *portstr = NULL;
+ struct addrinfo hints,
+ *res = NULL;
+
+ portstr = psprintf("%d", hb_if->dest_port);
+
+ memset(&hints, 0x00, sizeof(hints));
+ hints.ai_family = AF_UNSPEC;
+ hints.ai_socktype = SOCK_DGRAM;
+ hints.ai_protocol = 0;
+ hints.ai_flags = AI_NUMERICSERV;
+
+ if ((ret = getaddrinfo(hb_if->addr, portstr, &hints, &res)) != 0)
+ {
+ ereport(ERROR,
+ (errmsg("getaddrinfo() failed with error \"%s\"", gai_strerror(ret))));
+ pfree(portstr);
+ return -1;
+ }
+ pfree(portstr);
/* create socket */
- if ((sock = socket(AF_INET, SOCK_DGRAM, 0)) < 0)
+ if ((sock = socket(res->ai_family, res->ai_socktype, res->ai_protocol)) < 0)
{
/* socket create failed */
ereport(ERROR,
(errmsg("failed to create watchdog heartbeat sender socket"),
errdetail("create socket failed with reason: \"%m\"")));
}
+ freeaddrinfo(res);
/* set socket option */
tos = IPTOS_LOWDELAY;
}
/* create socket for receiving heartbeat */
-static int
+static List *
wd_create_hb_recv_socket(WdHbIf * hb_if)
{
- struct sockaddr_in addr;
- int sock;
+ int sock = -1,
+ gai_ret,
+ n = 0,
+ target_n = n;
+ List *socks = NIL;
const int one = 1;
int bind_tries;
int bind_is_done;
+ char buf[INET6_ADDRSTRLEN] = {'\0'};
+ char *portstr = NULL;
+ struct addrinfo hints,
+ *walk,
+ *res = NULL;
- memset(&(addr), 0, sizeof(addr));
- addr.sin_family = AF_INET;
- addr.sin_port = htons(pool_config->wd_heartbeat_port);
- addr.sin_addr.s_addr = INADDR_ANY;
+ portstr = psprintf("%d", pool_config->wd_heartbeat_port);
- /* create socket */
- if ((sock = socket(AF_INET, SOCK_DGRAM, 0)) < 0)
+ memset(&hints, 0x00, sizeof(hints));
+ hints.ai_family = AF_UNSPEC;
+ hints.ai_socktype = SOCK_DGRAM;
+ hints.ai_protocol = 0;
+ hints.ai_flags = AI_NUMERICSERV;
+
+ if ((gai_ret = getaddrinfo(NULL, portstr, &hints, &res)) != 0)
{
- /* socket create failed */
ereport(ERROR,
- (errmsg("failed to create watchdog heartbeat receive socket"),
- errdetail("create socket failed with reason: \"%m\"")));
+ (errmsg("getaddrinfo() failed with error \"%s\"", gai_strerror(gai_ret))));
+ pfree(portstr);
+ return NIL;
}
+ pfree(portstr);
+
+ for (walk = res; walk != NULL; walk = walk->ai_next)
+ n++;
- if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (char *) &one, sizeof(one)) == -1)
+ if (n == 0)
{
- close(sock);
- ereport(ERROR,
- (errmsg("failed to create watchdog heartbeat receive socket"),
- errdetail("setsockopt(SO_REUSEADDR) failed with reason: \"%m\"")));
+ ereport(ERROR, (errmsg("failed to create watchdog heartbeat receive socket"),
+ errdetail("getaddrinfo() result is empty: no sockets can be created because no available local address with port:%d", pool_config->wd_heartbeat_port)));
+ return NULL;
+ }
+ else
+ {
+ target_n = n;
+ n = 0;
}
- if (hb_if->if_name[0] != '\0')
+
+ for (walk = res; walk != NULL; walk = walk->ai_next)
{
-#if defined(SO_BINDTODEVICE)
+ /* create socket */
+ if ((sock = socket(walk->ai_family, walk->ai_socktype, walk->ai_protocol)) < 0)
{
- if (geteuid() == 0) /* check root privileges */
- {
- struct ifreq i;
+ /* socket create failed */
+ ereport(ERROR,
+ (errmsg("failed to create watchdog heartbeat receive socket"),
+ errdetail("create socket failed with reason: \"%m\"")));
+ }
- strlcpy(i.ifr_name, hb_if->if_name, sizeof(i.ifr_name));
+ if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (char *) &one, sizeof(one)) == -1)
+ {
+ close(sock);
+ ereport(ERROR,
+ (errmsg("failed to create watchdog heartbeat receive socket"),
+ errdetail("setsockopt(SO_REUSEADDR) failed with reason: \"%m\"")));
+ }
+ if (walk->ai_family == AF_INET6)
+ {
+ if (setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY, &one, sizeof(one)) == -1)
+ {
+ ereport(ERROR,
+ (errmsg("failed to set IPPROTO_IPV6 option to watchdog heartbeat recv socket"),
+ errdetail("setsockopt(IPV6_V6ONLY) failed with reason: \"%m\"")));
+ }
+ }
- if (setsockopt(sock, SOL_SOCKET, SO_BINDTODEVICE, &i, sizeof(i)) == -1)
+ if (hb_if->if_name[0] != '\0')
+ {
+#if defined(SO_BINDTODEVICE)
+ {
+ if (geteuid() == 0) /* check root privileges */
{
- close(sock);
- ereport(ERROR,
- (errmsg("failed to create watchdog heartbeat receive socket"),
- errdetail("setsockopt(SO_BINDTODEVICE) failed with reason: \"%m\"")));
- }
- ereport(LOG,
- (errmsg("creating watchdog heartbeat receive socket."),
- errdetail("bind receive socket to device: \"%s\"", i.ifr_name)));
+ struct ifreq i;
+
+ strlcpy(i.ifr_name, hb_if->if_name, sizeof(i.ifr_name));
+
+ if (setsockopt(sock, SOL_SOCKET, SO_BINDTODEVICE, &i, sizeof(i)) == -1)
+ {
+ close(sock);
+ ereport(ERROR,
+ (errmsg("failed to create watchdog heartbeat receive socket"),
+ errdetail("setsockopt(SO_BINDTODEVICE) failed with reason: \"%m\"")));
+ }
+ ereport(LOG,
+ (errmsg("creating watchdog heartbeat receive socket."),
+ errdetail("bind receive socket to device: \"%s\"", i.ifr_name)));
+ }
+ else
+ ereport(LOG,
+ (errmsg("failed to create watchdog heartbeat receive socket."),
+ errdetail("setsockopt(SO_BINDTODEVICE) requires root privilege")));
}
- else
- ereport(LOG,
- (errmsg("failed to create watchdog heartbeat receive socket."),
- errdetail("setsockopt(SO_BINDTODEVICE) requires root privilege")));
- }
#else
- ereport(LOG,
- (errmsg("failed to create watchdog heartbeat receive socket"),
- errdetail("setsockopt(SO_BINDTODEVICE) is not available on this platform")));
+ ereport(LOG,
+ (errmsg("failed to create watchdog heartbeat receive socket"),
+ errdetail("setsockopt(SO_BINDTODEVICE) is not available on this platform")));
#endif
- }
+ }
- wd_set_reuseport(sock);
+ wd_set_reuseport(sock);
- ereport(LOG,
- (errmsg("creating watchdog heartbeat receive socket."),
- errdetail("set SO_REUSEPORT")));
+ switch (walk->ai_family)
+ {
+ case AF_INET:
+ inet_ntop(AF_INET, &((struct sockaddr_in *) walk->ai_addr)->sin_addr, buf, walk->ai_addrlen);
+ break;
+ case AF_INET6:
+ inet_ntop(AF_INET6, &((struct sockaddr_in6 *) walk->ai_addr)->sin6_addr, buf, walk->ai_addrlen);
+ break;
+ default:
+ ereport(ERROR, (errmsg("invalid incoming socket family data")));
+ break;
+ }
+ ereport(LOG,
+ (errmsg("creating watchdog heartbeat receive socket."),
+ errdetail("creating socket on %s:%d", buf, pool_config->wd_heartbeat_port)));
- bind_is_done = 0;
- for (bind_tries = 0; !bind_is_done && bind_tries < MAX_BIND_TRIES; bind_tries++)
- {
- if (bind(sock, (struct sockaddr *) &addr, sizeof(struct sockaddr)) < 0)
+ bind_is_done = 0;
+ for (bind_tries = 0; !bind_is_done && bind_tries < MAX_BIND_TRIES; bind_tries++)
{
- ereport(LOG,
- (errmsg("failed to create watchdog heartbeat receive socket. retrying..."),
- errdetail("bind failed with reason: \"%m\"")));
+ if (bind(sock, walk->ai_addr, walk->ai_addrlen) < 0)
+ {
+ ereport(LOG,
+ (errmsg("failed to create watchdog heartbeat receive socket. retrying..."),
+ errdetail("bind failed with reason: \"%m\"")));
- sleep(1);
+ sleep(1);
+ }
+ else
+ {
+ bind_is_done = 1;
+ }
}
- else
+ /* bind failed finally */
+ if (!bind_is_done)
{
- bind_is_done = 1;
+ close(sock);
+ ereport(ERROR,
+ (errmsg("failed to create watchdog heartbeat receive socket"),
+ errdetail("bind socket failed with reason: \"%m\"")));
+ continue;
}
- }
- /* bind failed finally */
- if (!bind_is_done)
- {
- close(sock);
- ereport(ERROR,
- (errmsg("failed to create watchdog heartbeat receive socket"),
- errdetail("bind socket failed with reason: \"%m\"")));
- }
+ if (fcntl(sock, F_SETFD, FD_CLOEXEC) < 0)
+ {
+ close(sock);
+ ereport(ERROR,
+ (errmsg("failed to create watchdog heartbeat receive socket"),
+ errdetail("setting close-on-exec flag failed with reason: \"%m\"")));
+ continue;
+ }
- if (fcntl(sock, F_SETFD, FD_CLOEXEC) < 0)
- {
- close(sock);
- ereport(ERROR,
- (errmsg("failed to create watchdog heartbeat receive socket"),
- errdetail("setting close-on-exec flag failed with reason: \"%m\"")));
+ socks = lappend_int(socks, sock);
+ n++;
}
- return sock;
+ if (target_n != n)
+ ereport(WARNING,
+ (errmsg("failed to create watchdog heartbeat receive socket as much intended"),
+ errdetail("only %d out of %d socket(s) had been created", n, target_n)));
+
+ freeaddrinfo(res);
+ return socks;
}
/* send heartbeat signal */
wd_hb_send(int sock, WdHbPacket * pkt, int len, const char *host, const int port)
{
int rtn;
- struct sockaddr_in addr;
- struct hostent *hp;
WdHbPacket buf;
+ char *portstr;
+ struct addrinfo hints,
+ *res = NULL;
+
+ portstr = psprintf("%d", port);
+
+ memset(&hints, 0x00, sizeof(hints));
+ hints.ai_family = AF_UNSPEC;
+ hints.ai_socktype = SOCK_DGRAM;
+ hints.ai_protocol = 0;
+ hints.ai_flags = AI_NUMERICSERV;
if (!host || !strlen(host))
ereport(ERROR,
(errmsg("failed to send watchdog heartbeat. host name is empty")));
- hp = gethostbyname(host);
- if ((hp == NULL) || (hp->h_addrtype != AF_INET))
- ereport(ERROR,
- (errmsg("failed to send watchdog heartbeat, gethostbyname() failed"),
- errdetail("gethostbyname on \"%s\" failed with reason: \"%s\"", host, hstrerror(h_errno))));
-
- memmove((char *) &(addr.sin_addr), (char *) hp->h_addr, hp->h_length);
-
- addr.sin_family = AF_INET;
- addr.sin_port = htons(port);
+ if ((rtn = getaddrinfo(host, portstr, &hints, &res)) != 0)
+ {
+ /* Perhaps wrong hostname or IP */
+ ereport(WARNING,
+ (errmsg("failed to get address from watchdog heartbeat hostname"),
+ errdetail("getaddrinfo() failed: %s", gai_strerror(rtn))));
+ }
+ pfree(portstr);
hton_wd_hb_packet(&buf, pkt);
if ((rtn = sendto(sock, &buf, sizeof(WdHbPacket), 0,
- (struct sockaddr *) &addr, sizeof(addr))) != len)
+ res->ai_addr, res->ai_addrlen)) != len)
{
ereport(ERROR,
(errmsg("failed to send watchdog heartbeat, sendto failed"),
ereport(DEBUG2,
(errmsg("watchdog heartbeat: send %d byte packet", rtn)));
+ freeaddrinfo(res);
}
/*
wd_hb_recv(int sock, WdHbPacket * pkt, char *from_addr)
{
int rtn;
- struct sockaddr_in senderinfo;
- socklen_t addrlen;
WdHbPacket buf;
+ struct sockaddr_storage ss;
+ socklen_t addrlen = sizeof(ss);
- addrlen = sizeof(senderinfo);
-
- rtn = recvfrom(sock, &buf, sizeof(WdHbPacket), 0,
- (struct sockaddr *) &senderinfo, &addrlen);
+ rtn = recvfrom(sock, &buf, sizeof(WdHbPacket), 0, (struct sockaddr *) &ss, &addrlen);
if (rtn < 0)
ereport(ERROR,
- (errmsg("failed to receive heartbeat packet")));
+ (errmsg("failed to receive heartbeat packet with reason %m")));
else if (rtn == 0)
ereport(ERROR,
(errmsg("failed to receive heartbeat received zero length packet")));
ereport(DEBUG2,
(errmsg("watchdog heartbeat: received %d byte packet", rtn)));
- strncpy(from_addr, inet_ntoa(senderinfo.sin_addr), WD_MAX_HOST_NAMELEN - 1);
+
+ switch (ss.ss_family)
+ {
+ case AF_INET:
+ inet_ntop(AF_INET, &((struct sockaddr_in *) &ss)->sin_addr, from_addr, addrlen);
+ break;
+ case AF_INET6:
+ inet_ntop(AF_INET6, &((struct sockaddr_in6 *) &ss)->sin6_addr, from_addr, addrlen);
+ break;
+ default:
+ ereport(ERROR,
+ (errmsg("invalid socket address family:%d", ss.ss_family)));
+ break;
+ }
+
+ ereport(DEBUG2,
+ (errmsg("watchdog heartbeat: received %d byte packet", rtn)));
ntoh_wd_hb_packet(pkt, &buf);
}
char pack_str[WD_MAX_PACKET_STRING];
int pack_str_len;
sigjmp_buf local_sigjmp_buf;
+ List *wd_hb_recv_socks = NIL;
pid = fork();
if (pid != 0)
MemoryContextSwitchTo(TopMemoryContext);
- sock = wd_create_hb_recv_socket(hb_if);
+ wd_hb_recv_socks = wd_create_hb_recv_socket(hb_if);
set_ps_display("heartbeat receiver", false);
MemoryContextSwitchTo(ProcessLoopContext);
MemoryContextResetAndDeleteChildren(ProcessLoopContext);
+ /* get readable socket from heartbeat socket list */
+ if ((sock = select_socket_from_list(wd_hb_recv_socks, pool_config->wd_heartbeat_deadtime)) < 0)
+ {
+ ereport(ERROR, (errmsg("failed to get heartbeat from heartbeat receiver"),
+ errdetail("select() failed on heartbeat receiver")));
+ continue;
+ }
+
/* receive heartbeat signal */
wd_hb_recv(sock, &pkt, from);
/* authentication */
MemoryContextSwitchTo(TopMemoryContext);
- sock = wd_create_hb_send_socket(hb_if);
+ if ((sock = wd_create_hb_send_socket(hb_if)) < 0)
+ {
+ ereport(ERROR,
+ (errmsg("error on creating heartbeat send socket")));
+
+ }
set_ps_display("heartbeat sender", false);