summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author Muhammad Usama 2015-10-29 17:33:35 +0000
committer Muhammad Usama 2015-10-29 17:33:35 +0000
commit 5634842883ba2bece531a6fd4991825615389057 (patch)
tree 53fd255af88ba1f6f1d5c85c0f11d9e6277d85d6
parent f97ded01c47f7f5bbe918e611d1da55dc4e65ddc (diff)
removing the debug printfs, getting ready to merge the feature into master branch (branch: watchdog_enhancement)
-rw-r--r-- src/watchdog/watchdog.c 346
-rw-r--r-- src/watchdog/wd_commands.c 84
-rw-r--r-- src/watchdog/wd_lifecheck.c 17
3 files changed, 227 insertions, 220 deletions
diff --git a/src/watchdog/watchdog.c b/src/watchdog/watchdog.c
index cf8786b15..7265d30c9 100644
--- a/src/watchdog/watchdog.c
+++ b/src/watchdog/watchdog.c
@@ -318,7 +318,7 @@ static bool write_packet_to_socket(int sock, WDPacketData* pkt);
static int read_sockets(fd_set* rmask,int pending_fds_count);
static void set_timeout(unsigned int sec);
static int wd_create_command_server_socket(void);
-static void close_socket_connection(SocketConnection* conn,int line);
+static void close_socket_connection(SocketConnection* conn);
static bool send_message_to_connection(SocketConnection* conn, WDPacketData *pkt);
static int send_message(WatchdogNode* wdNode, WDPacketData *pkt);
@@ -395,7 +395,7 @@ static void wd_check_config(void);
static pid_t watchdog_main(void);
static pid_t fork_watchdog_child(void);
-static void print_packet_info(WDPacketData* pkt,WatchdogNode* wdNode);
+static void print_received_packet_info(WDPacketData* pkt,WatchdogNode* wdNode);
/* global variables */
wd_cluster g_cluster;
struct timeval g_tm_set_time;
@@ -784,7 +784,8 @@ static pid_t fork_watchdog_child(void)
else if (pid == -1)
{
ereport(FATAL,
- (errmsg("fork() failed. reason: %s", strerror(errno))));
+ (return_code(POOL_EXIT_FATAL),
+ errmsg("fork() failed. reason: %s", strerror(errno))));
}
return pid;
@@ -926,8 +927,9 @@ wd_create_command_server_socket(void)
if ((sock = socket(AF_UNIX, SOCK_STREAM, 0)) < 0)
{
/* socket create failed */
- ereport(ERROR,
- (errmsg("failed to create watchdog command server socket"),
+ ereport(FATAL,
+ (return_code(POOL_EXIT_FATAL),
+ errmsg("failed to create watchdog command server socket"),
errdetail("create socket failed with reason: \"%s\"", strerror(errno))));
}
memset((char *) &addr, 0, sizeof(addr));
@@ -940,8 +942,9 @@ wd_create_command_server_socket(void)
{
close(sock);
unlink(addr.sun_path);
- ereport(ERROR,
- (errmsg("failed to create watchdog command server socket"),
+ ereport(FATAL,
+ (return_code(POOL_EXIT_FATAL),
+ errmsg("failed to create watchdog command server socket"),
errdetail("bind on \"%s\" failed with reason: \"%s\"", addr.sun_path, strerror(errno))));
}
@@ -950,8 +953,9 @@ wd_create_command_server_socket(void)
/* listen failed */
close(sock);
unlink(addr.sun_path);
- ereport(ERROR,
- (errmsg("failed to create watchdog command server socket"),
+ ereport(FATAL,
+ (return_code(POOL_EXIT_FATAL),
+ errmsg("failed to create watchdog command server socket"),
errdetail("listen failed with reason: \"%s\"", strerror(errno))));
}
on_proc_exit(FileUnlink, (Datum) pstrdup(addr.sun_path));
@@ -1151,7 +1155,7 @@ static int read_sockets(fd_set* rmask,int pending_fds_count)
{
/* We have found the match */
found = true;
- close_socket_connection(&wdNode->server_socket,__LINE__);
+ close_socket_connection(&wdNode->server_socket);
strlcpy(wdNode->delegate_ip, tempNode->delegate_ip, WD_MAX_HOST_NAMELEN);
strlcpy(wdNode->nodeName, tempNode->nodeName, WD_MAX_HOST_NAMELEN);
wdNode->state = tempNode->state;
@@ -1193,7 +1197,7 @@ static int read_sockets(fd_set* rmask,int pending_fds_count)
tmpNode.server_socket.sock = -1;
tmpNode.server_socket.sock_state = WD_SOCK_UNINITIALIZED;
reply_with_minimal_message(&tmpNode, WD_REJECT_MESSAGE, pkt);
- close_socket_connection(conn,__LINE__);
+ close_socket_connection(conn);
}
pfree(tempNode);
}
@@ -1332,14 +1336,15 @@ static bool read_ipc_command_and_process(int sock, bool *remove_socket)
return false;
}
/* Next is is command action */
- ret = socket_read(sock, &command_action, sizeof(WD_COMMAND_ACTIONS),0);
- if (ret != sizeof(WD_COMMAND_ACTIONS))
+ ret = socket_read(sock, &command_action, sizeof(int),0);
+ if (ret != sizeof(int))
{
ereport(WARNING,
(errmsg("error reading from IPC socket"),
errdetail("read from socket failed with error \"%s\"",strerror(errno))));
return false;
}
+ command_action = htonl(command_action);
/* We should have data length */
ret = socket_read(sock, &data_len, sizeof(int),0);
if (ret != sizeof(int))
@@ -1690,10 +1695,11 @@ static IPC_CMD_PREOCESS_RES process_IPC_lock_request(WDIPCCommandData *IPCComman
* if cluster or myself is not in stable state
* just return cluster in transaction
*/
+ ereport(LOG,
+ (errmsg("processing lock request from IPC socket")));
IPCCommand->type = WD_INTERLOCKING_REQUEST;
if (get_local_node_state() == WD_STANDBY)
{
- printf("\t\t\t %s:%d I AM STANDBY \n",__FUNCTION__,__LINE__);
/* I am a standby node, Just forward the request to coordinator */
WDPacketData * wdPacket = get_minimum_message(WD_INTERLOCKING_REQUEST,NULL);
@@ -1702,8 +1708,10 @@ static IPC_CMD_PREOCESS_RES process_IPC_lock_request(WDIPCCommandData *IPCComman
if (send_message(g_cluster.masterNode, wdPacket) <= 0)
{
- printf("\t\t\t %s:%d send message failed \n",__FUNCTION__,__LINE__);
-
+ ereport(LOG,
+ (errmsg("failed to process lock request from IPC socket"),
+ errdetail("failed to forward the request to master watchdog node \"%s\"",g_cluster.masterNode->nodeName)));
+
/* we have failed to send to any node, return lock failed */
res_type = WD_IPC_CMD_RESULT_BAD;
}
@@ -1712,22 +1720,20 @@ static IPC_CMD_PREOCESS_RES process_IPC_lock_request(WDIPCCommandData *IPCComman
/*
* we need to wait for the result
*/
- printf("\t\t\t %s:%d PROCESSING \n",__FUNCTION__,__LINE__);
-
+ ereport(LOG,
+ (errmsg("lock request from IPC socket is forwarded to master watchdog node \"%s\"",g_cluster.masterNode->nodeName),
+ errdetail("waiting for the reply from master node...")));
+
res_type = IPC_CMD_PROCESSING;
}
}
else if (get_local_node_state() == WD_COORDINATOR)
{
- printf("\t\t\t %s:%d COORDINATOR \n",__FUNCTION__,__LINE__);
-
/*
* If I am coordinator, Just process the request locally
*/
if (node_has_requested_for_interlocking(g_cluster.localNode, NULL))
{
- printf("\t\t\t %s:%d \n",__FUNCTION__,__LINE__);
-
res_type = WD_IPC_CMD_RESULT_OK;
}
else
@@ -1761,6 +1767,9 @@ static IPC_CMD_PREOCESS_RES process_IPC_failover_cmd_synchronise(WDIPCCommandDat
* if cluster or myself is not in stable state
* just return cluster in transaction
*/
+ ereport(LOG,
+ (errmsg("processing sync request from IPC socket")));
+
IPCCommand->type = WD_FAILOVER_CMD_SYNC_REQUEST;
if (get_local_node_state() == WD_STANDBY)
{
@@ -1773,12 +1782,11 @@ static IPC_CMD_PREOCESS_RES process_IPC_failover_cmd_synchronise(WDIPCCommandDat
set_message_data(&wdPacket, IPCCommand->data_buf , IPCCommand->data_len);
/* save the command ID */
IPCCommand->internal_command_id = wdPacket.command_id;
-
- printf("\t\t\t %s:%d I AM STANDBY \n",__FUNCTION__,__LINE__);
-
if (send_message(g_cluster.masterNode, &wdPacket) <= 0)
{
- printf("\t\t\t %s:%d send message failed \n",__FUNCTION__,__LINE__);
+ ereport(LOG,
+ (errmsg("failed to process sync request from IPC socket"),
+ errdetail("failed to forward the request to master watchdog node \"%s\"",g_cluster.masterNode->nodeName)));
/* we have failed to send to any node, return lock failed */
res_type = WD_IPC_CMD_RESULT_BAD;
}
@@ -1787,14 +1795,15 @@ static IPC_CMD_PREOCESS_RES process_IPC_failover_cmd_synchronise(WDIPCCommandDat
/*
* we need to wait for the result
*/
- printf("\t\t\t %s:%d PROCESSING \n",__FUNCTION__,__LINE__);
+ ereport(LOG,
+ (errmsg("sync request from IPC socket is forwarded to master watchdog node \"%s\"",g_cluster.masterNode->nodeName),
+ errdetail("waiting for the reply from master node...")));
+
res_type = IPC_CMD_PROCESSING;
}
}
else if (get_local_node_state() == WD_COORDINATOR)
{
- printf("\t\t\t %s:%d COORDINATOR \n",__FUNCTION__,__LINE__);
-
/*
* If I am coordinator, Just process the request locally
*/
@@ -1827,13 +1836,9 @@ static int node_has_requested_for_interlocking(WatchdogNode* wdNode, WDPacketDat
/* only coordinator(master) node can process this request */
if (get_local_node_state() == WD_COORDINATOR)
{
- printf("\n\t\t\t %s:%d I AM COORDINATOR \n",__FUNCTION__,__LINE__);
-
/* check if we already have no lockholder node */
if (g_cluster.lockHolderNode == NULL || g_cluster.lockHolderNode == wdNode)
{
- printf("\n\t\t\t %s:%d LOCK REQUESTED IS NULL OR FROM SAME NODE \n",__FUNCTION__,__LINE__);
-
if (wdNode == g_cluster.localNode)
{
g_cluster.lockHolderNode = wdNode;
@@ -1843,7 +1848,6 @@ static int node_has_requested_for_interlocking(WatchdogNode* wdNode, WDPacketDat
/* reply the node with success message */
else if (reply_with_minimal_message(wdNode, WD_ACCEPT_MESSAGE, pkt))
{
- printf("\n\t\t\t %s:%d WD_ACCEPT_MESSAGE \n",__FUNCTION__,__LINE__);
g_cluster.lockHolderNode = wdNode;
/* TODO inform all cluster about the new lock holder */
return true;
@@ -1851,13 +1855,11 @@ static int node_has_requested_for_interlocking(WatchdogNode* wdNode, WDPacketDat
}
else
{
- printf("\n\t\t\t %s:%d WD_REJECT_MESSAGE \n",__FUNCTION__,__LINE__);
reply_with_minimal_message(wdNode, WD_REJECT_MESSAGE, pkt);
}
}
else
{
- printf("\n\t\t\t %s:%d WD_ERROR_MESSAGE \n",__FUNCTION__,__LINE__);
reply_with_minimal_message(wdNode, WD_ERROR_MESSAGE, pkt);
}
return false;
@@ -1894,7 +1896,7 @@ static void process_failover_command_sync_requests(WatchdogNode* wdNode, WDPacke
/* The root node must be object */
if (root == NULL || root->type != json_object)
{
- ereport(NOTICE,
+ ereport(LOG,
(errmsg("unable to parse json data from replicate command")));
res = FAILOVER_RES_ERROR;
}
@@ -1903,7 +1905,7 @@ static void process_failover_command_sync_requests(WatchdogNode* wdNode, WDPacke
if (syncRequestType == NULL)
{
- ereport(NOTICE,
+ ereport(LOG,
(errmsg("invalid json data"),
errdetail("unable to find Watchdog Function Name")));
res = FAILOVER_RES_ERROR;
@@ -1991,6 +1993,11 @@ node_is_asking_for_failover_cmd_start(WatchdogNode* wdNode, WDPacketData* pkt, i
{
WDFailoverCMDResults res = FAILOVER_RES_TRANSITION;
/* only coordinator(master) node can process this request */
+ ereport(LOG,
+ (errmsg("watchdog node \"%s\" is requesting to %s lock for failover command start",
+ wdNode->nodeName,
+ check?"check":"aquire")));
+
if (get_local_node_state() == WD_COORDINATOR)
{
InterlockingNode* lockingNode = NULL;
@@ -2004,7 +2011,10 @@ node_is_asking_for_failover_cmd_start(WatchdogNode* wdNode, WDPacketData* pkt, i
/* check if we already have no lockholder node */
if (lockingNode->lockHolderNode == NULL || lockingNode->lockHolderNode == wdNode)
{
- printf("\n\t\t\t %s:%d LOCK REQUESTED IS NULL OR FROM SAME NODE \n",__FUNCTION__,__LINE__);
+ ereport(LOG,
+ (errmsg("watchdog node \"%s\" is requesting to %s lock for failover command start",
+ wdNode->nodeName,
+ check?"check":"aquire")));
if (check == false)
{
lockingNode->lockHolderNode = wdNode;
@@ -2014,7 +2024,11 @@ node_is_asking_for_failover_cmd_start(WatchdogNode* wdNode, WDPacketData* pkt, i
}
else /* some other node is holding the lock */
{
- printf("\n\t\t\t %s:%d Some other node is already holding the lock \n",__FUNCTION__,__LINE__);
+ ereport(LOG,
+ (errmsg("%s lock for failover command start request is denied to node \"%s\"",
+ check?"check":"aquire",
+ wdNode->nodeName),
+ errdetail("node \"%s\" is holding the lock",lockingNode->lockHolderNode->nodeName)));
if (lockingNode->locked)
res = FAILOVER_RES_BLOCKED;
else
@@ -2024,7 +2038,10 @@ node_is_asking_for_failover_cmd_start(WatchdogNode* wdNode, WDPacketData* pkt, i
}
else
{
- printf("\n\t\t\t %s:%d I am not in position \n",__FUNCTION__,__LINE__);
+ ereport(LOG,
+ (errmsg("failed to process failover command start request by watchdog node \"%s\"",
+ check?"check":"aquire"),
+ errdetail("I am standby node and request can only be handled by master node")));
res = FAILOVER_RES_ERROR;
}
return res;
@@ -2035,6 +2052,11 @@ node_is_asking_for_failover_cmd_end(WatchdogNode* wdNode, WDPacketData* pkt, int
{
WDFailoverCMDResults res = FAILOVER_RES_TRANSITION;
/* only coordinator(master) node can process this request */
+ ereport(LOG,
+ (errmsg("watchdog node \"%s\" is requesting to %s lock for failover command start",
+ wdNode->nodeName,
+ resign?"check":"release")));
+
if (get_local_node_state() == WD_COORDINATOR)
{
InterlockingNode* lockingNode = NULL;
@@ -2056,12 +2078,23 @@ node_is_asking_for_failover_cmd_end(WatchdogNode* wdNode, WDPacketData* pkt, int
}
else /* some other node is holding the lock */
{
+ ereport(LOG,
+ (errmsg("%s lock for failover command end request is denied to node \"%s\"",
+ resign?"check":"release",
+ wdNode->nodeName),
+ errdetail("node \"%s\" is holding the lock",lockingNode->lockHolderNode->nodeName)));
+
res = FAILOVER_RES_BLOCKED;
}
}
}
else
{
+ ereport(LOG,
+ (errmsg("failed to process failover command end request by watchdog node \"%s\"",
+ resign?"check":"release"),
+ errdetail("I am standby node and request can only be handled by master node")));
+
res = FAILOVER_RES_ERROR;
}
return res;
@@ -2128,45 +2161,42 @@ static WDPacketData* read_packet_of_type(SocketConnection* conn, char ensure_typ
ret = socket_read(conn->sock,&type, sizeof(char), 1 );
if (ret != sizeof(char))
{
- close_socket_connection(conn,__LINE__);
+ close_socket_connection(conn);
return NULL;
}
ereport(DEBUG1,
- (errmsg("PACKET TYPE %c while need packet type %c",type,ensure_type)));
+ (errmsg("received packet tyep %c while need packet type %c",type,ensure_type)));
if (ensure_type != WD_NO_MESSAGE && ensure_type != type)
{
/* The packet type is not what we want.*/
ereport(DEBUG1,
(errmsg("invalid packet type. expecting %c while received %c",ensure_type,type)));
- close_socket_connection(conn,__LINE__);
+ close_socket_connection(conn);
return NULL;
}
ret = socket_read(conn->sock, &cmd_id, sizeof(int) ,1);
if (ret != sizeof(int))
{
- close_socket_connection(conn,__LINE__);
+ close_socket_connection(conn);
return NULL;
}
cmd_id = ntohl(cmd_id);
-
+
ereport(DEBUG3,
- (errmsg("PACKET COMMAND ID %d",cmd_id)));
+ (errmsg("received packet with command id %d from watchdog node ",cmd_id)));
ret = socket_read(conn->sock, &len, sizeof(int), 1);
if (ret != sizeof(int))
{
- close_socket_connection(conn,__LINE__);
+ close_socket_connection(conn);
return NULL;
}
len = ntohl(len);
- ereport(DEBUG2,
- (errmsg("PACKET DATA LENGTH %d",len)));
-
pkt = get_empty_packet();
set_message_type(pkt, type);
set_message_commandID(pkt, cmd_id);
@@ -2176,7 +2206,7 @@ static WDPacketData* read_packet_of_type(SocketConnection* conn, char ensure_typ
ret = socket_read(conn->sock, buf, len,1);
if (ret != len)
{
- close_socket_connection(conn,__LINE__);
+ close_socket_connection(conn);
free_packet(pkt);
pfree(buf);
return NULL;
@@ -2220,26 +2250,25 @@ static void wd_system_will_go_down(int code, Datum arg)
{
int i;
ereport(LOG,
- (errmsg("Watchdog child is shutting down")));
+ (errmsg("Watchdog is shutting down")));
send_cluster_command(NULL, WD_INFORM_I_AM_GOING_DOWN, 0);
if (get_local_node_state() == WD_COORDINATOR)
resign_from_coordinator();
/* close server socket */
- close_socket_connection(&g_cluster.localNode->server_socket,__LINE__);
+ close_socket_connection(&g_cluster.localNode->server_socket);
/* close all node sockets */
for (i=0; i< g_cluster.remoteNodeCount; i++)
{
WatchdogNode* wdNode = &(g_cluster.remoteNodes[i]);
- close_socket_connection(&wdNode->client_socket,__LINE__);
- close_socket_connection(&wdNode->server_socket,__LINE__);
+ close_socket_connection(&wdNode->client_socket);
+ close_socket_connection(&wdNode->server_socket);
}
}
-static void close_socket_connection(SocketConnection* conn,int line)
+static void close_socket_connection(SocketConnection* conn)
{
- printf("Closing socket %d From %d\n",conn->sock,line);
if ((conn->sock > 0 && conn->sock_state == WD_SOCK_CONNECTED)
|| conn->sock_state == WD_SOCK_WAITING_FOR_CONNECT)
{
@@ -2361,7 +2390,7 @@ static int update_successful_outgoing_cons(fd_set* wmask, int pending_fds_count)
ereport(LOG,
(errmsg("error in outbond connection to %s:%d",wdNode->hostname,wdNode->wd_port),
errdetail("%s",strerror(valopt))));
- close_socket_connection(&wdNode->client_socket,__LINE__);
+ close_socket_connection(&wdNode->client_socket);
wdNode->client_socket.sock_state = WD_SOCK_ERROR;
}
else
@@ -2684,7 +2713,6 @@ static int standard_packet_processor(WatchdogNode* wdNode, WDPacketData* pkt)
POOL_CONFIG* master_config = get_pool_config_from_json(pkt->data, pkt->len);
if (master_config)
{
- printf("\n%s\n",pkt->data);
verify_pool_configurations(master_config);
}
@@ -2709,7 +2737,6 @@ static int standard_packet_processor(WatchdogNode* wdNode, WDPacketData* pkt)
wdNode->tv.tv_sec = tempNode->tv.tv_sec;
wdNode->wd_priority = tempNode->wd_priority;
- printf("NODE INFO MESSAGE RECEVD\n");
print_watchdog_node_info(wdNode);
if (authkey)
@@ -2730,12 +2757,10 @@ static int standard_packet_processor(WatchdogNode* wdNode, WDPacketData* pkt)
break;
case WD_INTERLOCKING_REQUEST:
- printf("\n\t\t\t %s:%d INTERLOCKING REQUEST RECEIVED \n",__FUNCTION__,__LINE__);
node_has_requested_for_interlocking(wdNode, pkt);
break;
case WD_INTERUNLOCKING_REQUEST:
- printf("\n\t\t\t %s:%d UNLOCKING_REQUEST REQUEST RECEIVED \n",__FUNCTION__,__LINE__);
node_has_resigned_from_interlocking(wdNode, pkt);
break;
@@ -2783,18 +2808,13 @@ static int standard_packet_processor(WatchdogNode* wdNode, WDPacketData* pkt)
static bool send_message_to_connection(SocketConnection* conn, WDPacketData *pkt)
{
- printf("sending to sock:%d state:%d \n",conn->sock,conn->sock_state);
-
if (conn->sock > 0 && conn->sock_state == WD_SOCK_CONNECTED)
{
if (write_packet_to_socket(conn->sock, pkt) == true)
- {
return true;
- }
- else
- printf("Write failed while sending to sock:%d state:%d \n",conn->sock,conn->sock_state);
-
- close_socket_connection(conn,__LINE__);
+ ereport(DEBUG1,
+ (errmsg("sending packet failed, closing connection")));
+ close_socket_connection(conn);
}
return false;
@@ -2802,13 +2822,12 @@ static bool send_message_to_connection(SocketConnection* conn, WDPacketData *pkt
static bool send_message_to_node(WatchdogNode* wdNode, WDPacketData *pkt)
{
- printf("send message to client socket of %s node \n",wdNode->nodeName);
-
if (send_message_to_connection(&wdNode->client_socket,pkt) == true)
return true;
- printf("send message to server socket of %s node \n",wdNode->nodeName);
if (send_message_to_connection(&wdNode->server_socket,pkt) == true)
return true;
+ ereport(DEBUG1,
+ (errmsg("sending packet to node \"%s\" failed, closing connection", wdNode->nodeName)));
return false;
}
@@ -2905,8 +2924,6 @@ static bool watchdog_internal_command_packet_processor(WatchdogNode* wdNode, WDP
nodeResult->cmdState = COMMAND_STATE_REPLIED;
g_cluster.currentCommand.commandReplyFromCount++;
- printf("----*****----- [%d] reply_from_count = %d AND sendTo_count = %d\n",__LINE__,g_cluster.currentCommand.commandReplyFromCount,g_cluster.currentCommand.commandSendToCount);
-
if (g_cluster.currentCommand.commandReplyFromCount >= g_cluster.currentCommand.commandSendToCount)
{
g_cluster.currentCommand.commandFinished = true;
@@ -3025,7 +3042,8 @@ static int issue_watchdog_internal_command(WatchdogNode* wdNode, WDPacketData *p
clear_command_node_result(nodeResult);
if (nodeResult->wdNode->state == WD_DEAD || nodeResult->wdNode->state == WD_SHUTDOWN)
{
- printf(" Not sending to DEAD node:%d name = [%s]\n",i,nodeResult->wdNode->nodeName);
+ ereport(DEBUG2,
+ (errmsg("not sending watchdog internal command packet to DEAD %s",nodeResult->wdNode->nodeName)));
/* Do not send to dead nodes */
nodeResult->cmdState = COMMAND_STATE_DO_NOT_SEND;
}
@@ -3033,7 +3051,9 @@ static int issue_watchdog_internal_command(WatchdogNode* wdNode, WDPacketData *p
{
if (send_message_to_node(nodeResult->wdNode, pkt) == false)
{
- printf(" sending to node:%d name = [%s] FAILED \n",i,nodeResult->wdNode->nodeName);
+ ereport(DEBUG1,
+ (errmsg("failed to send watchdog internla command packet %s",nodeResult->wdNode->nodeName),
+ errdetail("saving the packet. will try to resend it if connection recovers")));
/* failed to send. May be try again later */
save_message = true;
@@ -3266,14 +3286,12 @@ static bool wd_commands_packet_processor(WD_EVENTS event, WatchdogNode* wdNode,
if (pkt->type == WD_INTERLOCKING_REQUEST)
{
- printf("\n\t\t\t %s:%d INTERLOCKING REQUEST RECEIVED \n",__FUNCTION__,__LINE__);
node_has_requested_for_interlocking(wdNode, pkt);
return true;
}
if (pkt->type == WD_INTERUNLOCKING_REQUEST)
{
- printf("\n\t\t\t %s:%d UNLOCKING_REQUEST REQUEST RECEIVED \n",__FUNCTION__,__LINE__);
node_has_resigned_from_interlocking(wdNode, pkt);
return true;
}
@@ -3365,7 +3383,7 @@ static int watchdog_state_machine(WD_EVENTS event, WatchdogNode* wdNode, WDPacke
}
else if (event == WD_EVENT_PACKET_RCV)
{
- print_packet_info(pkt,wdNode);
+ print_received_packet_info(pkt,wdNode);
if (pkt->type == WD_INFO_MESSAGE)
standard_packet_processor(wdNode, pkt);
@@ -3800,13 +3818,14 @@ static int watchdog_state_machine_coordinator(WD_EVENTS event, WatchdogNode* wdN
int i;
send_cluster_command(NULL, WD_DECLARE_COORDINATOR_MESSAGE, 5);
set_timeout(10);
- printf("\nI AM SELECTING MYSELF AS COORDINATOR NODE\n");
+ ereport(LOG,
+ (errmsg("I am announcing my self as master/coordinator watchdog node")));
for (i=0; i< g_cluster.remoteNodeCount; i++)
{
WatchdogNode* wdNode = &(g_cluster.remoteNodes[i]);
- printf("NODE INFORMATION for NODE %d\n",i);
+ ereport(DEBUG2,
+ (errmsg("printing all remote node information")));
print_watchdog_node_info(wdNode);
- printf("___________\n");
}
}
break;
@@ -3819,6 +3838,16 @@ static int watchdog_state_machine_coordinator(WD_EVENTS event, WatchdogNode* wdN
g_cluster.currentCommand.commandStatus == COMMAND_FINISHED_TIMEOUT)
{
ereport(LOG,
+ (errmsg("declare coordinator command finished with status:[%s]",
+ g_cluster.currentCommand.commandStatus == COMMAND_FINISHED_ALL_REPLIED?
+ "ALL NODES REPLIED":
+ "COMMAND TIMEED OUT"),
+ errdetail("The command was sent to %d nodes and %d nodes replied to it",
+ g_cluster.currentCommand.commandSendToCount,
+ g_cluster.currentCommand.commandReplyFromCount
+ )));
+
+ ereport(LOG,
(errmsg("I am the cluster leader node. Starting escalation process"),
errdetail("our declare coordinator message is accepted by all nodes")));
@@ -3839,20 +3868,30 @@ static int watchdog_state_machine_coordinator(WD_EVENTS event, WatchdogNode* wdN
}
}
- if (g_cluster.currentCommand.packet.type == WD_IAM_COORDINATOR_MESSAGE)
+ else if (g_cluster.currentCommand.packet.type == WD_IAM_COORDINATOR_MESSAGE)
{
- if (g_cluster.currentCommand.commandStatus == COMMAND_FINISHED_ALL_REPLIED ||
- g_cluster.currentCommand.commandStatus == COMMAND_FINISHED_TIMEOUT)
+ if (g_cluster.currentCommand.commandStatus == COMMAND_FINISHED_ALL_REPLIED)
+ {
+ ereport(DEBUG1,
+ (errmsg("I am the cluster leader node command finished with status:[ALL NODES REPLIED]"),
+ errdetail("The command was sent to %d nodes and %d nodes replied to it",
+ g_cluster.currentCommand.commandSendToCount,
+ g_cluster.currentCommand.commandReplyFromCount
+ )));
+ }
+ else if (g_cluster.currentCommand.commandStatus == COMMAND_FINISHED_TIMEOUT)
{
- printf("\n**************I AM COORDINATOR MESSAGE ACCEPTED**************\n");
- printf("Send to count = %d\n",g_cluster.currentCommand.commandSendToCount);
- printf("Reply from count = %d\n",g_cluster.currentCommand.commandReplyFromCount);
- printf("commandStatus = %d\n",g_cluster.currentCommand.commandStatus);
+ ereport(DEBUG1,
+ (errmsg("I am the cluster leader node command finished with status:[COMMAND TIMEED OUT] which is success"),
+ errdetail("The command was sent to %d nodes and %d nodes replied to it",
+ g_cluster.currentCommand.commandSendToCount,
+ g_cluster.currentCommand.commandReplyFromCount
+ )));
}
else
{
/* command is finished but because of error */
- ereport(NOTICE,
+ ereport(WARNING,
(errmsg("possible split brain scenario detected by \"%s\" node", wdNode->nodeName),
(errdetail("re-initializing cluster"))));
set_state(WD_JOINING);
@@ -4329,17 +4368,16 @@ static int set_state(WD_STATES newState)
{
WD_STATES oldState = g_cluster.localNode->state;
ereport(LOG,
- (errmsg("setting watchdog state old state = %s NEW = %s",debug_states[oldState],debug_states[newState])));
+ (errmsg("watchdog state changed from [%s] to [%s]",debug_states[oldState],debug_states[newState])));
g_cluster.localNode->state = newState;
/* If we are resigning from being coordinator
* kill the escalation child process
- */
if (oldState == WD_COORDINATOR && newState != WD_COORDINATOR && g_cluster.escalation_pid > 0)
{
- printf(" I am resigning from coordinator so killing pid =%d\n",g_cluster.escalation_pid);
kill(g_cluster.escalation_pid,SIGTERM);
g_cluster.escalation_pid = -1;
}
+ */
if (oldState != newState)
watchdog_state_machine(WD_EVENT_WD_STATE_CHANGED, NULL, NULL);
return 0;
@@ -4420,23 +4458,35 @@ static bool process_pgpool_replicate_command(WatchdogNode* wdNode, WDPacketData*
int node_count = 0;
int *node_id_list = NULL;
+ if (pkt->data == NULL || pkt->len == 0)
+ {
+ ereport(LOG,
+ (errmsg("watchdog is unable to process pgpool replicate command"),
+ errdetail("command packet contains no data")));
+ return false;
+ }
+ ereport(DEBUG1,
+ (errmsg("processing pgpool replicate variable command"),
+ errdetail("%s",pkt->data)));
+
root = json_parse(pkt->data,pkt->len);
/* The root node must be object */
if (root == NULL || root->type != json_object)
{
json_value_free(root);
- ereport(NOTICE,
- (errmsg("unable to parse json data from replicate command")));
+ ereport(LOG,
+ (errmsg("watchdog is unable to process pgpool replicate command"),
+ errdetail("invalid json data \"%s\"",pkt->data)));
return false;
}
func_name = json_get_string_value_for_key(root, "Function");
if (func_name == NULL)
{
json_value_free(root);
- ereport(NOTICE,
- (errmsg("invalid json data"),
- errdetail("unable to find Watchdog Function Name")));
+ ereport(LOG,
+ (errmsg("watchdog is unable to process pgpool replicate command"),
+ errdetail("function node not found in json data \"%s\"",pkt->data)));
return false;
}
func_name = pstrdup(func_name);
@@ -4450,22 +4500,22 @@ static bool process_pgpool_replicate_command(WatchdogNode* wdNode, WDPacketData*
value = json_get_value_for_key(root,"NodeIdList");
if (value == NULL)
{
- ereport(ERROR,
+ ereport(WARNING,
(errmsg("invalid json data"),
errdetail("unable to find NodeIdList node from data")));
}
if (value->type != json_array)
{
is_error = true;
- ereport(NOTICE,
+ ereport(WARNING,
(errmsg("invalid json data"),
errdetail("NodeIdList node does not contains Array")));
}
if (node_count != value->u.array.length)
{
is_error = true;
- ereport(NOTICE,
- (errmsg("invalid json data"),
+ ereport(WARNING,
+ (errmsg("invalid json data"),
errdetail("NodeIdList array contains %d nodes while expecting %d",value->u.array.length, node_count)));
}
@@ -4484,12 +4534,7 @@ static bool process_pgpool_replicate_command(WatchdogNode* wdNode, WDPacketData*
node_count = 0;
}
}
- int k;
json_value_free(root);
- printf("***** NEW WD COMMAND *****\n FUNCTION = \"%s\"\nNode Count = %d\n",func_name, node_count);
- for (k =0; k< node_count; k++)
- printf("NODE ID [%d] = %d\n",k,node_id_list[k]);
- printf("\n");
return process_wd_command_function(wdNode, pkt, func_name, node_count, node_id_list);
}
@@ -4522,10 +4567,10 @@ static bool process_wd_command_function(WatchdogNode* wdNode, WDPacketData* pkt,
wd_func_command->commandID = pkt->command_id;
wd_func_command->funcName = MemoryContextStrdup(TopMemoryContext,func_name);
wd_func_command->wdNode = wdNode;
-
+
/* Add this command for timer tick */
add_wd_command_for_timer_events(pool_config->recovery_timeout, true, wd_func_command);
-
+
MemoryContextSwitchTo(oldCxt);
}
@@ -4587,24 +4632,6 @@ static bool process_wd_command_function(WatchdogNode* wdNode, WDPacketData* pkt,
promote_backend(node_id_list[0]);
}
}
-
- else if (strcasecmp("TEST_SYSTEM", func_name) == 0)
- {
- printf("&&&&&&_____[%d] PROCESSING TEST_SYSTEM COMMAND\n",__LINE__);
- WDFunctionCommandData* wd_func_command;
- MemoryContext oldCxt = MemoryContextSwitchTo(TopMemoryContext);
-
- wd_func_command = palloc(sizeof(WDFunctionCommandData));
- wd_func_command->commandType = pkt->type;
- wd_func_command->commandID = pkt->command_id;
- wd_func_command->funcName = MemoryContextStrdup(TopMemoryContext,func_name);
- wd_func_command->wdNode = wdNode;
-
- /* Add this command for timer tick */
- add_wd_command_for_timer_events(10, true, wd_func_command);
-
- MemoryContextSwitchTo(oldCxt);
- }
else
{
/* This is not supported function */
@@ -4619,7 +4646,9 @@ static bool reply_is_received_for_pgpool_replicate_command(WatchdogNode* wdNode,
int i;
WDCommandNodeResult* nodeResult = NULL;
/* get the result node for */
- printf("----*****----- [%d] WE HAVE RECEIVED REPLY FOR REPLICATE COMMAND WE ISSUED\n",__LINE__);
+ ereport(DEBUG1,
+ (errmsg("watchdog node \"%s\" has replied for pgpool-II replicate command packet",wdNode->nodeName)));
+
for (i=0; i< g_cluster.remoteNodeCount; i++)
{
nodeResult = &ipcCommand->nodeResults[i];
@@ -4629,15 +4658,16 @@ static bool reply_is_received_for_pgpool_replicate_command(WatchdogNode* wdNode,
}
if (nodeResult == NULL)
{
- ereport(NOTICE,(errmsg("unable to find node result")));
+ ereport(WARNING,
+ (errmsg("unable to find result node for pgpool-II replicate command packet received from watchdog node \"%s\"",wdNode->nodeName)));
return true;
}
nodeResult->result_type = pkt->type;
nodeResult->cmdState = COMMAND_STATE_REPLIED;
ipcCommand->reply_from_count++;
-
- printf("----*****----- [%d] reply_from_count = %d AND sendTo_count = %d\n",__LINE__,ipcCommand->reply_from_count,ipcCommand->sendTo_count);
-
+ ereport(DEBUG2,
+ (errmsg("watchdog node \"%s\" has replied for pgpool-II replicate command packet",wdNode->nodeName),
+ errdetail("command was sent to %d nodes and %d nodes have replied to it",ipcCommand->sendTo_count,ipcCommand->reply_from_count)));
if (ipcCommand->reply_from_count >= ipcCommand->sendTo_count)
{
/*
@@ -4693,23 +4723,6 @@ static bool process_wd_command_timer_event(bool timer_expired, WDFunctionCommand
}
return false;
}
-
- if (wd_func_command->funcName && strcasecmp("TEST_SYSTEM", wd_func_command->funcName) == 0)
- {
- if (timer_expired)
- {
- printf("****%s:%d Timer Expired TEST_STSTEM function, Sending back accept message\n",__FUNCTION__,__LINE__);
-
- WDPacketData emptyPkt;
- emptyPkt.command_id = wd_func_command->commandID;
- reply_with_minimal_message(wd_func_command->wdNode, WD_ACCEPT_MESSAGE, &emptyPkt);
- return true;
- }
- else
- printf("****%s:%d Timer tick called on TEST_STSTEM function\n",__FUNCTION__,__LINE__);
- return false;
- }
-
}
/* Just remove the timer.*/
return true;
@@ -4721,11 +4734,11 @@ static void process_wd_func_commands_for_timer_events(void)
ListCell *lc;
List* timers_to_del = NIL;
+ if (g_cluster.wd_timer_commands == NULL)
+ return;
+
gettimeofday(&currTime, NULL);
-
- if (g_cluster.wd_timer_commands != NULL)
- printf("****%s:%d \n",__FUNCTION__,__LINE__);
-
+
foreach(lc, g_cluster.wd_timer_commands)
{
WDCommandTimerData* timerData = lfirst(lc);
@@ -4935,15 +4948,17 @@ static bool verify_authhash_for_node(WatchdogNode* wdNode, char* authhash)
/* DEBUG */
static void print_watchdog_node_info(WatchdogNode* wdNode)
{
- printf("********\t STATE = %s\n",debug_states[wdNode->state]);
- printf("********\t HostName = %s\n",wdNode->hostname);
- printf("********\t NodeName = %s\n",wdNode->nodeName);
- printf("********\t WDPort = %d\n",wdNode->wd_port);
- printf("********\t pgp port = %d\n",wdNode->pgpool_port);
- printf("********\t Priority = %d\n",wdNode->wd_priority);
+ ereport(DEBUG2,
+ (errmsg("state: \"%s\" Host: \"%s\" Name: \"%s\" WD Port:%d PP Port: %d priority:%d",
+ debug_states[wdNode->state],
+ wdNode->hostname
+ ,wdNode->nodeName
+ ,wdNode->wd_port
+ ,wdNode->pgpool_port
+ ,wdNode->wd_priority)));
}
-static void print_packet_info(WDPacketData* pkt,WatchdogNode* wdNode)
+static void print_received_packet_info(WDPacketData* pkt,WatchdogNode* wdNode)
{
int i;
packet_types *pkt_type = NULL;
@@ -4958,6 +4973,9 @@ static void print_packet_info(WDPacketData* pkt,WatchdogNode* wdNode)
break;
}
}
- printf("\n******Packet [ID:%d]received of type [%s] from \"%s\" My state = (%s) \n",pkt->command_id, pkt_type?pkt_type->name:"UNKNOWN",
- wdNode->nodeName, debug_states[get_local_node_state()]);
+ ereport(DEBUG2,
+ (errmsg("watchdog packet received from node \"%s\"",wdNode->nodeName),
+ errdetail("command id : %d Type: %s my watchdog state :%s",pkt->command_id,
+ pkt_type?pkt_type->name:"UNKNOWN",
+ debug_states[get_local_node_state()])));
}
diff --git a/src/watchdog/wd_commands.c b/src/watchdog/wd_commands.c
index c1d274bb3..cbe70141d 100644
--- a/src/watchdog/wd_commands.c
+++ b/src/watchdog/wd_commands.c
@@ -44,6 +44,7 @@
#include "utils/elog.h"
#include "utils/json_writer.h"
#include "utils/json.h"
+#include "utils/pool_stream.h"
#include "pool_config.h"
#include "watchdog/wd_ipc_commands.h"
#include "watchdog/wd_ipc_defines.h"
@@ -64,7 +65,6 @@ static char* get_wd_node_function_json(char* func_name, int *node_id_set, int co
static char* get_wd_simple_function_json(char* func);
static char* get_wd_failover_cmd_type_json(WDFailoverCMDTypes cmdType, char* reqType);
WDFailoverCMDResults wd_send_failover_sync_command(WDFailoverCMDTypes cmdType, char* syncReqType);
-static int read_socket(int socket, void* buf, int len);
static int wd_set_node_mask (unsigned char req_mask, int *node_id_set, int count);
static int wd_chk_node_mask (unsigned char req_mask, int *node_id_set, int count);
@@ -111,30 +111,44 @@ issue_command_to_watchdog(char type, WD_COMMAND_ACTIONS command_action,int timeo
int sock;
WDIPCCmdResult* result = NULL;
char res_type = 'P';
- int res_length;
+ int res_length, action;
gettimeofday(&start_time, NULL);
/* open the watchdog command socket for IPC */
sock = open_wd_command_sock(false);
if (sock < 0)
+ return NULL;
+
+ res_length = htonl(data_len);
+ action = htonl(command_action);
+
+ if (socket_write(sock, &type, sizeof(char)) <= 0)
{
+ close(sock);
return NULL;
}
- if (send(sock,&type,1,0) < 1)
+
+
+ if (socket_write(sock, &action, sizeof(int)) <= 0)
+ {
+ close(sock);
+ return NULL;
+ }
+
+ if (socket_write(sock, &res_length, sizeof(int)) <= 0)
{
close(sock);
return NULL;
}
- /*
- * since the command action will be consumed locally,
- * so no need to convert it to network byte order
- */
- res_length = htonl(data_len);
- send(sock,&command_action,sizeof(command_action),0);
- send(sock,&res_length,sizeof(int),0);
if (data && data_len > 0)
- send(sock,data,data_len,0);
-
+ {
+ if (socket_write(sock, data, data_len) <= 0)
+ {
+ close(sock);
+ return NULL;
+ }
+ }
+
if (blocking)
{
/* if we are asked to wait for results */
@@ -153,40 +167,34 @@ issue_command_to_watchdog(char type, WD_COMMAND_ACTIONS command_action,int timeo
select_res = select(sock+1,&fds,NULL,NULL,timeout_st);
if (select_res > 0)
{
- int ret;
/* read the result type char */
- ret = read_socket(sock, &res_type, 1);
-
- if (ret != 1)
+ if (socket_read(sock, &res_type, 1 ,0) <=0)
{
- ereport(DEBUG1,
- (errmsg("error reading from IPC command socket"),
+ ereport(LOG,
+ (errmsg("error reading from IPC command socket"),
errdetail("read from socket failed with error \"%s\"",strerror(errno))));
close(sock);
return result;
}
/* read the result data length */
- ret = read_socket(sock, &res_length, 4);
- if (ret != 4)
+ if (socket_read(sock, &res_length, sizeof(int), 0) <= 0)
{
- ereport(DEBUG1,
- (errmsg("error reading from IPC command socket"),
+ ereport(LOG,
+ (errmsg("error reading from IPC command socket"),
errdetail("read from socket failed with error \"%s\"",strerror(errno))));
close(sock);
return result;
}
+
result = palloc(sizeof(WDIPCCmdResult));
result->type = res_type;
result->length = ntohl(res_length);
result->data = NULL;
-
+
if (result->length > 0)
{
- int ret;
- /* read the result data length */
result->data = palloc(result->length);
- ret = read_socket(sock, result->data, result->length);
- if (ret != result->length)
+ if (socket_read(sock, result->data, result->length, 0) <= 0)
{
pfree(result->data);
pfree(result);
@@ -248,7 +256,7 @@ wd_start_recovery(void)
if (result == NULL)
{
ereport(LOG,
- (errmsg("start recovery command lock failed"),
+ (errmsg("start recovery command lock failed"),
errdetail("issue command to watchdog returned NULL")));
return COMMAND_FAILED;
}
@@ -258,7 +266,7 @@ wd_start_recovery(void)
if (type == WD_IPC_CMD_CLUSTER_IN_TRAN)
{
ereport(LOG,
- (errmsg("start recovery command lock failed"),
+ (errmsg("start recovery command lock failed"),
errdetail("watchdog cluster is not in stable state"),
errhint("try again when the cluster is fully initialized")));
return CLUSTER_IN_TRANSATIONING;
@@ -600,24 +608,6 @@ open_wd_command_sock(bool throw_error)
return sock;
}
-
-
-static int read_socket(int socket, void* buf, int len)
-{
- int read_len = 0;
- while (read_len < len)
- {
- int nret;
- nret = read(socket, buf + read_len, len - read_len);
- if (nret <= 0)
- return nret;
- read_len +=nret;
- }
- return read_len;
-}
-
-
-
WDFailoverCMDResults wd_failover_command_start(WDFailoverCMDTypes cmdType)
{
if (pool_config->use_watchdog)
diff --git a/src/watchdog/wd_lifecheck.c b/src/watchdog/wd_lifecheck.c
index 0df03278e..88bb748a8 100644
--- a/src/watchdog/wd_lifecheck.c
+++ b/src/watchdog/wd_lifecheck.c
@@ -423,18 +423,17 @@ static void print_lifecheck_cluster(void)
int i;
if (!gslifeCheckCluster)
return;
- printf("Nodes count = %d\n",gslifeCheckCluster->nodeCount);
+ ereport(LOG,
+ (errmsg("%d watchdog nodes are configured for lifecheck",gslifeCheckCluster->nodeCount)));
for (i = 0; i< gslifeCheckCluster->nodeCount; i++)
{
- printf("NODE NO %d\n",i);
- printf("\t ID = %d\n",gslifeCheckCluster->lifeCheckNodes[i].ID);
- printf("\t Name = %s\n",gslifeCheckCluster->lifeCheckNodes[i].nodeName);
- printf("\t Host = %s\n",gslifeCheckCluster->lifeCheckNodes[i].hostName);
- printf("\t WDPort = %d\n",gslifeCheckCluster->lifeCheckNodes[i].wdPort);
- printf("\t pp Port = %d\n",gslifeCheckCluster->lifeCheckNodes[i].pgpoolPort);
- printf("--------------\n");
+ ereport(LOG,
+ (errmsg("watchdog nodes ID:%d Name:\"%s\"",gslifeCheckCluster->lifeCheckNodes[i].ID,gslifeCheckCluster->lifeCheckNodes[i].nodeName),
+ errdetail("Host:\"%s\" WD Port:%d pgpool-II port:%d",
+ gslifeCheckCluster->lifeCheckNodes[i].hostName,
+ gslifeCheckCluster->lifeCheckNodes[i].wdPort,
+ gslifeCheckCluster->lifeCheckNodes[i].pgpoolPort)));
}
- printf("========\n");
}
static bool inform_node_status(LifeCheckNode* node, char *message)