diff options
| author | Muhammad Usama | 2015-10-29 17:33:35 +0000 |
|---|---|---|
| committer | Muhammad Usama | 2015-10-29 17:33:35 +0000 |
| commit | 5634842883ba2bece531a6fd4991825615389057 (patch) | |
| tree | 53fd255af88ba1f6f1d5c85c0f11d9e6277d85d6 | |
| parent | f97ded01c47f7f5bbe918e611d1da55dc4e65ddc (diff) | |
removing the debug printfs, getting ready to merge the feature into master branchwatchdog_enhancement
| -rw-r--r-- | src/watchdog/watchdog.c | 346 | ||||
| -rw-r--r-- | src/watchdog/wd_commands.c | 84 | ||||
| -rw-r--r-- | src/watchdog/wd_lifecheck.c | 17 |
3 files changed, 227 insertions, 220 deletions
diff --git a/src/watchdog/watchdog.c b/src/watchdog/watchdog.c index cf8786b15..7265d30c9 100644 --- a/src/watchdog/watchdog.c +++ b/src/watchdog/watchdog.c @@ -318,7 +318,7 @@ static bool write_packet_to_socket(int sock, WDPacketData* pkt); static int read_sockets(fd_set* rmask,int pending_fds_count); static void set_timeout(unsigned int sec); static int wd_create_command_server_socket(void); -static void close_socket_connection(SocketConnection* conn,int line); +static void close_socket_connection(SocketConnection* conn); static bool send_message_to_connection(SocketConnection* conn, WDPacketData *pkt); static int send_message(WatchdogNode* wdNode, WDPacketData *pkt); @@ -395,7 +395,7 @@ static void wd_check_config(void); static pid_t watchdog_main(void); static pid_t fork_watchdog_child(void); -static void print_packet_info(WDPacketData* pkt,WatchdogNode* wdNode); +static void print_received_packet_info(WDPacketData* pkt,WatchdogNode* wdNode); /* global variables */ wd_cluster g_cluster; struct timeval g_tm_set_time; @@ -784,7 +784,8 @@ static pid_t fork_watchdog_child(void) else if (pid == -1) { ereport(FATAL, - (errmsg("fork() failed. reason: %s", strerror(errno)))); + (return_code(POOL_EXIT_FATAL), + errmsg("fork() failed. reason: %s", strerror(errno)))); } return pid; @@ -926,8 +927,9 @@ wd_create_command_server_socket(void) if ((sock = socket(AF_UNIX, SOCK_STREAM, 0)) < 0) { /* socket create failed */ - ereport(ERROR, - (errmsg("failed to create watchdog command server socket"), + ereport(FATAL, + (return_code(POOL_EXIT_FATAL), + errmsg("failed to create watchdog command server socket"), errdetail("create socket failed with reason: \"%s\"", strerror(errno)))); } memset((char *) &addr, 0, sizeof(addr)); @@ -940,8 +942,9 @@ wd_create_command_server_socket(void) { close(sock); unlink(addr.sun_path); - ereport(ERROR, - (errmsg("failed to create watchdog command server socket"), + ereport(FATAL, + (return_code(POOL_EXIT_FATAL), + errmsg("failed to create watchdog command server socket"), errdetail("bind on \"%s\" failed with reason: \"%s\"", addr.sun_path, strerror(errno)))); } @@ -950,8 +953,9 @@ wd_create_command_server_socket(void) /* listen failed */ close(sock); unlink(addr.sun_path); - ereport(ERROR, - (errmsg("failed to create watchdog command server socket"), + ereport(FATAL, + (return_code(POOL_EXIT_FATAL), + errmsg("failed to create watchdog command server socket"), errdetail("listen failed with reason: \"%s\"", strerror(errno)))); } on_proc_exit(FileUnlink, (Datum) pstrdup(addr.sun_path)); @@ -1151,7 +1155,7 @@ static int read_sockets(fd_set* rmask,int pending_fds_count) { /* We have found the match */ found = true; - close_socket_connection(&wdNode->server_socket,__LINE__); + close_socket_connection(&wdNode->server_socket); strlcpy(wdNode->delegate_ip, tempNode->delegate_ip, WD_MAX_HOST_NAMELEN); strlcpy(wdNode->nodeName, tempNode->nodeName, WD_MAX_HOST_NAMELEN); wdNode->state = tempNode->state; @@ -1193,7 +1197,7 @@ static int read_sockets(fd_set* rmask,int pending_fds_count) tmpNode.server_socket.sock = -1; tmpNode.server_socket.sock_state = WD_SOCK_UNINITIALIZED; reply_with_minimal_message(&tmpNode, WD_REJECT_MESSAGE, pkt); - close_socket_connection(conn,__LINE__); + close_socket_connection(conn); } pfree(tempNode); } @@ -1332,14 +1336,15 @@ static bool read_ipc_command_and_process(int sock, bool *remove_socket) return false; } /* Next is is command action */ - ret = socket_read(sock, &command_action, sizeof(WD_COMMAND_ACTIONS),0); - if (ret != sizeof(WD_COMMAND_ACTIONS)) + ret = socket_read(sock, &command_action, sizeof(int),0); + if (ret != sizeof(int)) { ereport(WARNING, (errmsg("error reading from IPC socket"), errdetail("read from socket failed with error \"%s\"",strerror(errno)))); return false; } + command_action = htonl(command_action); /* We should have data length */ ret = socket_read(sock, &data_len, sizeof(int),0); if (ret != sizeof(int)) @@ -1690,10 +1695,11 @@ static IPC_CMD_PREOCESS_RES process_IPC_lock_request(WDIPCCommandData *IPCComman * if cluster or myself is not in stable state * just return cluster in transaction */ + ereport(LOG, + (errmsg("processing lock request from IPC socket"))); IPCCommand->type = WD_INTERLOCKING_REQUEST; if (get_local_node_state() == WD_STANDBY) { - printf("\t\t\t %s:%d I AM STANDBY \n",__FUNCTION__,__LINE__); /* I am a standby node, Just forward the request to coordinator */ WDPacketData * wdPacket = get_minimum_message(WD_INTERLOCKING_REQUEST,NULL); @@ -1702,8 +1708,10 @@ static IPC_CMD_PREOCESS_RES process_IPC_lock_request(WDIPCCommandData *IPCComman if (send_message(g_cluster.masterNode, wdPacket) <= 0) { - printf("\t\t\t %s:%d send message failed \n",__FUNCTION__,__LINE__); - + ereport(LOG, + (errmsg("failed to process lock request from IPC socket"), + errdetail("failed to forward the request to master watchdog node \"%s\"",g_cluster.masterNode->nodeName))); + /* we have failed to send to any node, return lock failed */ res_type = WD_IPC_CMD_RESULT_BAD; } @@ -1712,22 +1720,20 @@ static IPC_CMD_PREOCESS_RES process_IPC_lock_request(WDIPCCommandData *IPCComman /* * we need to wait for the result */ - printf("\t\t\t %s:%d PROCESSING \n",__FUNCTION__,__LINE__); - + ereport(LOG, + (errmsg("lock request from IPC socket is forwarded to master watchdog node \"%s\"",g_cluster.masterNode->nodeName), + errdetail("waiting for the reply from master node..."))); + res_type = IPC_CMD_PROCESSING; } } else if (get_local_node_state() == WD_COORDINATOR) { - printf("\t\t\t %s:%d COORDINATOR \n",__FUNCTION__,__LINE__); - /* * If I am coordinator, Just process the request locally */ if (node_has_requested_for_interlocking(g_cluster.localNode, NULL)) { - printf("\t\t\t %s:%d \n",__FUNCTION__,__LINE__); - res_type = WD_IPC_CMD_RESULT_OK; } else @@ -1761,6 +1767,9 @@ static IPC_CMD_PREOCESS_RES process_IPC_failover_cmd_synchronise(WDIPCCommandDat * if cluster or myself is not in stable state * just return cluster in transaction */ + ereport(LOG, + (errmsg("processing sync request from IPC socket"))); + IPCCommand->type = WD_FAILOVER_CMD_SYNC_REQUEST; if (get_local_node_state() == WD_STANDBY) { @@ -1773,12 +1782,11 @@ static IPC_CMD_PREOCESS_RES process_IPC_failover_cmd_synchronise(WDIPCCommandDat set_message_data(&wdPacket, IPCCommand->data_buf , IPCCommand->data_len); /* save the command ID */ IPCCommand->internal_command_id = wdPacket.command_id; - - printf("\t\t\t %s:%d I AM STANDBY \n",__FUNCTION__,__LINE__); - if (send_message(g_cluster.masterNode, &wdPacket) <= 0) { - printf("\t\t\t %s:%d send message failed \n",__FUNCTION__,__LINE__); + ereport(LOG, + (errmsg("failed to process sync request from IPC socket"), + errdetail("failed to forward the request to master watchdog node \"%s\"",g_cluster.masterNode->nodeName))); /* we have failed to send to any node, return lock failed */ res_type = WD_IPC_CMD_RESULT_BAD; } @@ -1787,14 +1795,15 @@ static IPC_CMD_PREOCESS_RES process_IPC_failover_cmd_synchronise(WDIPCCommandDat /* * we need to wait for the result */ - printf("\t\t\t %s:%d PROCESSING \n",__FUNCTION__,__LINE__); + ereport(LOG, + (errmsg("sync request from IPC socket is forwarded to master watchdog node \"%s\"",g_cluster.masterNode->nodeName), + errdetail("waiting for the reply from master node..."))); + res_type = IPC_CMD_PROCESSING; } } else if (get_local_node_state() == WD_COORDINATOR) { - printf("\t\t\t %s:%d COORDINATOR \n",__FUNCTION__,__LINE__); - /* * If I am coordinator, Just process the request locally */ @@ -1827,13 +1836,9 @@ static int node_has_requested_for_interlocking(WatchdogNode* wdNode, WDPacketDat /* only coordinator(master) node can process this request */ if (get_local_node_state() == WD_COORDINATOR) { - printf("\n\t\t\t %s:%d I AM COORDINATOR \n",__FUNCTION__,__LINE__); - /* check if we already have no lockholder node */ if (g_cluster.lockHolderNode == NULL || g_cluster.lockHolderNode == wdNode) { - printf("\n\t\t\t %s:%d LOCK REQUESTED IS NULL OR FROM SAME NODE \n",__FUNCTION__,__LINE__); - if (wdNode == g_cluster.localNode) { g_cluster.lockHolderNode = wdNode; @@ -1843,7 +1848,6 @@ static int node_has_requested_for_interlocking(WatchdogNode* wdNode, WDPacketDat /* reply the node with success message */ else if (reply_with_minimal_message(wdNode, WD_ACCEPT_MESSAGE, pkt)) { - printf("\n\t\t\t %s:%d WD_ACCEPT_MESSAGE \n",__FUNCTION__,__LINE__); g_cluster.lockHolderNode = wdNode; /* TODO inform all cluster about the new lock holder */ return true; @@ -1851,13 +1855,11 @@ static int node_has_requested_for_interlocking(WatchdogNode* wdNode, WDPacketDat } else { - printf("\n\t\t\t %s:%d WD_REJECT_MESSAGE \n",__FUNCTION__,__LINE__); reply_with_minimal_message(wdNode, WD_REJECT_MESSAGE, pkt); } } else { - printf("\n\t\t\t %s:%d WD_ERROR_MESSAGE \n",__FUNCTION__,__LINE__); reply_with_minimal_message(wdNode, WD_ERROR_MESSAGE, pkt); } return false; @@ -1894,7 +1896,7 @@ static void process_failover_command_sync_requests(WatchdogNode* wdNode, WDPacke /* The root node must be object */ if (root == NULL || root->type != json_object) { - ereport(NOTICE, + ereport(LOG, (errmsg("unable to parse json data from replicate command"))); res = FAILOVER_RES_ERROR; } @@ -1903,7 +1905,7 @@ static void process_failover_command_sync_requests(WatchdogNode* wdNode, WDPacke if (syncRequestType == NULL) { - ereport(NOTICE, + ereport(LOG, (errmsg("invalid json data"), errdetail("unable to find Watchdog Function Name"))); res = FAILOVER_RES_ERROR; @@ -1991,6 +1993,11 @@ node_is_asking_for_failover_cmd_start(WatchdogNode* wdNode, WDPacketData* pkt, i { WDFailoverCMDResults res = FAILOVER_RES_TRANSITION; /* only coordinator(master) node can process this request */ + ereport(LOG, + (errmsg("watchdog node \"%s\" is requesting to %s lock for failover command start", + wdNode->nodeName, + check?"check":"aquire"))); + if (get_local_node_state() == WD_COORDINATOR) { InterlockingNode* lockingNode = NULL; @@ -2004,7 +2011,10 @@ node_is_asking_for_failover_cmd_start(WatchdogNode* wdNode, WDPacketData* pkt, i /* check if we already have no lockholder node */ if (lockingNode->lockHolderNode == NULL || lockingNode->lockHolderNode == wdNode) { - printf("\n\t\t\t %s:%d LOCK REQUESTED IS NULL OR FROM SAME NODE \n",__FUNCTION__,__LINE__); + ereport(LOG, + (errmsg("watchdog node \"%s\" is requesting to %s lock for failover command start", + wdNode->nodeName, + check?"check":"aquire"))); if (check == false) { lockingNode->lockHolderNode = wdNode; @@ -2014,7 +2024,11 @@ node_is_asking_for_failover_cmd_start(WatchdogNode* wdNode, WDPacketData* pkt, i } else /* some other node is holding the lock */ { - printf("\n\t\t\t %s:%d Some other node is already holding the lock \n",__FUNCTION__,__LINE__); + ereport(LOG, + (errmsg("%s lock for failover command start request is denied to node \"%s\"", + check?"check":"aquire", + wdNode->nodeName), + errdetail("node \"%s\" is holding the lock",lockingNode->lockHolderNode->nodeName))); if (lockingNode->locked) res = FAILOVER_RES_BLOCKED; else @@ -2024,7 +2038,10 @@ node_is_asking_for_failover_cmd_start(WatchdogNode* wdNode, WDPacketData* pkt, i } else { - printf("\n\t\t\t %s:%d I am not in position \n",__FUNCTION__,__LINE__); + ereport(LOG, + (errmsg("failed to process failover command start request by watchdog node \"%s\"", + check?"check":"aquire"), + errdetail("I am standby node and request can only be handled by master node"))); res = FAILOVER_RES_ERROR; } return res; @@ -2035,6 +2052,11 @@ node_is_asking_for_failover_cmd_end(WatchdogNode* wdNode, WDPacketData* pkt, int { WDFailoverCMDResults res = FAILOVER_RES_TRANSITION; /* only coordinator(master) node can process this request */ + ereport(LOG, + (errmsg("watchdog node \"%s\" is requesting to %s lock for failover command start", + wdNode->nodeName, + resign?"check":"release"))); + if (get_local_node_state() == WD_COORDINATOR) { InterlockingNode* lockingNode = NULL; @@ -2056,12 +2078,23 @@ node_is_asking_for_failover_cmd_end(WatchdogNode* wdNode, WDPacketData* pkt, int } else /* some other node is holding the lock */ { + ereport(LOG, + (errmsg("%s lock for failover command end request is denied to node \"%s\"", + resign?"check":"release", + wdNode->nodeName), + errdetail("node \"%s\" is holding the lock",lockingNode->lockHolderNode->nodeName))); + res = FAILOVER_RES_BLOCKED; } } } else { + ereport(LOG, + (errmsg("failed to process failover command end request by watchdog node \"%s\"", + resign?"check":"release"), + errdetail("I am standby node and request can only be handled by master node"))); + res = FAILOVER_RES_ERROR; } return res; @@ -2128,45 +2161,42 @@ static WDPacketData* read_packet_of_type(SocketConnection* conn, char ensure_typ ret = socket_read(conn->sock,&type, sizeof(char), 1 ); if (ret != sizeof(char)) { - close_socket_connection(conn,__LINE__); + close_socket_connection(conn); return NULL; } ereport(DEBUG1, - (errmsg("PACKET TYPE %c while need packet type %c",type,ensure_type))); + (errmsg("received packet tyep %c while need packet type %c",type,ensure_type))); if (ensure_type != WD_NO_MESSAGE && ensure_type != type) { /* The packet type is not what we want.*/ ereport(DEBUG1, (errmsg("invalid packet type. expecting %c while received %c",ensure_type,type))); - close_socket_connection(conn,__LINE__); + close_socket_connection(conn); return NULL; } ret = socket_read(conn->sock, &cmd_id, sizeof(int) ,1); if (ret != sizeof(int)) { - close_socket_connection(conn,__LINE__); + close_socket_connection(conn); return NULL; } cmd_id = ntohl(cmd_id); - + ereport(DEBUG3, - (errmsg("PACKET COMMAND ID %d",cmd_id))); + (errmsg("received packet with command id %d from watchdog node ",cmd_id))); ret = socket_read(conn->sock, &len, sizeof(int), 1); if (ret != sizeof(int)) { - close_socket_connection(conn,__LINE__); + close_socket_connection(conn); return NULL; } len = ntohl(len); - ereport(DEBUG2, - (errmsg("PACKET DATA LENGTH %d",len))); - pkt = get_empty_packet(); set_message_type(pkt, type); set_message_commandID(pkt, cmd_id); @@ -2176,7 +2206,7 @@ static WDPacketData* read_packet_of_type(SocketConnection* conn, char ensure_typ ret = socket_read(conn->sock, buf, len,1); if (ret != len) { - close_socket_connection(conn,__LINE__); + close_socket_connection(conn); free_packet(pkt); pfree(buf); return NULL; @@ -2220,26 +2250,25 @@ static void wd_system_will_go_down(int code, Datum arg) { int i; ereport(LOG, - (errmsg("Watchdog child is shutting down"))); + (errmsg("Watchdog is shutting down"))); send_cluster_command(NULL, WD_INFORM_I_AM_GOING_DOWN, 0); if (get_local_node_state() == WD_COORDINATOR) resign_from_coordinator(); /* close server socket */ - close_socket_connection(&g_cluster.localNode->server_socket,__LINE__); + close_socket_connection(&g_cluster.localNode->server_socket); /* close all node sockets */ for (i=0; i< g_cluster.remoteNodeCount; i++) { WatchdogNode* wdNode = &(g_cluster.remoteNodes[i]); - close_socket_connection(&wdNode->client_socket,__LINE__); - close_socket_connection(&wdNode->server_socket,__LINE__); + close_socket_connection(&wdNode->client_socket); + close_socket_connection(&wdNode->server_socket); } } -static void close_socket_connection(SocketConnection* conn,int line) +static void close_socket_connection(SocketConnection* conn) { - printf("Closing socket %d From %d\n",conn->sock,line); if ((conn->sock > 0 && conn->sock_state == WD_SOCK_CONNECTED) || conn->sock_state == WD_SOCK_WAITING_FOR_CONNECT) { @@ -2361,7 +2390,7 @@ static int update_successful_outgoing_cons(fd_set* wmask, int pending_fds_count) ereport(LOG, (errmsg("error in outbond connection to %s:%d",wdNode->hostname,wdNode->wd_port), errdetail("%s",strerror(valopt)))); - close_socket_connection(&wdNode->client_socket,__LINE__); + close_socket_connection(&wdNode->client_socket); wdNode->client_socket.sock_state = WD_SOCK_ERROR; } else @@ -2684,7 +2713,6 @@ static int standard_packet_processor(WatchdogNode* wdNode, WDPacketData* pkt) POOL_CONFIG* master_config = get_pool_config_from_json(pkt->data, pkt->len); if (master_config) { - printf("\n%s\n",pkt->data); verify_pool_configurations(master_config); } @@ -2709,7 +2737,6 @@ static int standard_packet_processor(WatchdogNode* wdNode, WDPacketData* pkt) wdNode->tv.tv_sec = tempNode->tv.tv_sec; wdNode->wd_priority = tempNode->wd_priority; - printf("NODE INFO MESSAGE RECEVD\n"); print_watchdog_node_info(wdNode); if (authkey) @@ -2730,12 +2757,10 @@ static int standard_packet_processor(WatchdogNode* wdNode, WDPacketData* pkt) break; case WD_INTERLOCKING_REQUEST: - printf("\n\t\t\t %s:%d INTERLOCKING REQUEST RECEIVED \n",__FUNCTION__,__LINE__); node_has_requested_for_interlocking(wdNode, pkt); break; case WD_INTERUNLOCKING_REQUEST: - printf("\n\t\t\t %s:%d UNLOCKING_REQUEST REQUEST RECEIVED \n",__FUNCTION__,__LINE__); node_has_resigned_from_interlocking(wdNode, pkt); break; @@ -2783,18 +2808,13 @@ static int standard_packet_processor(WatchdogNode* wdNode, WDPacketData* pkt) static bool send_message_to_connection(SocketConnection* conn, WDPacketData *pkt) { - printf("sending to sock:%d state:%d \n",conn->sock,conn->sock_state); - if (conn->sock > 0 && conn->sock_state == WD_SOCK_CONNECTED) { if (write_packet_to_socket(conn->sock, pkt) == true) - { return true; - } - else - printf("Write failed while sending to sock:%d state:%d \n",conn->sock,conn->sock_state); - - close_socket_connection(conn,__LINE__); + ereport(DEBUG1, + (errmsg("sending packet failed, closing connection"))); + close_socket_connection(conn); } return false; @@ -2802,13 +2822,12 @@ static bool send_message_to_connection(SocketConnection* conn, WDPacketData *pkt static bool send_message_to_node(WatchdogNode* wdNode, WDPacketData *pkt) { - printf("send message to client socket of %s node \n",wdNode->nodeName); - if (send_message_to_connection(&wdNode->client_socket,pkt) == true) return true; - printf("send message to server socket of %s node \n",wdNode->nodeName); if (send_message_to_connection(&wdNode->server_socket,pkt) == true) return true; + ereport(DEBUG1, + (errmsg("sending packet to node \"%s\" failed, closing connection", wdNode->nodeName))); return false; } @@ -2905,8 +2924,6 @@ static bool watchdog_internal_command_packet_processor(WatchdogNode* wdNode, WDP nodeResult->cmdState = COMMAND_STATE_REPLIED; g_cluster.currentCommand.commandReplyFromCount++; - printf("----*****----- [%d] reply_from_count = %d AND sendTo_count = %d\n",__LINE__,g_cluster.currentCommand.commandReplyFromCount,g_cluster.currentCommand.commandSendToCount); - if (g_cluster.currentCommand.commandReplyFromCount >= g_cluster.currentCommand.commandSendToCount) { g_cluster.currentCommand.commandFinished = true; @@ -3025,7 +3042,8 @@ static int issue_watchdog_internal_command(WatchdogNode* wdNode, WDPacketData *p clear_command_node_result(nodeResult); if (nodeResult->wdNode->state == WD_DEAD || nodeResult->wdNode->state == WD_SHUTDOWN) { - printf(" Not sending to DEAD node:%d name = [%s]\n",i,nodeResult->wdNode->nodeName); + ereport(DEBUG2, + (errmsg("not sending watchdog internal command packet to DEAD %s",nodeResult->wdNode->nodeName))); /* Do not send to dead nodes */ nodeResult->cmdState = COMMAND_STATE_DO_NOT_SEND; } @@ -3033,7 +3051,9 @@ static int issue_watchdog_internal_command(WatchdogNode* wdNode, WDPacketData *p { if (send_message_to_node(nodeResult->wdNode, pkt) == false) { - printf(" sending to node:%d name = [%s] FAILED \n",i,nodeResult->wdNode->nodeName); + ereport(DEBUG1, + (errmsg("failed to send watchdog internla command packet %s",nodeResult->wdNode->nodeName), + errdetail("saving the packet. will try to resend it if connection recovers"))); /* failed to send. May be try again later */ save_message = true; @@ -3266,14 +3286,12 @@ static bool wd_commands_packet_processor(WD_EVENTS event, WatchdogNode* wdNode, if (pkt->type == WD_INTERLOCKING_REQUEST) { - printf("\n\t\t\t %s:%d INTERLOCKING REQUEST RECEIVED \n",__FUNCTION__,__LINE__); node_has_requested_for_interlocking(wdNode, pkt); return true; } if (pkt->type == WD_INTERUNLOCKING_REQUEST) { - printf("\n\t\t\t %s:%d UNLOCKING_REQUEST REQUEST RECEIVED \n",__FUNCTION__,__LINE__); node_has_resigned_from_interlocking(wdNode, pkt); return true; } @@ -3365,7 +3383,7 @@ static int watchdog_state_machine(WD_EVENTS event, WatchdogNode* wdNode, WDPacke } else if (event == WD_EVENT_PACKET_RCV) { - print_packet_info(pkt,wdNode); + print_received_packet_info(pkt,wdNode); if (pkt->type == WD_INFO_MESSAGE) standard_packet_processor(wdNode, pkt); @@ -3800,13 +3818,14 @@ static int watchdog_state_machine_coordinator(WD_EVENTS event, WatchdogNode* wdN int i; send_cluster_command(NULL, WD_DECLARE_COORDINATOR_MESSAGE, 5); set_timeout(10); - printf("\nI AM SELECTING MYSELF AS COORDINATOR NODE\n"); + ereport(LOG, + (errmsg("I am announcing my self as master/coordinator watchdog node"))); for (i=0; i< g_cluster.remoteNodeCount; i++) { WatchdogNode* wdNode = &(g_cluster.remoteNodes[i]); - printf("NODE INFORMATION for NODE %d\n",i); + ereport(DEBUG2, + (errmsg("printing all remote node information"))); print_watchdog_node_info(wdNode); - printf("___________\n"); } } break; @@ -3819,6 +3838,16 @@ static int watchdog_state_machine_coordinator(WD_EVENTS event, WatchdogNode* wdN g_cluster.currentCommand.commandStatus == COMMAND_FINISHED_TIMEOUT) { ereport(LOG, + (errmsg("declare coordinator command finished with status:[%s]", + g_cluster.currentCommand.commandStatus == COMMAND_FINISHED_ALL_REPLIED? + "ALL NODES REPLIED": + "COMMAND TIMEED OUT"), + errdetail("The command was sent to %d nodes and %d nodes replied to it", + g_cluster.currentCommand.commandSendToCount, + g_cluster.currentCommand.commandReplyFromCount + ))); + + ereport(LOG, (errmsg("I am the cluster leader node. Starting escalation process"), errdetail("our declare coordinator message is accepted by all nodes"))); @@ -3839,20 +3868,30 @@ static int watchdog_state_machine_coordinator(WD_EVENTS event, WatchdogNode* wdN } } - if (g_cluster.currentCommand.packet.type == WD_IAM_COORDINATOR_MESSAGE) + else if (g_cluster.currentCommand.packet.type == WD_IAM_COORDINATOR_MESSAGE) { - if (g_cluster.currentCommand.commandStatus == COMMAND_FINISHED_ALL_REPLIED || - g_cluster.currentCommand.commandStatus == COMMAND_FINISHED_TIMEOUT) + if (g_cluster.currentCommand.commandStatus == COMMAND_FINISHED_ALL_REPLIED) + { + ereport(DEBUG1, + (errmsg("I am the cluster leader node command finished with status:[ALL NODES REPLIED]"), + errdetail("The command was sent to %d nodes and %d nodes replied to it", + g_cluster.currentCommand.commandSendToCount, + g_cluster.currentCommand.commandReplyFromCount + ))); + } + else if (g_cluster.currentCommand.commandStatus == COMMAND_FINISHED_TIMEOUT) { - printf("\n**************I AM COORDINATOR MESSAGE ACCEPTED**************\n"); - printf("Send to count = %d\n",g_cluster.currentCommand.commandSendToCount); - printf("Reply from count = %d\n",g_cluster.currentCommand.commandReplyFromCount); - printf("commandStatus = %d\n",g_cluster.currentCommand.commandStatus); + ereport(DEBUG1, + (errmsg("I am the cluster leader node command finished with status:[COMMAND TIMEED OUT] which is success"), + errdetail("The command was sent to %d nodes and %d nodes replied to it", + g_cluster.currentCommand.commandSendToCount, + g_cluster.currentCommand.commandReplyFromCount + ))); } else { /* command is finished but because of error */ - ereport(NOTICE, + ereport(WARNING, (errmsg("possible split brain scenario detected by \"%s\" node", wdNode->nodeName), (errdetail("re-initializing cluster")))); set_state(WD_JOINING); @@ -4329,17 +4368,16 @@ static int set_state(WD_STATES newState) { WD_STATES oldState = g_cluster.localNode->state; ereport(LOG, - (errmsg("setting watchdog state old state = %s NEW = %s",debug_states[oldState],debug_states[newState]))); + (errmsg("watchdog state changed from [%s] to [%s]",debug_states[oldState],debug_states[newState]))); g_cluster.localNode->state = newState; /* If we are resigning from being coordinator * kill the escalation child process - */ if (oldState == WD_COORDINATOR && newState != WD_COORDINATOR && g_cluster.escalation_pid > 0) { - printf(" I am resigning from coordinator so killing pid =%d\n",g_cluster.escalation_pid); kill(g_cluster.escalation_pid,SIGTERM); g_cluster.escalation_pid = -1; } + */ if (oldState != newState) watchdog_state_machine(WD_EVENT_WD_STATE_CHANGED, NULL, NULL); return 0; @@ -4420,23 +4458,35 @@ static bool process_pgpool_replicate_command(WatchdogNode* wdNode, WDPacketData* int node_count = 0; int *node_id_list = NULL; + if (pkt->data == NULL || pkt->len == 0) + { + ereport(LOG, + (errmsg("watchdog is unable to process pgpool replicate command"), + errdetail("command packet contains no data"))); + return false; + } + ereport(DEBUG1, + (errmsg("processing pgpool replicate variable command"), + errdetail("%s",pkt->data))); + root = json_parse(pkt->data,pkt->len); /* The root node must be object */ if (root == NULL || root->type != json_object) { json_value_free(root); - ereport(NOTICE, - (errmsg("unable to parse json data from replicate command"))); + ereport(LOG, + (errmsg("watchdog is unable to process pgpool replicate command"), + errdetail("invalid json data \"%s\"",pkt->data))); return false; } func_name = json_get_string_value_for_key(root, "Function"); if (func_name == NULL) { json_value_free(root); - ereport(NOTICE, - (errmsg("invalid json data"), - errdetail("unable to find Watchdog Function Name"))); + ereport(LOG, + (errmsg("watchdog is unable to process pgpool replicate command"), + errdetail("function node not found in json data \"%s\"",pkt->data))); return false; } func_name = pstrdup(func_name); @@ -4450,22 +4500,22 @@ static bool process_pgpool_replicate_command(WatchdogNode* wdNode, WDPacketData* value = json_get_value_for_key(root,"NodeIdList"); if (value == NULL) { - ereport(ERROR, + ereport(WARNING, (errmsg("invalid json data"), errdetail("unable to find NodeIdList node from data"))); } if (value->type != json_array) { is_error = true; - ereport(NOTICE, + ereport(WARNING, (errmsg("invalid json data"), errdetail("NodeIdList node does not contains Array"))); } if (node_count != value->u.array.length) { is_error = true; - ereport(NOTICE, - (errmsg("invalid json data"), + ereport(WARNING, + (errmsg("invalid json data"), errdetail("NodeIdList array contains %d nodes while expecting %d",value->u.array.length, node_count))); } @@ -4484,12 +4534,7 @@ static bool process_pgpool_replicate_command(WatchdogNode* wdNode, WDPacketData* node_count = 0; } } - int k; json_value_free(root); - printf("***** NEW WD COMMAND *****\n FUNCTION = \"%s\"\nNode Count = %d\n",func_name, node_count); - for (k =0; k< node_count; k++) - printf("NODE ID [%d] = %d\n",k,node_id_list[k]); - printf("\n"); return process_wd_command_function(wdNode, pkt, func_name, node_count, node_id_list); } @@ -4522,10 +4567,10 @@ static bool process_wd_command_function(WatchdogNode* wdNode, WDPacketData* pkt, wd_func_command->commandID = pkt->command_id; wd_func_command->funcName = MemoryContextStrdup(TopMemoryContext,func_name); wd_func_command->wdNode = wdNode; - + /* Add this command for timer tick */ add_wd_command_for_timer_events(pool_config->recovery_timeout, true, wd_func_command); - + MemoryContextSwitchTo(oldCxt); } @@ -4587,24 +4632,6 @@ static bool process_wd_command_function(WatchdogNode* wdNode, WDPacketData* pkt, promote_backend(node_id_list[0]); } } - - else if (strcasecmp("TEST_SYSTEM", func_name) == 0) - { - printf("&&&&&&_____[%d] PROCESSING TEST_SYSTEM COMMAND\n",__LINE__); - WDFunctionCommandData* wd_func_command; - MemoryContext oldCxt = MemoryContextSwitchTo(TopMemoryContext); - - wd_func_command = palloc(sizeof(WDFunctionCommandData)); - wd_func_command->commandType = pkt->type; - wd_func_command->commandID = pkt->command_id; - wd_func_command->funcName = MemoryContextStrdup(TopMemoryContext,func_name); - wd_func_command->wdNode = wdNode; - - /* Add this command for timer tick */ - add_wd_command_for_timer_events(10, true, wd_func_command); - - MemoryContextSwitchTo(oldCxt); - } else { /* This is not supported function */ @@ -4619,7 +4646,9 @@ static bool reply_is_received_for_pgpool_replicate_command(WatchdogNode* wdNode, int i; WDCommandNodeResult* nodeResult = NULL; /* get the result node for */ - printf("----*****----- [%d] WE HAVE RECEIVED REPLY FOR REPLICATE COMMAND WE ISSUED\n",__LINE__); + ereport(DEBUG1, + (errmsg("watchdog node \"%s\" has replied for pgpool-II replicate command packet",wdNode->nodeName))); + for (i=0; i< g_cluster.remoteNodeCount; i++) { nodeResult = &ipcCommand->nodeResults[i]; @@ -4629,15 +4658,16 @@ static bool reply_is_received_for_pgpool_replicate_command(WatchdogNode* wdNode, } if (nodeResult == NULL) { - ereport(NOTICE,(errmsg("unable to find node result"))); + ereport(WARNING, + (errmsg("unable to find result node for pgpool-II replicate command packet received from watchdog node \"%s\"",wdNode->nodeName))); return true; } nodeResult->result_type = pkt->type; nodeResult->cmdState = COMMAND_STATE_REPLIED; ipcCommand->reply_from_count++; - - printf("----*****----- [%d] reply_from_count = %d AND sendTo_count = %d\n",__LINE__,ipcCommand->reply_from_count,ipcCommand->sendTo_count); - + ereport(DEBUG2, + (errmsg("watchdog node \"%s\" has replied for pgpool-II replicate command packet",wdNode->nodeName), + errdetail("command was sent to %d nodes and %d nodes have replied to it",ipcCommand->sendTo_count,ipcCommand->reply_from_count))); if (ipcCommand->reply_from_count >= ipcCommand->sendTo_count) { /* @@ -4693,23 +4723,6 @@ static bool process_wd_command_timer_event(bool timer_expired, WDFunctionCommand } return false; } - - if (wd_func_command->funcName && strcasecmp("TEST_SYSTEM", wd_func_command->funcName) == 0) - { - if (timer_expired) - { - printf("****%s:%d Timer Expired TEST_STSTEM function, Sending back accept message\n",__FUNCTION__,__LINE__); - - WDPacketData emptyPkt; - emptyPkt.command_id = wd_func_command->commandID; - reply_with_minimal_message(wd_func_command->wdNode, WD_ACCEPT_MESSAGE, &emptyPkt); - return true; - } - else - printf("****%s:%d Timer tick called on TEST_STSTEM function\n",__FUNCTION__,__LINE__); - return false; - } - } /* Just remove the timer.*/ return true; @@ -4721,11 +4734,11 @@ static void process_wd_func_commands_for_timer_events(void) ListCell *lc; List* timers_to_del = NIL; + if (g_cluster.wd_timer_commands == NULL) + return; + gettimeofday(&currTime, NULL); - - if (g_cluster.wd_timer_commands != NULL) - printf("****%s:%d \n",__FUNCTION__,__LINE__); - + foreach(lc, g_cluster.wd_timer_commands) { WDCommandTimerData* timerData = lfirst(lc); @@ -4935,15 +4948,17 @@ static bool verify_authhash_for_node(WatchdogNode* wdNode, char* authhash) /* DEBUG */ static void print_watchdog_node_info(WatchdogNode* wdNode) { - printf("********\t STATE = %s\n",debug_states[wdNode->state]); - printf("********\t HostName = %s\n",wdNode->hostname); - printf("********\t NodeName = %s\n",wdNode->nodeName); - printf("********\t WDPort = %d\n",wdNode->wd_port); - printf("********\t pgp port = %d\n",wdNode->pgpool_port); - printf("********\t Priority = %d\n",wdNode->wd_priority); + ereport(DEBUG2, + (errmsg("state: \"%s\" Host: \"%s\" Name: \"%s\" WD Port:%d PP Port: %d priority:%d", + debug_states[wdNode->state], + wdNode->hostname + ,wdNode->nodeName + ,wdNode->wd_port + ,wdNode->pgpool_port + ,wdNode->wd_priority))); } -static void print_packet_info(WDPacketData* pkt,WatchdogNode* wdNode) +static void print_received_packet_info(WDPacketData* pkt,WatchdogNode* wdNode) { int i; packet_types *pkt_type = NULL; @@ -4958,6 +4973,9 @@ static void print_packet_info(WDPacketData* pkt,WatchdogNode* wdNode) break; } } - printf("\n******Packet [ID:%d]received of type [%s] from \"%s\" My state = (%s) \n",pkt->command_id, pkt_type?pkt_type->name:"UNKNOWN", - wdNode->nodeName, debug_states[get_local_node_state()]); + ereport(DEBUG2, + (errmsg("watchdog packet received from node \"%s\"",wdNode->nodeName), + errdetail("command id : %d Type: %s my watchdog state :%s",pkt->command_id, + pkt_type?pkt_type->name:"UNKNOWN", + debug_states[get_local_node_state()]))); } diff --git a/src/watchdog/wd_commands.c b/src/watchdog/wd_commands.c index c1d274bb3..cbe70141d 100644 --- a/src/watchdog/wd_commands.c +++ b/src/watchdog/wd_commands.c @@ -44,6 +44,7 @@ #include "utils/elog.h" #include "utils/json_writer.h" #include "utils/json.h" +#include "utils/pool_stream.h" #include "pool_config.h" #include "watchdog/wd_ipc_commands.h" #include "watchdog/wd_ipc_defines.h" @@ -64,7 +65,6 @@ static char* get_wd_node_function_json(char* func_name, int *node_id_set, int co static char* get_wd_simple_function_json(char* func); static char* get_wd_failover_cmd_type_json(WDFailoverCMDTypes cmdType, char* reqType); WDFailoverCMDResults wd_send_failover_sync_command(WDFailoverCMDTypes cmdType, char* syncReqType); -static int read_socket(int socket, void* buf, int len); static int wd_set_node_mask (unsigned char req_mask, int *node_id_set, int count); static int wd_chk_node_mask (unsigned char req_mask, int *node_id_set, int count); @@ -111,30 +111,44 @@ issue_command_to_watchdog(char type, WD_COMMAND_ACTIONS command_action,int timeo int sock; WDIPCCmdResult* result = NULL; char res_type = 'P'; - int res_length; + int res_length, action; gettimeofday(&start_time, NULL); /* open the watchdog command socket for IPC */ sock = open_wd_command_sock(false); if (sock < 0) + return NULL; + + res_length = htonl(data_len); + action = htonl(command_action); + + if (socket_write(sock, &type, sizeof(char)) <= 0) { + close(sock); return NULL; } - if (send(sock,&type,1,0) < 1) + + + if (socket_write(sock, &action, sizeof(int)) <= 0) + { + close(sock); + return NULL; + } + + if (socket_write(sock, &res_length, sizeof(int)) <= 0) { close(sock); return NULL; } - /* - * since the command action will be consumed locally, - * so no need to convert it to network byte order - */ - res_length = htonl(data_len); - send(sock,&command_action,sizeof(command_action),0); - send(sock,&res_length,sizeof(int),0); if (data && data_len > 0) - send(sock,data,data_len,0); - + { + if (socket_write(sock, data, data_len) <= 0) + { + close(sock); + return NULL; + } + } + if (blocking) { /* if we are asked to wait for results */ @@ -153,40 +167,34 @@ issue_command_to_watchdog(char type, WD_COMMAND_ACTIONS command_action,int timeo select_res = select(sock+1,&fds,NULL,NULL,timeout_st); if (select_res > 0) { - int ret; /* read the result type char */ - ret = read_socket(sock, &res_type, 1); - - if (ret != 1) + if (socket_read(sock, &res_type, 1 ,0) <=0) { - ereport(DEBUG1, - (errmsg("error reading from IPC command socket"), + ereport(LOG, + (errmsg("error reading from IPC command socket"), errdetail("read from socket failed with error \"%s\"",strerror(errno)))); close(sock); return result; } /* read the result data length */ - ret = read_socket(sock, &res_length, 4); - if (ret != 4) + if (socket_read(sock, &res_length, sizeof(int), 0) <= 0) { - ereport(DEBUG1, - (errmsg("error reading from IPC command socket"), + ereport(LOG, + (errmsg("error reading from IPC command socket"), errdetail("read from socket failed with error \"%s\"",strerror(errno)))); close(sock); return result; } + result = palloc(sizeof(WDIPCCmdResult)); result->type = res_type; result->length = ntohl(res_length); result->data = NULL; - + if (result->length > 0) { - int ret; - /* read the result data length */ result->data = palloc(result->length); - ret = read_socket(sock, result->data, result->length); - if (ret != result->length) + if (socket_read(sock, result->data, result->length, 0) <= 0) { pfree(result->data); pfree(result); @@ -248,7 +256,7 @@ wd_start_recovery(void) if (result == NULL) { ereport(LOG, - (errmsg("start recovery command lock failed"), + (errmsg("start recovery command lock failed"), errdetail("issue command to watchdog returned NULL"))); return COMMAND_FAILED; } @@ -258,7 +266,7 @@ wd_start_recovery(void) if (type == WD_IPC_CMD_CLUSTER_IN_TRAN) { ereport(LOG, - (errmsg("start recovery command lock failed"), + (errmsg("start recovery command lock failed"), errdetail("watchdog cluster is not in stable state"), errhint("try again when the cluster is fully initialized"))); return CLUSTER_IN_TRANSATIONING; @@ -600,24 +608,6 @@ open_wd_command_sock(bool throw_error) return sock; } - - -static int read_socket(int socket, void* buf, int len) -{ - int read_len = 0; - while (read_len < len) - { - int nret; - nret = read(socket, buf + read_len, len - read_len); - if (nret <= 0) - return nret; - read_len +=nret; - } - return read_len; -} - - - WDFailoverCMDResults wd_failover_command_start(WDFailoverCMDTypes cmdType) { if (pool_config->use_watchdog) diff --git a/src/watchdog/wd_lifecheck.c b/src/watchdog/wd_lifecheck.c index 0df03278e..88bb748a8 100644 --- a/src/watchdog/wd_lifecheck.c +++ b/src/watchdog/wd_lifecheck.c @@ -423,18 +423,17 @@ static void print_lifecheck_cluster(void) int i; if (!gslifeCheckCluster) return; - printf("Nodes count = %d\n",gslifeCheckCluster->nodeCount); + ereport(LOG, + (errmsg("%d watchdog nodes are configured for lifecheck",gslifeCheckCluster->nodeCount))); for (i = 0; i< gslifeCheckCluster->nodeCount; i++) { - printf("NODE NO %d\n",i); - printf("\t ID = %d\n",gslifeCheckCluster->lifeCheckNodes[i].ID); - printf("\t Name = %s\n",gslifeCheckCluster->lifeCheckNodes[i].nodeName); - printf("\t Host = %s\n",gslifeCheckCluster->lifeCheckNodes[i].hostName); - printf("\t WDPort = %d\n",gslifeCheckCluster->lifeCheckNodes[i].wdPort); - printf("\t pp Port = %d\n",gslifeCheckCluster->lifeCheckNodes[i].pgpoolPort); - printf("--------------\n"); + ereport(LOG, + (errmsg("watchdog nodes ID:%d Name:\"%s\"",gslifeCheckCluster->lifeCheckNodes[i].ID,gslifeCheckCluster->lifeCheckNodes[i].nodeName), + errdetail("Host:\"%s\" WD Port:%d pgpool-II port:%d", + gslifeCheckCluster->lifeCheckNodes[i].hostName, + gslifeCheckCluster->lifeCheckNodes[i].wdPort, + gslifeCheckCluster->lifeCheckNodes[i].pgpoolPort))); } - printf("========\n"); } static bool inform_node_status(LifeCheckNode* node, char *message) |
