/* -*-pgsql-c-*- */ /* * $Header$ * * pgpool: a language independent connection pool server for PostgreSQL * written by Tatsuo Ishii * * Copyright (c) 2003-2007 PgPool Global Development Group * * Permission to use, copy, modify, and distribute this software and * its documentation for any purpose and without fee is hereby * granted, provided that the above copyright notice appear in all * copies and that both that copyright notice and this permission * notice appear in supporting documentation, and that the name of the * author not be used in advertising or publicity pertaining to * distribution of the software without specific, written prior * permission. The author makes no representations about the * suitability of this software for any purpose. It is provided "as * is" without express or implied warranty. * * pool_process_query.c: query processing stuff * */
/*
 * NOTE(review): the system header names were missing from this copy of the
 * file (only bare "#include" tokens remained). They are restored here from
 * the library calls the code below makes (errno/EINTR, sleep/getppid,
 * malloc/free/exit, strlen/strncmp/memcmp, htonl/ntohl, isspace);
 * verify against version control.
 */
#include "config.h"
#include <errno.h>
#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
#endif
#ifdef HAVE_SYS_TIME_H
#include <sys/time.h>
#endif
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <netinet/in.h>
#include <ctype.h>
#include "pool.h"
#define INIT_STATEMENT_LIST_SIZE 8
#define DEADLOCK_ERROR_CODE "40P01"
#define POOL_ERROR_QUERY "send invalid query from pgpool to abort transaction"
typedef struct { char *statement_name; char *portal_name; char *prepared_string; } PreparedStatement;
/* * prepared statement list */ typedef struct { int size; int cnt; PreparedStatement **stmt_list; } PreparedStatementList;
static POOL_STATUS NotificationResponse(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend);
static POOL_STATUS Query(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, char *query);
static POOL_STATUS Execute(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend);
static POOL_STATUS Parse(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend);
#ifdef NOT_USED
static POOL_STATUS Sync(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend);
#endif
static POOL_STATUS ReadyForQuery(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, int send_ready);
static POOL_STATUS CompleteCommandResponse(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend);
static POOL_STATUS CopyInResponse(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend);
static POOL_STATUS CopyOutResponse(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend);
static POOL_STATUS CopyDataRows(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, int copyin);
static POOL_STATUS CursorResponse(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend);
static POOL_STATUS EmptyQueryResponse(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend);
static int RowDescription(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, short *result);
static POOL_STATUS AsciiRow(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, short num_fields);
static POOL_STATUS BinaryRow(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, short num_fields);
static POOL_STATUS FunctionCall(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend);
static POOL_STATUS FunctionResultResponse(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend);
static POOL_STATUS ProcessFrontendResponse(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend);
static POOL_STATUS send_extended_protocol_message(POOL_CONNECTION *cp, char *kind, int len, char *string);
static POOL_STATUS send_execute_message(POOL_CONNECTION *cp, int len, char *string);
static int synchronize(POOL_CONNECTION *cp);
static void process_reporting(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend);
static int reset_backend(POOL_CONNECTION_POOL *backend, int qcnt);
static int is_select_query(char *sql);
static int is_sequence_query(char *sql);
static int load_balance_enabled(POOL_CONNECTION_POOL *backend, char *sql);
static void start_load_balance(POOL_CONNECTION_POOL *backend);
static void end_load_balance(POOL_CONNECTION_POOL *backend);
static POOL_STATUS do_command(POOL_CONNECTION *backend, char *query, int protoMajor, int no_ready_for_query);
static POOL_STATUS do_error_command(POOL_CONNECTION *backend, int protoMajor);
static int need_insert_lock(POOL_CONNECTION_POOL *backend, char *query);
static POOL_STATUS insert_lock(POOL_CONNECTION_POOL *backend, char *query);
static char *get_insert_command_table_name(char *query);
static char *get_execute_command_portal_name(char *query);
static PreparedStatement *get_prepared_command_portal_and_statement(char *query);
static char *skip_comment(char *query);
static void add_prepared_list(PreparedStatementList *p, PreparedStatement *stmt);
static void add_unnamed_portal(PreparedStatementList *p, PreparedStatement *stmt);
static void del_prepared_list(PreparedStatementList *p, PreparedStatement *stmt);
static void reset_prepared_list(PreparedStatementList *p);
static PreparedStatement *lookup_prepared_statement_by_statement(PreparedStatementList *p, const char *name);
static PreparedStatement *lookup_prepared_statement_by_portal(PreparedStatementList *p, const char *name);
static int send_deallocate(POOL_CONNECTION_POOL *backend, PreparedStatementList *p, int n);
static char *normalize_prepared_stmt_name(const char *name);
static POOL_STATUS error_kind_mismatch(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, int kind, int kind1);
static POOL_CONNECTION_POOL_SLOT *slots[MAX_CONNECTION_SLOTS];
static int in_load_balance; /* non 0 if in load balance mode */
static int master_slave_dml; /* non 0 if master/slave mode is specified in config file */
static int replication_was_enabled; /* replication mode was enabled */
static int master_slave_was_enabled; /* master/slave mode was enabled */
static int internal_transaction_started; /* to issue table lock command a transaction has been started internally */
static int select_in_transaction = 0; /* non 0 if select query is in transaction */
static int extended_select = 0; /* non 0 if extended mode */
static int in_progress = 0;
static void (*pending_function)(PreparedStatementList *p, PreparedStatement *statement) = NULL;
static PreparedStatement *pending_prepared_stmt = NULL;
static PreparedStatementList prepared_list; /* prepared statement name list */
static PreparedStatement *unnamed_statement = NULL;
static PreparedStatement *unnamed_portal = NULL;
static int force_replication = 0; /* non 0 if force to replicate query */
static int prepare_in_session = 0;
static int receive_sync = 0;
static int is_drop_database(char *query); /* returns non 0 if this is a DROP DATABASE command */
static void query_ps_status(char *query, POOL_CONNECTION_POOL *backend); /* show ps status */
static int detect_deadlock_error(POOL_CONNECTION *master, int major);
/*
 * pool_process_query: main per-session message loop.
 * When connection_reuse is non 0, the reset queries produced by
 * reset_backend() are issued first (state 1 = waiting for ReadyForQuery);
 * POOL_END is returned if reset_backend() fails and POOL_CONTINUE once it
 * reports that no query remains. Otherwise the frontend and backend sockets
 * are multiplexed with select(): frontend messages are handled by
 * ProcessFrontendResponse() and backend messages are dispatched on their kind
 * byte (separate tables for protocol V3 and V2). In dual mode (or when
 * first_ready_for_query_received is set) the kind bytes read from master and
 * secondary must match, otherwise error_kind_mismatch() is returned.
 * NOTE: every select() call passes a NULL timeout, so the 1 second timeval
 * prepared below is not used and the "fds == 0" branch is not expected to run.
 */
POOL_STATUS pool_process_query(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, int connection_reuse, int first_ready_for_query_received) { char kind, kind1; /* packet kind (backend) */ char fkind; /* packet kind (frontend) */ short num_fields = 0; fd_set readmask; fd_set writemask; fd_set exceptmask; int fds; POOL_STATUS status; int state; /* 0: ok to issue commands 1: waiting for "ready for query" response */ int qcnt; frontend->no_forward = connection_reuse; qcnt = 0; state = 0; for (;;) { kind = kind1 = 0; fkind = 0; if (state == 0 && connection_reuse) { int st; /* send query for resetting connection such as "ROLLBACK" "RESET ALL"... */ st = reset_backend(backend, qcnt); if (st < 0) /* error? */ { /* probably we don't need this, since caller will * close the connection to frontend after returning with POOL_END. But I * guess I would like to be a paranoid... */ frontend->no_forward = 0; return POOL_END; } else if (st == 0) /* no query issued? */ { qcnt++; continue; } else if (st == 1) /* more query remains */ { state = 1; qcnt++; continue; } else /* no more query(st == 2) */ { frontend->no_forward = 0; prepare_in_session = 0; return POOL_CONTINUE; } } if ((!DUAL_MODE && MASTER(backend)->len == 0 && (frontend->len == 0 || in_progress)) || (DUAL_MODE && MASTER(backend)->len == 0 && SECONDARY(backend)->len == 0 && (frontend->len == 0 || in_progress))) { struct timeval timeout; timeout.tv_sec = 1; timeout.tv_usec = 0; FD_ZERO(&readmask); FD_ZERO(&writemask); FD_ZERO(&exceptmask); if (!connection_reuse && !in_progress) FD_SET(frontend->fd, &readmask); FD_SET(MASTER(backend)->fd, &readmask); if (DUAL_MODE) FD_SET(SECONDARY(backend)->fd, &readmask); if (!connection_reuse && !in_progress) FD_SET(frontend->fd, &exceptmask); FD_SET(MASTER(backend)->fd, &exceptmask); if (connection_reuse || in_progress) { if (DUAL_MODE) fds = select(Max(SECONDARY(backend)->fd, MASTER(backend)->fd) + 1, &readmask, &writemask, &exceptmask, NULL); else fds = select(MASTER(backend)->fd+1, &readmask, &writemask, &exceptmask, NULL); } else { if (DUAL_MODE) fds = select(Max(SECONDARY(backend)->fd, Max(frontend->fd, MASTER(backend)->fd)) + 1, &readmask, &writemask, &exceptmask, NULL); else fds = select(Max(frontend->fd, MASTER(backend)->fd)+1, &readmask, &writemask, &exceptmask, NULL); } if (fds == -1) { if (errno == EINTR) continue; pool_error("select() failed. reason: %s", strerror(errno)); return POOL_ERROR; } if (fds == 0) { return POOL_CONTINUE; } if (FD_ISSET(MASTER(backend)->fd, &readmask)) { pool_read(MASTER(backend), &kind, 1); pool_debug("read kind from backend %c", kind); } if (DUAL_MODE && FD_ISSET(SECONDARY(backend)->fd, &readmask)) { pool_read(SECONDARY(backend), &kind1, 1); pool_debug("read kind from secondary backend %c", kind1); } if (FD_ISSET(MASTER(backend)->fd, &exceptmask)) { return POOL_ERROR; } if (!connection_reuse && !in_progress) { if (FD_ISSET(frontend->fd, &exceptmask)) { return POOL_END; } if (FD_ISSET(frontend->fd, &readmask)) { status = ProcessFrontendResponse(frontend, backend); if (status != POOL_CONTINUE) return status; if (kind != 0 || kind1 != 0) { pool_debug("kind(%02x) or kind1(%02x) != 0", kind, kind1); } else { continue; } } } } else { if (frontend->len > 0 && !in_progress) { status = ProcessFrontendResponse(frontend, backend); if (status != POOL_CONTINUE) return status; if (kind != 0 || kind1 != 0) { pool_debug("cached kind(%02x) or kind1(%02x) != 0", kind, kind1); } else { continue; } } if (pool_read(MASTER(backend), &kind, 1) < 0) return POOL_ERROR; if (REPLICATION) { if (pool_read(SECONDARY(backend), &kind1, 1)) return POOL_ERROR; if (kind == '\0' || kind != kind1) { return error_kind_mismatch(frontend, backend, kind, kind1); } } } /* this is the synchronous point */ if (DUAL_MODE || first_ready_for_query_received) { if (kind == 0) { if (pool_read(MASTER(backend), &kind, 1) < 0) return POOL_ERROR; } if (kind1 == 0) { if (SECONDARY(backend)->len <= 0) { /* at this point the query should have completed and it's safe to set timeout here */ pool_debug("pool_process_query: waiting for secondary for data ready"); /* temporary enable timeout */ pool_enable_timeout(); if (pool_check_fd(SECONDARY(backend), 0)) { pool_error("pool_process_query: secondary data is not ready at synchronous point. abort this session"); } else { pool_read(SECONDARY(backend), &kind1, 1); } pool_disable_timeout(); } else { pool_read(SECONDARY(backend), &kind1, 1); } } first_ready_for_query_received = 0; if (kind == '\0' || kind != kind1) { return error_kind_mismatch(frontend, backend, kind, kind1); } } /* * Process backend Response */ if (kind == 0) { pool_error("kind is 0!"); return POOL_ERROR; } pool_debug("pool_process_query: kind from backend: %c", kind); if (MAJOR(backend) == PROTO_MAJOR_V3) { switch (kind) { case 'G': /* CopyIn response */ status = CopyInResponse(frontend, backend); break; case 'S': /* Parameter Status */ status = ParameterStatus(frontend, backend); break; case 'Z': /* Ready for query */ status = ReadyForQuery(frontend, backend, 1); break; default: status = SimpleForwardToFrontend(kind, frontend, backend); break; } } else { switch (kind) { case 'A': /* Notification response */ status = NotificationResponse(frontend, backend); break; case 'B': /* BinaryRow */ status = BinaryRow(frontend, backend, num_fields); break; case 'C': /* Complete command response */ status = CompleteCommandResponse(frontend, backend); break; case 'D': /* AsciiRow */ status = AsciiRow(frontend, backend, num_fields); break; case 'E': /* Error Response */ status = ErrorResponse(frontend, backend); break; case 'G': /* CopyIn Response */ status = CopyInResponse(frontend, backend); break; case 'H': /* CopyOut Response */ status = CopyOutResponse(frontend, backend); break; case 'I': /* Empty Query Response */ status = EmptyQueryResponse(frontend, backend); break; case 'N': /* Notice Response */ status = NoticeResponse(frontend, backend); break; case 'P': /* CursorResponse */ status = CursorResponse(frontend, backend); break; case 'T': /* RowDescription */ status = RowDescription(frontend, backend, &num_fields); break; case 'V': /* FunctionResultResponse and FunctionVoidResponse */ status = FunctionResultResponse(frontend, backend); break; case 'Z': /* Ready for query */ status = ReadyForQuery(frontend, backend, 1); break; default: pool_error("Unknown message type %c(%02x)", kind, kind); exit(1); } } if (status != POOL_CONTINUE) return status; if (kind == 'Z' && frontend->no_forward && state == 1) { state = 0; } } return POOL_CONTINUE; }
/*
 * Query: forward a simple query ('Q' message) to the backend(s).
 * If query is NULL the query text is read from frontend (length-prefixed in
 * protocol V3, a string in V2); otherwise query itself is sent (frontend may
 * then be NULL, e.g. for reset queries). Besides forwarding, this handles
 * "show pool_status", PREPARE/DEALLOCATE bookkeeping via
 * pending_function/pending_prepared_stmt, load-balance / master-slave /
 * select-only routing, INSERT table locking, and in strict mode waits for the
 * master and checks for a deadlock error before sending to the secondary.
 * Returns POOL_CONTINUE on success, POOL_END on I/O or allocation failure,
 * or the failing status of insert_lock().
 */
static POOL_STATUS Query(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, char *query) { char *string, *string1; int len; static char *sq = "show pool_status"; POOL_STATUS status; int deadlock_detected = 0; if (query == NULL) /* need to read query from frontend? */ { /* read actual query */ if (MAJOR(backend) == PROTO_MAJOR_V3) { if (pool_read(frontend, &len, sizeof(len)) < 0) return POOL_END; len = ntohl(len) - 4; string = pool_read2(frontend, len); } else string = pool_read_string(frontend, &len, 0); if (string == NULL) return POOL_END; } else { len = strlen(query)+1; string = query; } /* show ps status */ query_ps_status(string, backend); /* log query to log file if necessary */ if (pool_config.log_statement) { pool_log("statement: %s", string); } else { pool_debug("statement: %s", string); } /* * if this is DROP DATABASE command, send HUP signal to parent and * ask it to close all idle connections. * XXX This is overkill. It would be better to close the idle * connection for the database which DROP DATABASE command tries * to drop. This is impossible at this point, since we have no way * to pass such info to other processes. */ if (is_drop_database(string)) { int stime = 5; /* XXX give arbitrary time to allow closing idle connections */ pool_debug("Query: sending HUP signal to parent"); kill(getppid(), SIGHUP); /* send HUP signal to parent */ /* we need to loop over here since we will get HUP signal while sleeping */ while (stime > 0) { stime = sleep(stime); } } /* process status reporting? */ if (strncasecmp(sq, string, strlen(sq)) == 0) { StartupPacket *sp; char psbuf[1024]; pool_debug("process reporting"); process_reporting(frontend, backend); in_progress = 0; /* show ps status */ sp = MASTER_CONNECTION(backend)->sp; snprintf(psbuf, sizeof(psbuf), "%s %s %s idle", sp->user, sp->database, remote_ps_data); set_ps_display(psbuf, false); return POOL_CONTINUE; } if (strncasecmp("prepare", string, 7) == 0) { if (DUAL_MODE) force_replication = 1; if (frontend) { PreparedStatement *stmt; stmt = get_prepared_command_portal_and_statement(string); if (stmt != NULL) { pending_function = add_prepared_list; pending_prepared_stmt = stmt; if (DUAL_MODE) force_replication = 1; prepare_in_session = 1; } } } else if (strncasecmp("deallocate", string, 10) == 0) { if (DUAL_MODE) force_replication = 1; if (frontend) { char *query = string; char *buf, *name; query = skip_comment(query); /* skip "prepare" or "deallocate" */ while (*query && !isspace(*query)) query++; /* skip spaces */ while (*query && isspace(*query)) query++; buf = strdup(query); /* strtok(NULL, ...) would continue an unrelated earlier tokenization, so a failed strdup must stop here */ if (buf == NULL) { pool_error("SimpleForwardToBackend: strdup failed: %s", strerror(errno)); return POOL_END; } name = strtok(buf, "\t\r\n (;"); pending_function = del_prepared_list; pending_prepared_stmt = malloc(sizeof(PreparedStatement)); if (pending_prepared_stmt == NULL) { pool_error("SimpleForwardToBackend: malloc failed: %s", strerror(errno)); free(buf); return POOL_END; } pending_prepared_stmt->statement_name = normalize_prepared_stmt_name(name); pending_prepared_stmt->portal_name = NULL; /* no statement text is known for DEALLOCATE; keep the field defined instead of leaving malloc garbage */ pending_prepared_stmt->prepared_string = NULL; if (pending_prepared_stmt->statement_name == NULL) { pool_error("SimpleForwardToBackend: strdup failed: %s", strerror(errno)); free(buf); return POOL_END; } free(buf); } } if (frontend && (strncasecmp("execute", string, 7) == 0)) { char *portal_name = get_execute_command_portal_name(string); PreparedStatement *stmt; if (portal_name != NULL) { stmt = lookup_prepared_statement_by_statement(&prepared_list, portal_name); if (!stmt) string1 = string; else string1 = stmt->prepared_string; } else { string1 = string; } } else { string1 = string; } /* reset query have to be replicated */ if (frontend == NULL && prepare_in_session && REPLICATION) force_replication = 1; /* load balance trick */ if (load_balance_enabled(backend, string1)) start_load_balance(backend); else if (force_replication) { replication_was_enabled = REPLICATION; REPLICATION = 1; } else if (MASTER_SLAVE) { master_slave_was_enabled = 1; MASTER_SLAVE = 0; master_slave_dml = 1; } else if (REPLICATION && !pool_config.replicate_select && is_select_query(string1) && !is_sequence_query(string1)) { int i; /* save backend connection slots */ for (i=0;i<backend->num;i++) { slots[i] = backend->slots[i]; } /* send query to master only. */ replication_was_enabled = 1; REPLICATION = 0; in_load_balance = 1; select_in_transaction = 1; extended_select = 0; } /* * judge if we need to lock the table * to keep SERIAL data consistency among servers * conditions: * - replication is enabled * - protocol is V3 * - statement is INSERT * - either "INSERT LOCK" comment exists or insert_lock directive specified */ if (REPLICATION && need_insert_lock(backend, string)) { /* start a transaction if needed and lock the table */ status = insert_lock(backend, string); if (status != POOL_CONTINUE) return status; } /* forward the query to the backend */ pool_write(MASTER(backend), "Q", 1); if (MAJOR(backend) == PROTO_MAJOR_V3) { int sendlen = htonl(len + 4); pool_write(MASTER(backend), &sendlen, sizeof(sendlen)); } if (pool_write_and_flush(MASTER(backend), string, len) < 0) { return POOL_END; } if (REPLICATION)
#ifdef NOT_USED
(frontend == NULL && ) /* we assume that frontend == NULL * means that Query() is called from * reset_backend(). In master/slave * mode AND load balance is enabled, * we need to send reset queries to * secondary as well. */
#endif
{ /* * in "strict mode" we need to wait for master completing the query. * note that this is not applied if "NO STRICT" is specified as a comment. */ if ((pool_config.replication_strict && !NO_STRICT_MODE(string)) || STRICT_MODE(string)) { pool_debug("waiting for master completing the query"); if (synchronize(MASTER(backend))) return POOL_END; /* * We must check deadlock error because a aborted transaction * by detecting deadlock isn't same on all nodes. * If a transaction is aborted on master node, pgpool send a * error query to another nodes. */ deadlock_detected = detect_deadlock_error(MASTER(backend), MAJOR(backend)); if (deadlock_detected < 0) return POOL_END; else if (deadlock_detected) { string = POOL_ERROR_QUERY; len = strlen(string) + 1; } }
/* SEQUENCE_DEBUG used to be #defined unconditionally right here, which compiled the 20 second sleep below into every build and let any client stall this child process by starting its query with the SLEEP marker comment. Build with -DSEQUENCE_DEBUG to enable it for testing. */
#ifdef SEQUENCE_DEBUG
if (!strncmp(string, "/*SLEEP*/", 9)) { pool_debug("start sleeping"); sleep(20); pool_debug("end sleeping"); }
#endif
pool_write(SECONDARY(backend), "Q", 1); if (MAJOR(backend) == PROTO_MAJOR_V3) { int sendlen = htonl(len + 4); pool_write(SECONDARY(backend), &sendlen, sizeof(sendlen)); } if (pool_write_and_flush(SECONDARY(backend), string, len) < 0) { return POOL_END; } /* in "strict mode" we need to wait for secondary completing the query */ if (pool_config.replication_strict || STRICT_MODE(string)) if (synchronize(SECONDARY(backend))) return POOL_END; } return POOL_CONTINUE; }
/*
 * process EXECUTE (V3 only)
 * Reads an Execute message from frontend, applies the same routing as Query()
 * using the prepared statement looked up by portal name, sends it to the
 * backends (only the first one unless REPLICATION), and relays backend
 * messages to frontend until a 'C', 'E', 'I' or 's' message has been relayed.
 * kind is signed char because pool_read_kind() reports failures as negative
 * values (-2 = kind mismatch); with plain char those tests never fire on
 * platforms where char is unsigned.
 */
static POOL_STATUS Execute(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend) { char *string; /* portal name + null terminate + max_tobe_returned_rows */ int len; int sendlen; int i; signed char kind; int status; PreparedStatement *stmt; int deadlock_detected = 0; int checked = 0; /* read Execute packet */ if (pool_read(frontend, &len, sizeof(len)) < 0) return POOL_END; len = ntohl(len) - 4; string = pool_read2(frontend, len); if (string == NULL) return POOL_END; pool_debug("Execute: portal name <%s>", string); stmt = lookup_prepared_statement_by_portal(&prepared_list, string); if (stmt) { /* load balance trick */ if (stmt && load_balance_enabled(backend, stmt->prepared_string)) start_load_balance(backend); else if (MASTER_SLAVE) { master_slave_was_enabled = 1; MASTER_SLAVE = 0; master_slave_dml = 1; } else if (REPLICATION && !pool_config.replicate_select && is_select_query(stmt->prepared_string) && !is_sequence_query(stmt->prepared_string)) { int i; /* save backend connection slots */ for (i=0;i<backend->num;i++) { slots[i] = backend->slots[i]; } /* send query to master only. */ replication_was_enabled = 1; REPLICATION = 0; in_load_balance = 1; select_in_transaction = 1; extended_select = 1; } } for (i = 0;i < backend->num;i++) { POOL_CONNECTION *cp = backend->slots[i]->con; if (deadlock_detected) { pool_write(cp, "Q", 1); len = strlen(POOL_ERROR_QUERY) + 1; sendlen = htonl(len + 4); pool_write(cp, &sendlen, sizeof(sendlen)); pool_write_and_flush(cp, POOL_ERROR_QUERY, len); } else { /* forward the query to the backend */ if (send_execute_message(cp, len, string)) return POOL_ERROR; } if (!REPLICATION) break; else if (pool_config.replication_strict) { pool_debug("waiting for backend completing the query"); if (synchronize(cp)) return POOL_END; /* * We must check deadlock error because a aborted transaction * by detecting deadlock isn't same on all nodes. * If a transaction is aborted on master node, pgpool send a * error query to another nodes. */ if (checked == 0) { deadlock_detected = detect_deadlock_error(MASTER(backend), MAJOR(backend)); if (deadlock_detected < 0) return POOL_END; checked = 1; } } } while ((kind = pool_read_kind(backend)), (kind != 'C' && kind != 'E' && kind != 'I' && kind != 's')) { if (kind == -2) /* kind mismatch */ { return error_kind_mismatch(frontend, backend, 0, 0); } else if (kind < 0) { pool_error("Execute: pool_read_kind error"); return POOL_ERROR; } status = SimpleForwardToFrontend(kind, frontend, backend); if (status != POOL_CONTINUE) return status; } status = SimpleForwardToFrontend(kind, frontend, backend); if (status != POOL_CONTINUE) return status; /* end load balance mode */ if (in_load_balance) end_load_balance(backend); if (master_slave_dml) { MASTER_SLAVE = 1; master_slave_was_enabled = 0; master_slave_dml = 0; } return POOL_CONTINUE; }
/*
 * Extended query protocol has to send Flush message.
 * Writes one message to cp: the kind byte, the length (len + 4, network
 * order) and len bytes of string, followed by a Flush ('H') message.
 * Returns POOL_ERROR if the final write/flush fails, else POOL_CONTINUE.
 */
static POOL_STATUS send_extended_protocol_message(POOL_CONNECTION *cp, char *kind, int len, char *string) { int sendlen; /* forward the query to the backend */ pool_write(cp, kind, 1); sendlen = htonl(len + 4); pool_write(cp, &sendlen, sizeof(sendlen)); pool_write(cp, string, len); /* * send "Flush" message so that backend notices us * the completion of the command */ pool_write(cp, "H", 1); sendlen = htonl(4); if (pool_write_and_flush(cp, &sendlen, sizeof(sendlen)) < 0) { return POOL_ERROR; } return POOL_CONTINUE; }
/* send_execute_message: send an Execute ('E') message followed by Flush. */
static POOL_STATUS send_execute_message(POOL_CONNECTION *cp, int len, char *string) { return send_extended_protocol_message(cp, "E", len, string); }
/*
 * process PARSE (V3 only)
 * Reads a Parse message, records the statement name and query text in
 * pending_prepared_stmt (pending_function is set to add_prepared_list or
 * add_unnamed_portal; presumably invoked elsewhere once the backend has
 * answered -- confirm), takes the INSERT lock if needed, forwards the message
 * to the backend(s) and relays backend replies to frontend until a message
 * other than a Notice ('N') has been relayed.
 */
static POOL_STATUS Parse(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend) { signed char kind; /* signed: pool_read_kind() returns negative values on error */ int i; int len; char *string; char *name, *stmt; int deadlock_detected = 0; int checked = 0; POOL_STATUS status; /* read Parse packet */ if (pool_read(frontend, &len, sizeof(len)) < 0) return POOL_END; len = ntohl(len) - 4; string = pool_read2(frontend, len); if (string == NULL) return POOL_END; pool_debug("Parse: portal name <%s>", string); name = strdup(string); if (name == NULL) { pool_error("Parse: malloc failed: %s", strerror(errno)); return POOL_END; } pending_prepared_stmt = malloc(sizeof(PreparedStatement)); if (pending_prepared_stmt == NULL) { pool_error("Parse: malloc failed: %s", strerror(errno)); free(name); return POOL_END; } pending_prepared_stmt->portal_name = NULL; if (*string) { pending_function = add_prepared_list; pending_prepared_stmt->statement_name = name; } else { pending_function = add_unnamed_portal; pending_prepared_stmt->statement_name = NULL; free(name); } /* copy prepared statement string */ stmt = string; stmt += strlen(string) + 1; pending_prepared_stmt->prepared_string = strdup(stmt); if (pending_prepared_stmt->prepared_string == NULL) { pool_error("SimpleForwardToBackend: strdup failed: %s", strerror(errno)); return POOL_END; } if (REPLICATION && need_insert_lock(backend, stmt)) { if (TSTATE(backend) != 'T') { /* synchronize transaction state */ for (i = 0; i < backend->num; i++) { POOL_CONNECTION *cp = backend->slots[i]->con; send_extended_protocol_message(cp, "S", 0, ""); } kind = pool_read_kind(backend); if (kind != 'Z') return POOL_END; if (ReadyForQuery(frontend, backend, 0) != POOL_CONTINUE) return POOL_END; } /* start a transaction if needed and lock the table */ status = insert_lock(backend, stmt); if (status != POOL_CONTINUE) { return status; } } /* forward Parse message to backends */ for (i = 0; i < backend->num; i++) { POOL_CONNECTION *cp = backend->slots[i]->con; int sendlen; if (deadlock_detected) { pool_write(cp, "Q", 1); len = strlen(POOL_ERROR_QUERY) + 1; sendlen = htonl(len + 4); pool_write(cp, &sendlen, sizeof(sendlen)); pool_write_and_flush(cp, POOL_ERROR_QUERY, len); } else if (send_extended_protocol_message(cp, "P", len, string)) return POOL_END; if (!DUAL_MODE) break; else if (pool_config.replication_strict) { pool_debug("waiting for backend completing the query"); if (synchronize(cp)) return POOL_END; /* * We must check deadlock error because a aborted transaction * by detecting deadlock isn't same on all nodes. * If a transaction is aborted on master node, pgpool send a * error query to another nodes. */ if (checked == 0) { deadlock_detected = detect_deadlock_error(MASTER(backend), MAJOR(backend)); if (deadlock_detected < 0) return POOL_END; checked = 1; } } } for (;;) { kind = pool_read_kind(backend); if (kind < 0) { pool_error("SimpleForwardToBackend: pool_read_kind error"); return POOL_ERROR; } status = SimpleForwardToFrontend(kind, frontend, backend); if (status != POOL_CONTINUE) return status; if (pool_flush(frontend) < 0) return POOL_ERROR; /* * If warning or log messages are received, we must read * one message from backend. */ if (kind != 'N') /* Notice Message */ break; } return POOL_CONTINUE; }
#ifdef NOT_USED
/* * process Sync (V3 only) */ static POOL_STATUS Sync(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend) { char *string; /* portal name + null terminate + max_tobe_returned_rows */ int len; int sendlen; /* read Sync packet */ if (pool_read(frontend, &len, sizeof(len)) < 0) return POOL_END; len = ntohl(len) - 4; string = pool_read2(frontend, len); /* forward the query to the backend */ pool_write(MASTER(backend), "S", 1); sendlen = htonl(len + 4); pool_write(MASTER(backend), &sendlen, sizeof(sendlen)); if (pool_write_and_flush(MASTER(backend), string, len) < 0) { return POOL_END; } if (REPLICATION) { /* * in "strict mode" we need to wait for master completing the query. * note that this is not applied if "NO STRICT" is specified as a comment. */ if (pool_config.replication_strict) { pool_debug("waiting for master completing the query"); if (synchronize(MASTER(backend))) return POOL_END; } pool_write(SECONDARY(backend), "S", 1); sendlen = htonl(len + 4); pool_write(SECONDARY(backend), &sendlen, sizeof(sendlen)); if (pool_write_and_flush(SECONDARY(backend), string, len) < 0) { return POOL_END; } /* in "strict mode" we need to wait for secondary completing the query */ if (pool_config.replication_strict) if (synchronize(SECONDARY(backend))) return POOL_END; } return POOL_CONTINUE; }
#endif
/*
 * ReadyForQuery: consume a ReadyForQuery message from the backend(s).
 * If a transaction was started internally for the INSERT lock and no Sync
 * was received, one ReadyForQuery is consumed and COMMIT is issued on every
 * backend first. In V3 the transaction state byte is read and stored in the
 * connections' tstate. If send_ready is non 0 a 'Z' message is sent to
 * frontend. Afterwards the load-balance, master/slave and forced-replication
 * overrides are restored and the ps display is updated.
 */
static POOL_STATUS ReadyForQuery(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, int send_ready) { StartupPacket *sp; char psbuf[1024]; int len; signed char state; /* if a transaction is started for insert lock, we need to close it. */ if (internal_transaction_started && receive_sync == 0) { int i; int len; signed char state; if ((len = pool_read_message_length(backend)) < 0) return POOL_END; pool_debug("ReadyForQuery: message length: %d", len); len = htonl(len); state = pool_read_kind(backend); if (state < 0) return POOL_END; /* set transaction state */ pool_debug("ReadyForQuery: transaction state: %c", state); MASTER(backend)->tstate = state; if (DUAL_MODE) SECONDARY(backend)->tstate = state; for (i = 0;i < backend->num;i++) { if (do_command(backend->slots[i]->con, "COMMIT", PROTO_MAJOR_V3, 1) != POOL_CONTINUE) return POOL_ERROR; } internal_transaction_started = 0; } receive_sync = 0; if (MAJOR(backend) == PROTO_MAJOR_V3) { if ((len = pool_read_message_length(backend)) < 0) return POOL_END; pool_debug("ReadyForQuery: message length: %d", len); /* * Do not check transaction state in master/slave mode. * Because SET, PREPARE, DEALLOCATE are replicated. * If these queries are executed inside a transaction block, * transaction state is inconsistent. But it is no problem. */ if (MASTER_SLAVE) { char kind1; pool_read(MASTER(backend), &state, 1); MASTER(backend)->tstate = state; pool_read(SECONDARY(backend), &kind1, 1); SECONDARY(backend)->tstate = state; } else { state = pool_read_kind(backend); if (state < 0) return POOL_END; /* set transaction state */ pool_debug("ReadyForQuery: transaction state: %c", state); MASTER(backend)->tstate = state; if (REPLICATION) SECONDARY(backend)->tstate = state; } } if (send_ready) { if (pool_flush(frontend)) return POOL_ERROR; pool_write(frontend, "Z", 1); if (MAJOR(backend) == PROTO_MAJOR_V3) { len = htonl(len); pool_write(frontend, &len, sizeof(len)); pool_write(frontend, &state, 1); } if (pool_flush(frontend)) return POOL_ERROR; } in_progress = 0; /* end load balance mode */ if (in_load_balance) end_load_balance(backend); if (master_slave_dml) { MASTER_SLAVE = 1; master_slave_was_enabled = 0; master_slave_dml = 0; } if (force_replication) { force_replication = 0; REPLICATION = replication_was_enabled; replication_was_enabled = 0; }
#ifdef NOT_USED
return ProcessFrontendResponse(frontend, backend);
#endif
sp = MASTER_CONNECTION(backend)->sp; if (MASTER(backend)->tstate == 'T') snprintf(psbuf, sizeof(psbuf), "%s %s %s idle in transaction", sp->user, sp->database, remote_ps_data); else snprintf(psbuf, sizeof(psbuf), "%s %s %s idle", sp->user, sp->database, remote_ps_data); set_ps_display(psbuf, false); return POOL_CONTINUE; }
/*
 * CompleteCommandResponse (V2): read the command tag from master (and from
 * the secondary in REPLICATION, only logging a length mismatch) and forward
 * the master's tag to frontend as a 'C' message.
 */
static POOL_STATUS CompleteCommandResponse(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend) { char *string, *string1; int len, len1; /* read command tag */ string = pool_read_string(MASTER(backend), &len, 0); if (string == NULL) return POOL_END; if (REPLICATION) { string1 = pool_read_string(SECONDARY(backend), &len1, 0); if (string1 == NULL) return POOL_END; if (len != len1) { pool_debug("Complete Command Response: message length does not match between master(%d \"%s\",) and secondary(%d \"%s\",)", len, string, len1, string1); } } /* forward to the frontend */ pool_write(frontend, "C", 1); pool_debug("Complete Command Response: string: \"%s\"", string); if (pool_write(frontend, string, len) < 0) { return POOL_END; } return POOL_CONTINUE; }
/*
 * RowDescription (V2): read the field count from master (it must match the
 * secondary in REPLICATION, else POOL_FATAL) and forward it as 'T'.
 * NOTE(review): in this copy of the file the text after "for (i = 0;i" is
 * missing up to a stray "0 */" fragment of a later comment; what follows is
 * the tail of a row-reading loop (it uses nullmap1/msgbuf and logs
 * "AsciiRow:"), so the rest of RowDescription() and the head of AsciiRow()
 * were lost. Restore them from version control; they are kept verbatim here.
 */
static int RowDescription(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, short *result) { short num_fields, num_fields1; int oid, mod; int oid1, mod1; short size, size1; char *string, *string1; int len, len1; int i; /* # of fields (could be 0) */ pool_read(MASTER(backend), &num_fields, sizeof(short)); if (REPLICATION) { pool_read(SECONDARY(backend), &num_fields1, sizeof(short)); if (num_fields != num_fields1) { pool_error("RowDescription: num_fields deos not match between backends master(%d) and secondary(%d)", num_fields, num_fields1); return POOL_FATAL; } } /* forward it to the frontend */ pool_write(frontend, "T", 1); pool_write(frontend, &num_fields, sizeof(short)); num_fields = ntohs(num_fields); for (i = 0;i 0 */ if (size > 0) { buf = pool_read2(MASTER(backend), size); if (buf == NULL) return POOL_END; } } if (REPLICATION && size1 > 0 && (mask & nullmap1[i/8])) { /* read and discard secondary data */ if (pool_read2(SECONDARY(backend), size1) == NULL) return POOL_END; } if (buf) { pool_write(frontend, buf, size); snprintf(msgbuf, Min(sizeof(msgbuf), size+1), "%s", buf); pool_debug("AsciiRow: len: %d data: %s", size, msgbuf); } mask >>= 1; } if (pool_flush(frontend)) return POOL_END; return POOL_CONTINUE; }
/*
 * BinaryRow (V2): forward a 'B' row: the NULL bitmap and the non-NULL field
 * values read from master; the secondary's bitmap and data are read and only
 * compared/discarded.
 * NOTE(review): the loop header after "for (i = 0;i" is missing in this copy
 * up to a stray "0 */" fragment; restore it from version control.
 */
static POOL_STATUS BinaryRow(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, short num_fields) { static char nullmap[8192], nullmap1[8192]; int nbytes; int i; unsigned char mask; int size, size1; char *buf; pool_write(frontend, "B", 1); nbytes = (num_fields + 7)/8; if (nbytes <= 0) return POOL_CONTINUE; /* NULL map */ pool_read(MASTER(backend), nullmap, nbytes); if (pool_write(frontend, nullmap, nbytes) < 0) return POOL_END; if (REPLICATION) { if (pool_read(SECONDARY(backend), nullmap1, nbytes) < 0) return POOL_END; if (memcmp(nullmap, nullmap1, nbytes)) { /* XXX: NULLMAP maybe different among backends. If we were a paranoid, we have to treat this as a fatal error. However in the real world we'd better to adapt this situation. Just throw a log... */ pool_debug("BinaryRow: NULLMAP differ between master and secondary"); } } mask = 0; for (i = 0;i 0 */ if (size > 0) { buf = pool_read2(MASTER(backend), size); if (buf == NULL) return POOL_END; } } if (REPLICATION && size1 > 0 && (mask & nullmap1[i/8])) { /* read and discard secondary data */ if (pool_read2(SECONDARY(backend), size1) == NULL) return POOL_END; } if (buf) pool_write(frontend, buf, size); mask >>= 1; } if (pool_flush(frontend)) return POOL_END; return POOL_CONTINUE; }
/*
 * CursorResponse (V2): read the cursor name from master (and from the
 * secondary in REPLICATION, where a length mismatch ends the session) and
 * forward the master's name to frontend as a 'P' message.
 */
static POOL_STATUS CursorResponse(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend) { char *string, *string1; int len, len1; /* read cursor name */ string = pool_read_string(MASTER(backend), &len, 0); if (string == NULL) return POOL_END; if (REPLICATION) { string1 = pool_read_string(SECONDARY(backend), &len1, 0); if (string1 == NULL) return POOL_END; if (len != len1) { pool_error("CursorResponse: length does not match between master(%d) and secondary(%d)", len, len1); pool_error("CursorResponse: master(%s) secondary(%s)", string, string1); return POOL_END; } } /* forward to the frontend */ pool_write(frontend, "P", 1); if (pool_write_and_flush(frontend, string, len) < 0) { return POOL_END; } return POOL_CONTINUE; }
/*
 * ErrorResponse (V2): read the error message from master (and, in
 * REPLICATION, consume the secondary's) and forward the master's message to
 * frontend as an 'E' message. The secondary's text used to overwrite the
 * master's and be forwarded instead; it is now only read and discarded, as
 * CompleteCommandResponse() and CursorResponse() do with secondary data.
 */
POOL_STATUS ErrorResponse(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend) { char *string, *string1; int len, len1; /* read error message */ string = pool_read_string(MASTER(backend), &len, 0); if (string == NULL) return POOL_END; if (REPLICATION) { /* read and discard the secondary's message */ string1 = pool_read_string(SECONDARY(backend), &len1, 0); if (string1 == NULL) return POOL_END; } /* forward to the frontend */ pool_write(frontend, "E", 1); if (pool_write_and_flush(frontend, string, len) < 0) return POOL_END; return POOL_CONTINUE; }
POOL_STATUS ErrorResponse2(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend) { char
*buf; int len; /* forward to the frontend */ pool_write(frontend, "E", 1); /* read error message length */ if ((buf = pool_read2(MASTER(backend), sizeof(len))) == NULL) return POOL_END; /* forward to the frontend */ if (pool_write_and_flush(frontend, buf, sizeof(len)) < 0) return POOL_END; len = ntohl(*(int *)buf) - sizeof(len); if(len < 8 || len > 30000) { /* Handle error from a pre-3.0 server */ /* read error message */ if ((buf = pool_read_string(MASTER(backend), &len, 0)) == NULL) return POOL_END; /* forward to the frontend */ if (pool_write_and_flush(frontend, buf, len) < 0) return POOL_END; } else { /* read rest of error message */ if ((buf = pool_read2(MASTER(backend), len)) == NULL) return POOL_END; /* forward to the frontend */ if (pool_write_and_flush(frontend, buf, len) < 0) return POOL_END; } return POOL_CONTINUE; } POOL_STATUS NoticeResponse(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend) { char *string, *string1; int len, len1; /* read notice message */ string = pool_read_string(MASTER(backend), &len, 0); if (string == NULL) return POOL_END; if (REPLICATION) { string1 = pool_read_string(SECONDARY(backend), &len1, 0); if (string1 == NULL) return POOL_END; } /* forward to the frontend */ pool_write(frontend, "N", 1); if (pool_write_and_flush(frontend, string, len) < 0) { return POOL_END; } return POOL_CONTINUE; } static POOL_STATUS CopyInResponse(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend) { POOL_STATUS status; /* forward to the frontend */ if (MAJOR(backend) == PROTO_MAJOR_V3) { if (SimpleForwardToFrontend('G', frontend, backend) != POOL_CONTINUE) return POOL_END; if (pool_flush(frontend) != POOL_CONTINUE) return POOL_END; } else if (pool_write_and_flush(frontend, "G", 1) < 0) return POOL_END; status = CopyDataRows(frontend, backend, 1); return status; } static POOL_STATUS CopyOutResponse(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend) { POOL_STATUS status; /* forward to the frontend */ if (MAJOR(backend) == 
PROTO_MAJOR_V3) { if (SimpleForwardToFrontend('H', frontend, backend) != POOL_CONTINUE) return POOL_END; if (pool_flush(frontend) != POOL_CONTINUE) return POOL_END; } else if (pool_write_and_flush(frontend, "H", 1) < 0) return POOL_END; status = CopyDataRows(frontend, backend, 0); return status; } static POOL_STATUS CopyDataRows(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, int copyin) { char *string; int len; #ifdef DEBUG int i = 0; char buf[1024]; #endif for (;;) { if (copyin) { if (MAJOR(backend) == PROTO_MAJOR_V3) { char kind; if (pool_read(frontend, &kind, 1) < 0) return POOL_END; SimpleForwardToBackend(kind, frontend, backend); /* CopyData? */ if (kind == 'd') continue; else break; } else string = pool_read_string(frontend, &len, 1); } else { /* CopyOut */ if (MAJOR(backend) == PROTO_MAJOR_V3) { signed char kind; if ((kind = pool_read_kind(backend)) < 0) return POOL_END; SimpleForwardToFrontend(kind, frontend, backend); /* CopyData? */ if (kind == 'd') continue; else break; } else { string = pool_read_string(MASTER(backend), &len, 1); if (REPLICATION) string = pool_read_string(SECONDARY(backend), &len, 1); } } if (string == NULL) return POOL_END; #ifdef DEBUG strncpy(buf, string, len); pool_debug("copy line %d %d bytes :%s:", i++, len, buf); #endif if (copyin) { pool_write(MASTER(backend), string, len); if (REPLICATION) pool_write(SECONDARY(backend), string, len); } else pool_write(frontend, string, len); if (len == PROTO_MAJOR_V3) { /* end of copy? */ if (string[0] == '\\' && string[1] == '.' 
&& string[2] == '\n') { break; } } } if (copyin) { if (pool_flush(MASTER(backend)) <0) return POOL_END; if (REPLICATION) { if (pool_config.replication_strict) { pool_debug("waiting for master completing the copy"); if (synchronize(MASTER(backend))) return POOL_END; } if (pool_flush(SECONDARY(backend)) <0) return POOL_END; if (pool_config.replication_strict) { pool_debug("waiting for secondary completing the copy"); if (synchronize(SECONDARY(backend))) return POOL_END; } } } else if (pool_flush(frontend) <0) return POOL_END; return POOL_CONTINUE; } static POOL_STATUS EmptyQueryResponse(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend) { char c; if (pool_read(MASTER(backend), &c, sizeof(c)) < 0) return POOL_END; if (REPLICATION) { if (pool_read(SECONDARY(backend), &c, sizeof(c)) < 0) return POOL_END; } pool_write(frontend, "I", 1); return pool_write_and_flush(frontend, "", 1); } static POOL_STATUS NotificationResponse(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend) { int pid, pid1; char *condition, *condition1; int len, len1; pool_write(frontend, "A", 1); if (pool_read(MASTER(backend), &pid, sizeof(pid)) < 0) return POOL_ERROR; if (REPLICATION) { if (pool_read(SECONDARY(backend), &pid1, sizeof(pid1)) < 0) return POOL_ERROR; } condition = pool_read_string(MASTER(backend), &len, 0); if (condition == NULL) return POOL_END; if (REPLICATION) { condition1 = pool_read_string(SECONDARY(backend), &len1, 0); if (condition1 == NULL) return POOL_END; } pool_write(frontend, &pid, sizeof(pid)); return pool_write_and_flush(frontend, condition, len); } static POOL_STATUS FunctionCall(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend) { char dummy[2]; int oid; int argn; int i; pool_write(MASTER(backend), "F", 1); if (REPLICATION) pool_write(SECONDARY(backend), "F", 1); /* dummy */ if (pool_read(frontend, dummy, sizeof(dummy)) < 0) return POOL_ERROR; pool_write(MASTER(backend), dummy, sizeof(dummy)); if (REPLICATION) pool_write(SECONDARY(backend), dummy, 
sizeof(dummy)); /* function object id */ if (pool_read(frontend, &oid, sizeof(oid)) < 0) return POOL_ERROR; pool_write(MASTER(backend), &oid, sizeof(oid)); if (REPLICATION) pool_write(SECONDARY(backend), &oid, sizeof(oid)); /* number of arguments */ if (pool_read(frontend, &argn, sizeof(argn)) < 0) return POOL_ERROR; pool_write(MASTER(backend), &argn, sizeof(argn)); if (REPLICATION) pool_write(SECONDARY(backend), &argn, sizeof(argn)); argn = ntohl(argn); for (i=0;ilen <= 0 && frontend->no_forward != 0) return POOL_CONTINUE; if (pool_read(frontend, &fkind, 1) < 0) { pool_log("ProcessFrontendResponse: failed to read kind from frontend. frontend abnormally exited"); return POOL_ERROR; } pool_debug("read kind from frontend %c(%02x)", fkind, fkind); switch (fkind) { case 'X': if (MAJOR(backend) == PROTO_MAJOR_V3) { int len; pool_read(frontend, &len, sizeof(len)); } return POOL_END; case 'Q': in_progress = 1; status = Query(frontend, backend, NULL); break; /* case 'S': status = Sync(frontend, backend); break; */ case 'E': status = Execute(frontend, backend); break; case 'P': status = Parse(frontend, backend); break; default: if (MAJOR(backend) == PROTO_MAJOR_V3) { if (MASTER_SLAVE && TSTATE(backend) != 'I') { master_slave_was_enabled = 1; MASTER_SLAVE = 0; master_slave_dml = 1; } status = SimpleForwardToBackend(fkind, frontend, backend); if (pool_flush(MASTER(backend))) status = POOL_ERROR; if (DUAL_MODE) if (pool_flush(SECONDARY(backend))) status = POOL_ERROR; } else if (MAJOR(backend) == PROTO_MAJOR_V2 && fkind == 'F') status = FunctionCall(frontend, backend); else { pool_error("ProcessFrontendResponse: unknown message type %c(%02x)", fkind, fkind); status = POOL_ERROR; } break; } if (status != POOL_CONTINUE) status = POOL_ERROR; return status; } static int timeoutmsec; /* * enable read timeout */ void pool_enable_timeout() { timeoutmsec = pool_config.replication_timeout; } /* * disable read timeout */ void pool_disable_timeout() { timeoutmsec = 0; } /* * wait until 
read data is ready */ static int synchronize(POOL_CONNECTION *cp) { return pool_check_fd(cp, 1); } /* * wait until read data is ready * if notimeout is non 0, wait forever. */ int pool_check_fd(POOL_CONNECTION *cp, int notimeout) { fd_set readmask; fd_set exceptmask; int fd; int fds; struct timeval timeout; struct timeval *tp; fd = cp->fd; for (;;) { FD_ZERO(&readmask); FD_ZERO(&exceptmask); FD_SET(fd, &readmask); FD_SET(fd, &exceptmask); if (notimeout || timeoutmsec == 0) tp = NULL; else { timeout.tv_sec = pool_config.replication_timeout / 1000; timeout.tv_usec = (pool_config.replication_timeout - (timeout.tv_sec * 1000))*1000; tp = &timeout; } fds = select(fd+1, &readmask, NULL, &exceptmask, tp); if (fds == -1) { if (errno == EAGAIN || errno == EINTR) continue; pool_error("pool_check_fd: select() failed. reason %s", strerror(errno)); break; } if (FD_ISSET(fd, &exceptmask)) { pool_error("pool_check_fd: exception occurred"); break; } if (fds == 0) { pool_error("pool_check_fd: data is not ready tp->tv_sec %d tp->tp_usec %d", pool_config.replication_timeout / 1000, (pool_config.replication_timeout - (timeout.tv_sec * 1000))*1000); break; } return 0; } return -1; } static void process_reporting(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend) { static char *cursorname = "blank"; static short num_fields = 3; static char *field_names[] = {"item", "value", "description"}; static int oid = 0; static short fsize = -1; static int mod = 0; short n; int i, j; short s; int len; short colnum; static char nullmap[2] = {0xff, 0xff}; int nbytes = (num_fields + 7)/8; #define MAXVALLEN 512 typedef struct { char *name; char value[MAXVALLEN+1]; char *desc; } POOL_REPORT_STATUS; #define MAXITEMS 128 POOL_REPORT_STATUS status[MAXITEMS]; short nrows; int size; int hsize; int slen; i = 0; status[i].name = "listen_addresses"; snprintf(status[i].value, MAXVALLEN, "%s", pool_config.listen_addresses); status[i].desc = "host name(s) or IP address(es) to listen to"; i++; status[i].name 
= "port"; snprintf(status[i].value, MAXVALLEN, "%d", pool_config.port); status[i].desc = "pgpool accepting port number"; i++; status[i].name = "socket_dir"; snprintf(status[i].value, MAXVALLEN, "%s", pool_config.socket_dir); status[i].desc = "pgpool socket directory"; i++; status[i].name = "backend_host_name"; snprintf(status[i].value, MAXVALLEN, "%s", pool_config.backend_host_name); status[i].desc = "master backend host name"; i++; status[i].name = "backend_port"; snprintf(status[i].value, MAXVALLEN, "%d", pool_config.backend_port); status[i].desc = "master backend port number"; i++; status[i].name = "secondary_backend_host_name"; snprintf(status[i].value, MAXVALLEN, "%s", pool_config.secondary_backend_host_name); status[i].desc = "secondary backend host name"; i++; status[i].name = "secondary_backend_port"; snprintf(status[i].value, MAXVALLEN, "%d", pool_config.secondary_backend_port); status[i].desc = "secondary backend port number"; i++; status[i].name = "num_init_children"; snprintf(status[i].value, MAXVALLEN, "%d", pool_config.num_init_children); status[i].desc = "# of children initially pre-forked"; i++; status[i].name = "child_life_time"; snprintf(status[i].value, MAXVALLEN, "%d", pool_config.child_life_time); status[i].desc = "if idle for this seconds, child exits"; i++; status[i].name = "connection_life_time"; snprintf(status[i].value, MAXVALLEN, "%d", pool_config.connection_life_time); status[i].desc = "if idle for this seconds, connection closes"; i++; status[i].name = "child_max_connections"; snprintf(status[i].value, MAXVALLEN, "%d", pool_config.child_max_connections); status[i].desc = "if max_connections received, chile exits"; i++; status[i].name = "max_pool"; snprintf(status[i].value, MAXVALLEN, "%d", pool_config.max_pool); status[i].desc = "max # of connection pool per child"; i++; status[i].name = "authentication_timeout"; snprintf(status[i].value, MAXVALLEN, "%d", pool_config.authentication_timeout); status[i].desc = "maximum time in seconds to 
complete client authentication"; i++; status[i].name = "logdir"; snprintf(status[i].value, MAXVALLEN, "%s", pool_config.logdir); status[i].desc = "logging directory"; i++; status[i].name = "backend_socket_dir"; snprintf(status[i].value, MAXVALLEN, "%s", pool_config.backend_socket_dir); status[i].desc = "Unix domain socket directory for the PostgreSQL server"; i++; status[i].name = "replication_mode"; snprintf(status[i].value, MAXVALLEN, "%d", pool_config.replication_mode); status[i].desc = "non 0 if operating in replication mode"; i++; status[i].name = "replication_strict"; snprintf(status[i].value, MAXVALLEN, "%d", pool_config.replication_strict); status[i].desc = "non 0 if operating in strict mode"; i++; status[i].name = "replication_timeout"; snprintf(status[i].value, MAXVALLEN, "%d", pool_config.replication_timeout); status[i].desc = "if secondary does not respond in this milli seconds, abort the session"; i++; status[i].name = "load_balance_mode"; snprintf(status[i].value, MAXVALLEN, "%d", pool_config.load_balance_mode); status[i].desc = "non 0 if operating in load balancing mode"; i++; status[i].name = "weight_master"; snprintf(status[i].value, MAXVALLEN, "%f", pool_config.weight_master); status[i].desc = "weight of master"; i++; status[i].name = "weight_secondary"; snprintf(status[i].value, MAXVALLEN, "%f", pool_config.weight_secondary); status[i].desc = "weight of secondary"; i++; status[i].name = "replication_stop_on_mismatch"; snprintf(status[i].value, MAXVALLEN, "%d", pool_config.replication_stop_on_mismatch); status[i].desc = "stop replication mode on fatal error"; i++; status[i].name = "replicate_select"; snprintf(status[i].value, MAXVALLEN, "%d", pool_config.replicate_select); status[i].desc = "non 0 if SELECT statement is replicated"; i++; status[i].name = "reset_query_list"; *(status[i].value) = '\0'; for (j=0;jnum;i++) { POOL_CONNECTION *cp = backend->slots[i]->con; /* We need to send "sync" message to backend in extend mode * so that it accepts 
next command. * Note that this may be overkill since client may send * it by itself. Moreover we do not need it in non-extend mode. * At this point we regard it is not harmfull since error resonse * will not be sent too frequently. * We observed a process stall at the synchronous * point (Line 370 of this code) in this situation: * 1. replicate_select = false * 2. do extended select for MASTER, and get some error * 3. do_error_command for SECONDARY, * 4. send Sync to both MASTER and SECONDARY * 5. SECONDARY returns something but MASTER doesn't. * 6. then pgpool stalls at pool_read on #370 to wait * for a response from MASTER. * * To avoid this situation, we introduced a variable * 'extended_select' so that pgpool sends Sync only to * MASTER when the query is in extended mode. */ pool_write(cp, "S", 1); res1 = htonl(4); if (pool_write_and_flush(cp, &res1, sizeof(res1)) < 0) { return POOL_END; } if (!DUAL_MODE || sync_secondary == 0) break; } while ((k = pool_read_kind(backend)) != 'Z') { POOL_STATUS ret; if (k < 0) { pool_error("SimpleForwardToBackend: pool_read_kind error"); return POOL_ERROR; } ret = SimpleForwardToFrontend(k, frontend, backend); if (ret != POOL_CONTINUE) return ret; if (pool_flush(frontend) != 0) return POOL_END; } status = pool_read(MASTER(backend), &res1, sizeof(res1)); if (status < 0) { pool_error("SimpleForwardToFrontend: error while reading message length"); return POOL_END; } res1 = ntohl(res1) - sizeof(res1); p1 = pool_read2(MASTER(backend), res1); if (p1 == NULL) return POOL_END; if (DUAL_MODE) { status = pool_read(SECONDARY(backend), &res2, sizeof(res2)); if (status < 0) { pool_error("SimpleForwardToFrontend: error while reading message length from secondary backend"); return POOL_END; } res2 = ntohl(res2) - sizeof(res2); p1 = pool_read2(SECONDARY(backend), res2); if (p1 == NULL) return POOL_END; if (res1 != res2) { pool_debug("SimpleForwardToFrontend: length does not match between backends master(%d) secondary(%d) kind:(%c)", 
ntohl(res1), ntohl(res2), k); } } } return POOL_CONTINUE; } POOL_STATUS SimpleForwardToBackend(char kind, POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend) { int len; int sendlen; char *p; char *name = NULL; if (kind == 'S') /* Sync message */ receive_sync = 1; if (pool_write(MASTER(backend), &kind, 1)) return POOL_END; if (DUAL_MODE) if (pool_write(SECONDARY(backend), &kind, 1)) return POOL_END; if (pool_read(frontend, &sendlen, sizeof(sendlen))) { return POOL_END; } if (pool_write(MASTER(backend), &sendlen, sizeof(sendlen))) return POOL_END; if (DUAL_MODE) if (pool_write(SECONDARY(backend), &sendlen, sizeof(sendlen))) return POOL_END; len = ntohl(sendlen) - 4; if (len == 0) return POOL_CONTINUE; else if (len < 0) { pool_error("SimpleFowardToBackend: invalid message length"); return POOL_END; } p = pool_read2(frontend, len); if (p == NULL) return POOL_END; if (pool_write(MASTER(backend), p, len)) return POOL_END; if (DUAL_MODE) if (pool_write(SECONDARY(backend), p, len)) return POOL_END; if (kind == 'B') /* Bind message? 
*/ { char *stmt_name, *portal_name; PreparedStatement *stmt; portal_name = p; stmt_name = p + strlen(portal_name) + 1; pool_debug("bind message: portal_name %s stmt_name %s", portal_name, stmt_name); if (*stmt_name == '\0') stmt = unnamed_statement; else { name = strdup(stmt_name); if (name == NULL) { pool_error("SimpleForwardToBackend: strdup failed: %s", strerror(errno)); return POOL_END; } stmt = lookup_prepared_statement_by_statement(&prepared_list, name); free(name); } if (*portal_name == '\0') unnamed_portal = stmt; else { if (stmt->portal_name) free(stmt->portal_name); stmt->portal_name = strdup(portal_name); } } else if (kind == 'C' && *p == 'S' && *(p + 1)) { name = strdup(p+1); if (name == NULL) { pool_error("SimpleForwardToBackend: strdup failed: %s", strerror(errno)); return POOL_END; } pending_function = del_prepared_list; pending_prepared_stmt = malloc(sizeof(PreparedStatement)); if (pending_prepared_stmt == NULL) { pool_error("SimpleForwardToBackend: malloc failed: %s", strerror(errno)); return POOL_END; } pending_prepared_stmt->statement_name = normalize_prepared_stmt_name(name); if (pending_prepared_stmt->statement_name == NULL) { pool_error("SimpleForwardToBackend: malloc failed: %s", strerror(errno)); return POOL_END; } pending_prepared_stmt->prepared_string = NULL; } if (kind == 'B' || kind == 'D' || kind == 'C') { int i; int kind1; for (i = 0;i < backend->num;i++) { POOL_CONNECTION *cp = backend->slots[i]->con; /* * send "Flush" message so that backend notices us * the completion of the command */ pool_write(cp, "H", 1); sendlen = htonl(4); if (pool_write_and_flush(cp, &sendlen, sizeof(sendlen)) < 0) { return POOL_END; } if (!DUAL_MODE) break; } /* * Describe message with a portal name receive two messages. * 1. ParameterDescription * 2. 
RowDescriptions or NoData */ if (kind == 'D' && *p == 'S') { kind1 = pool_read_kind(backend); if (kind1 < 0) { pool_error("SimpleForwardToBackend: pool_read_kind error"); return POOL_ERROR; } SimpleForwardToFrontend(kind1, frontend, backend); if (pool_flush(frontend)) return POOL_END; } for (;;) { kind1 = pool_read_kind(backend); if (kind1 < 0) { pool_error("SimpleForwardToBackend: pool_read_kind error"); return POOL_ERROR; } SimpleForwardToFrontend(kind1, frontend, backend); if (pool_flush(frontend) < 0) return POOL_ERROR; /* * If warning or log messages are received, we must read * one message from backend. */ if (kind1 != 'N') /* Notice Message */ break; } } else { if (pool_flush(MASTER(backend))) return POOL_END; if (DUAL_MODE) if (pool_flush(SECONDARY(backend))) return POOL_END; } return POOL_CONTINUE; } POOL_STATUS ParameterStatus(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend) { int len; int *len_array; int sendlen; char *p; char *name; char *value; pool_write(frontend, "S", 1); len_array = pool_read_message_length2(backend); if (len_array == NULL) { return POOL_END; } len = len_array[0]; sendlen = htonl(len); pool_write(frontend, &sendlen, sizeof(sendlen)); len -= 4; p = pool_read2(MASTER(backend), len); if (p == NULL) return POOL_END; name = p; value = p + strlen(name) + 1; pool_debug("name: %s value: %s", name, value); pool_add_param(&MASTER(backend)->params, name, value); #ifdef DEBUG pool_param_debug_print(&MASTER(backend)->params); #endif if (DUAL_MODE) { char *sp; if ((sp = pool_read2(SECONDARY(backend), len_array[1]-4)) == NULL) return POOL_END; name = sp; value = sp + strlen(name) + 1; pool_debug("secondary name: %s value: %s", name, value); } return pool_write(frontend, p, len); } /* * reset backend status. 
return values are: * 0: no query was issued 1: a query was issued 2: no more queries remain -1: error */ static int reset_backend(POOL_CONNECTION_POOL *backend, int qcnt) { char *query; int qn; qn = pool_config.num_reset_queries; if (qcnt >= qn) { if (qcnt >= qn + prepared_list.cnt) { reset_prepared_list(&prepared_list); return 2; } send_deallocate(backend, &prepared_list, qcnt - qn); return 1; } query = pool_config.reset_query_list[qcnt]; /* if transaction state is idle, we don't need to issue ABORT */ if (TSTATE(backend) == 'I' && !strcmp("ABORT", query)) return 0; if (Query(NULL, backend, query) != POOL_CONTINUE) return -1; return 1; } /* * return non 0 if SQL is SELECT statement. */ static int is_select_query(char *sql) { if (pool_config.ignore_leading_white_space) { /* ignore leading white spaces */ while (*sql && isspace(*sql)) sql++; } return (!strncasecmp(sql, "SELECT", 6)); } /* * return non 0 if SQL is SELECT statement. */ static int is_sequence_query(char *sql) { if (pool_config.ignore_leading_white_space) { /* ignore leading white spaces */ while (*sql && isspace(*sql)) sql++; } if (strncasecmp(sql, "SELECT", 6)) return 0; sql += 6; while (*sql && isspace(*sql)) sql++; /* SELECT NEXTVAL('xxx') */ if (*sql && !strncasecmp(sql, "NEXTVAL", 7)) return 1; /* SELECT SETVAL('xxx') */ if (*sql && !strncasecmp(sql, "SETVAL", 6)) return 1; return 0; } /* * return non 0 if load balance is possible */ static int load_balance_enabled(POOL_CONNECTION_POOL *backend, char *sql) { return (pool_config.load_balance_mode && DUAL_MODE && MAJOR(backend) == PROTO_MAJOR_V3 && TSTATE(backend) == 'I' && is_select_query(sql) && !is_sequence_query(sql)); } /* * start load balance mode */ static void start_load_balance(POOL_CONNECTION_POOL *backend) { int i; int master; /* save backend connection slots */ for (i=0;inum;i++) { slots[i] = backend->slots[i]; } /* temporarily turn off replication mode */ if (REPLICATION) replication_was_enabled = 1; if (MASTER_SLAVE) 
master_slave_was_enabled = 1; REPLICATION = 0; MASTER_SLAVE = 0; /* choose a master in random manner with weight */ master = (random() <= weight_master)?0:1; backend->slots[0] = slots[master]; pool_debug("start_load_balance: selected master is %d", master); /* start load balancing */ in_load_balance = 1; } /* * finish load balance mode */ static void end_load_balance(POOL_CONNECTION_POOL *backend) { int i; /* restore backend connection slots */ for (i=0;inum;i++) { backend->slots[i] = slots[i]; } /* turn on replication mode */ REPLICATION = replication_was_enabled; MASTER_SLAVE = master_slave_was_enabled; replication_was_enabled = 0; master_slave_was_enabled = 0; in_load_balance = 0; pool_debug("end_load_balance: end load balance mode"); } /* * send error message to frontend */ void pool_send_error_message(POOL_CONNECTION *frontend, int protoMajor, char *code, char *message, char *detail, char *hint, char *file, int line) { #define MAXDATA 1024 #define MAXMSGBUF 128 if (protoMajor == PROTO_MAJOR_V2) { pool_write(frontend, "E", 1); pool_write_and_flush(frontend, message, strlen(message)+1); } else if (protoMajor == PROTO_MAJOR_V3) { char data[MAXDATA]; char msgbuf[MAXMSGBUF]; int len; int thislen; int sendlen; len = 0; pool_write(frontend, "E", 1); /* error level */ thislen = snprintf(msgbuf, MAXMSGBUF, "SERROR"); memcpy(data +len, msgbuf, thislen+1); len += thislen + 1; /* code */ thislen = snprintf(msgbuf, MAXMSGBUF, "C%s", code); memcpy(data +len, msgbuf, thislen+1); len += thislen + 1; /* message */ thislen = snprintf(msgbuf, MAXMSGBUF, "M%s", message); memcpy(data +len, msgbuf, thislen+1); len += thislen + 1; /* detail */ if (*detail != '\0') { thislen = snprintf(msgbuf, MAXMSGBUF, "D%s", detail); memcpy(data +len, msgbuf, thislen+1); len += thislen + 1; } /* hint */ if (*hint != '\0') { thislen = snprintf(msgbuf, MAXMSGBUF, "H%s", hint); memcpy(data +len, msgbuf, thislen+1); len += thislen + 1; } /* file */ thislen = snprintf(msgbuf, MAXMSGBUF, "F%s", file); 
memcpy(data +len, msgbuf, thislen+1); len += thislen + 1; /* line */ thislen = snprintf(msgbuf, MAXMSGBUF, "L%d", line); memcpy(data +len, msgbuf, thislen+1); len += thislen + 1; /* stop null */ len++; *(data + len - 1) = '\0'; sendlen = len; len = htonl(len + 4); pool_write(frontend, &len, sizeof(len)); pool_write_and_flush(frontend, data, sendlen); } else pool_error("send_error_message: unknown protocol major %d", protoMajor); } /* * sends q query in sync manner. * this function sends a query and wait for CommandComplete/ReadyForQuery. * if an error occured, it returns with POOL_ERROR. * this function does NOT handle SELECT/SHOW quries. * if no_ready_for_query is non 0, returns without reading the packet * length for ReadyForQuery. This mode is necessary when called from ReadyForQuery(). */ static POOL_STATUS do_command(POOL_CONNECTION *backend, char *query, int protoMajor, int no_ready_for_query) { int len; int status; char kind; char *string; int deadlock_detected; pool_debug("do_command: Query: %s", query); /* send the query to the backend */ pool_write(backend, "Q", 1); len = strlen(query)+1; if (protoMajor == PROTO_MAJOR_V3) { int sendlen = htonl(len + 4); pool_write(backend, &sendlen, sizeof(sendlen)); } if (pool_write_and_flush(backend, query, len) < 0) { return POOL_END; } /* * We must check deadlock error because a aborted transaction * by detecting deadlock isn't same on all nodes. * If a transaction is aborted on master node, pgpool send a * error query to another nodes. 
*/ deadlock_detected = detect_deadlock_error(backend, protoMajor); if (deadlock_detected < 0) return POOL_END; /* * Expecting CompleteCommand */ status = pool_read(backend, &kind, sizeof(kind)); if (status < 0) { pool_error("do_command: error while reading message kind"); return POOL_END; } if (kind != 'C') { pool_log("do_command: backend does not successfully complete command %s status %c", query, kind); } /* * read command tag of CommandComplete response */ if (protoMajor == PROTO_MAJOR_V3) { if (pool_read(backend, &len, sizeof(len)) < 0) return POOL_END; len = ntohl(len) - 4; string = pool_read2(backend, len); if (string == NULL) return POOL_END; pool_debug("command tag: %s", string); } else { string = pool_read_string(backend, &len, 0); if (string == NULL) return POOL_END; } /* * Expecting ReadyForQuery */ status = pool_read(backend, &kind, sizeof(kind)); if (status < 0) { pool_error("do_command: error while reading message kind"); return POOL_END; } if (kind != 'Z') { pool_error("do_command: backend does not return ReadyForQuery"); return POOL_END; } if (no_ready_for_query) return POOL_CONTINUE; if (protoMajor == PROTO_MAJOR_V3) { if (pool_read(backend, &len, sizeof(len)) < 0) return POOL_END; status = pool_read(backend, &kind, sizeof(kind)); if (status < 0) { pool_error("do_command: error while reading transaction status"); return POOL_END; } /* set transaction state */ pool_debug("ReadyForQuery: transaction state: %c", kind); backend->tstate = kind; } return deadlock_detected ? POOL_DEADLOCK : POOL_CONTINUE; } /* * Send syntax error query to abort transaction. * We need to sync transaction status in transaction block. * SELECT query is sended to master only. * If SELECT is error, we must abort transaction on other nodes. 
*/ static POOL_STATUS do_error_command(POOL_CONNECTION *backend, int protoMajor) { int len; int status; char kind; char *string; char *error_query = "send invalid query from pgpool to abort transaction"; /* send the query to the backend */ pool_write(backend, "Q", 1); len = strlen(error_query)+1; if (protoMajor == PROTO_MAJOR_V3) { int sendlen = htonl(len + 4); pool_write(backend, &sendlen, sizeof(sendlen)); } if (pool_write_and_flush(backend, error_query, len) < 0) { return POOL_END; } /* * Expecting CompleteCommand */ status = pool_read(backend, &kind, sizeof(kind)); if (status < 0) { pool_error("do_command: error while reading message kind"); return POOL_END; } /* * read ErrorResponse message */ if (protoMajor == PROTO_MAJOR_V3) { if (pool_read(backend, &len, sizeof(len)) < 0) return POOL_END; len = ntohl(len) - 4; string = pool_read2(backend, len); if (string == NULL) return POOL_END; pool_debug("command tag: %s", string); } else { string = pool_read_string(backend, &len, 0); if (string == NULL) return POOL_END; } return POOL_CONTINUE; } /* * judge if we need to lock the table * to keep SERIAL consistency among servers */ static int need_insert_lock(POOL_CONNECTION_POOL *backend, char *query) { if (MAJOR(backend) != PROTO_MAJOR_V3) return 0; if (pool_config.ignore_leading_white_space) { /* ignore leading white spaces */ while (*query && isspace(*query)) query++; } /* * either insert_lock directive specified and without "NO INSERT LOCK" comment * or "INSERT LOCK" comment exists? */ if ((pool_config.insert_lock && strncasecmp(query, NO_LOCK_COMMENT, NO_LOCK_COMMENT_SZ)) || strncasecmp(query, LOCK_COMMENT, LOCK_COMMENT_SZ) == 0) { /* INSERT STATEMENT? */ query = skip_comment(query); if (strncasecmp(query, "INSERT", 6) == 0) return 1; } return 0; } /* * if a transaction has not already started, start a new one. 
* issue LOCK TABLE IN SHARE ROW EXCLUSIVE MODE */ static POOL_STATUS insert_lock(POOL_CONNECTION_POOL *backend, char *query) { char *table; char qbuf[1024]; POOL_STATUS status; int i, deadlock_detected = 0; /* insert_lock can be used in V3 only */ if (MAJOR(backend) != PROTO_MAJOR_V3) return POOL_CONTINUE; /* get table name */ table = get_insert_command_table_name(query); /* could not get table name. probably wrong SQL command */ if (table == NULL) { return POOL_CONTINUE; } snprintf(qbuf, sizeof(qbuf), "LOCK TABLE %s IN SHARE ROW EXCLUSIVE MODE", table); /* if we are not in a transaction block, * start a new transaction */ if (TSTATE(backend) == 'I') { for (i = 0;i < backend->num;i++) { if (do_command(backend->slots[i]->con, "BEGIN", PROTO_MAJOR_V3, 0) != POOL_CONTINUE) return POOL_END; } /* mark that we started new transaction */ internal_transaction_started = 1; } status = POOL_CONTINUE; /* issue lock table command */ for (i = 0;i < backend->num;i++) { if (deadlock_detected) status = do_command(backend->slots[i]->con, POOL_ERROR_QUERY, PROTO_MAJOR_V3, 0); else status = do_command(backend->slots[i]->con, qbuf, PROTO_MAJOR_V3, 0); if (status == POOL_DEADLOCK) deadlock_detected = 1; } return status; } /* * obtain table name in INSERT statement */ static char *get_insert_command_table_name(char *query) { static char table[1024]; char *qbuf; char *token; table[0] = '\0'; /* skip comment */ query = skip_comment(query); if (*query == '\0') return table; /* skip spaces */ while (*query && isspace(*query)) query++; /* skip non spaces(INSERT) */ while (*query && !isspace(*query)) query++; /* skip spaces */ while (*query && isspace(*query)) query++; /* skip non spaces(INTO) */ while (*query && !isspace(*query)) query++; /* skip spaces */ while (*query && isspace(*query)) query++; /* get table name */ qbuf = strdup(query); token = strtok(qbuf, "\r\n\t ("); if (token == NULL) { pool_error("get_insert_command_table_name: could not get table name"); return NULL; } 
strncpy(table, token, sizeof(table)); free(qbuf); pool_debug("get_insert_command_table_name: extracted table name: %s", table); return table; } /* * obtain portal name in EXECUTE statement */ static char *get_execute_command_portal_name(char *query) { static char portal[1024]; char *qbuf; char *token; portal[0] = '\0'; /* skip comment */ query = skip_comment(query); if (*query == '\0') return portal; /* skip spaces */ while (*query && isspace(*query)) query++; /* skip non spaces(EXECUTE) */ while (*query && !isspace(*query)) query++; /* skip spaces */ while (*query && isspace(*query)) query++; /* get portal name */ qbuf = strdup(query); token = strtok(qbuf, "\r\n\t ("); if (token == NULL) { pool_error("get_execute_command_portal_name: could not get portal name"); return NULL; } strncpy(portal, token, sizeof(portal)); free(qbuf); pool_debug("get_execute_command_portal_name: extracted portal name: %s", portal); return portal; } /* * obtain portal name and statement in PREPARED statement */ static PreparedStatement *get_prepared_command_portal_and_statement(char *query) { PreparedStatement *stmt; static char portal[1024]; char *string = NULL; char *qbuf; char *token; int len; portal[0] = '\0'; /* skip comment */ query = skip_comment(query); if (*query == '\0') return NULL; /* skip spaces */ while (*query && isspace(*query)) query++; /* skip non spaces(PREPARED) */ while (*query && !isspace(*query)) query++; /* skip spaces */ while (*query && isspace(*query)) query++; /* get portal name */ qbuf = strdup(query); token = strtok(qbuf, "\r\n\t ("); if (token == NULL) { pool_debug("get_prepared_command_portal_and_statement: could not get portal name"); return NULL; } strncpy(portal, token, sizeof(portal)); free(qbuf); /* skip data type list */ while (*query && *query != ')') query++; if (!*query) { pool_debug("get_prepared_command_portal_and_statement: could not get statement"); return NULL; } query++; /* skip spaces */ while (*query && isspace(*query)) query++; /* skip non 
spaces(AS) */ while (*query && !isspace(*query)) query++; /* skip spaces */ while (*query && isspace(*query)) query++; if (!*query) { pool_debug("get_prepared_command_portal_and_statement: could not get statement"); return NULL; } len = strlen(query) + 1; string = malloc(len); if (string == NULL) { pool_error("get_prepared_command_portal_and_statement: malloc failed: %s", strerror(errno)); return NULL; } memcpy(string, query, len); stmt = malloc(sizeof(PreparedStatement)); stmt->statement_name = normalize_prepared_stmt_name(portal); stmt->portal_name = NULL; stmt->prepared_string = string; return stmt; } /* judge if this is a DROP DATABASE command */ static int is_drop_database(char *query) { /* skip comment */ query = skip_comment(query); if (*query == '\0') return 0; /* skip spaces */ while (*query && isspace(*query)) query++; /* DROP? */ if (strncasecmp("DROP", query, 4)) return 0; /* skip DROP */ while (*query && !isspace(*query)) query++; /* skip spaces */ while (*query && isspace(*query)) query++; /* DATABASE? 
*/ if (strncasecmp("DATABASE", query, 8)) return 0; return 1; } /* skip SQL comments */ static char *skip_comment(char *query) { if (strncmp(query, "/*", 2) == 0) { query += 2; while (query) { if (strncmp(query, "*/", 2) == 0) { query += 2; break; } query++; } } return query; } void init_prepared_list(void) { prepared_list.cnt = 0; prepared_list.size = INIT_STATEMENT_LIST_SIZE; prepared_list.stmt_list = malloc(sizeof(char *) * prepared_list.size); if (prepared_list.stmt_list == NULL) { pool_error("init_prepared_list: malloc failed: %s", strerror(errno)); exit(1); } } static void add_prepared_list(PreparedStatementList *p, PreparedStatement *stmt) { if (p->cnt == p->size) { p->size *= 2; p->stmt_list = realloc(p->stmt_list, sizeof(char *) * p->size); if (p->stmt_list == NULL) { pool_error("add_prepared_list: realloc failed: %s", strerror(errno)); exit(1); } } p->stmt_list[p->cnt++] = stmt; } static void add_unnamed_portal(PreparedStatementList *p, PreparedStatement *stmt) { if (unnamed_statement && unnamed_statement->statement_name == NULL) { free(unnamed_statement->prepared_string); free(unnamed_statement); } unnamed_portal = NULL; unnamed_statement = stmt; } static void del_prepared_list(PreparedStatementList *p, PreparedStatement *stmt) { int i; for (i = 0; i < p->cnt; i++) { if (strcmp(p->stmt_list[i]->statement_name, stmt->statement_name) == 0) break; } free(stmt->statement_name); free(stmt); if (i == p->cnt) return; free(p->stmt_list[i]->statement_name); free(p->stmt_list[i]->portal_name); free(p->stmt_list[i]->prepared_string); free(p->stmt_list[i]); if (i != p->cnt - 1) { memmove(&p->stmt_list[i], &p->stmt_list[i+1], sizeof(PreparedStatement *) * (p->cnt - i - 1)); } p->cnt--; } static void reset_prepared_list(PreparedStatementList *p) { int i; for (i = 0; i < p->cnt; i++) { free(p->stmt_list[i]->statement_name); free(p->stmt_list[i]->portal_name); free(p->stmt_list[i]->prepared_string); free(p->stmt_list[i]); } p->cnt = 0; } static PreparedStatement 
*lookup_prepared_statement_by_statement(PreparedStatementList *p, const char *name) { int i; /* unnamed portal? */ if (name == NULL || name[0] == '\0' || (name[0] == '\"' && name[1] == '\"')) return unnamed_statement; for (i = 0; i < p->cnt; i++) { if (strcmp(p->stmt_list[i]->statement_name, name) == 0) return p->stmt_list[i]; } return NULL; } static PreparedStatement *lookup_prepared_statement_by_portal(PreparedStatementList *p, const char *name) { int i; /* unnamed portal? */ if (name == NULL || name[0] == '\0' || (name[0] == '\"' && name[1] == '\"')) return unnamed_portal; for (i = 0; i < p->cnt; i++) { if (p->stmt_list[i]->portal_name && strcmp(p->stmt_list[i]->portal_name, name) == 0) return p->stmt_list[i]; } return NULL; } static int send_deallocate(POOL_CONNECTION_POOL *backend, PreparedStatementList *p, int n) { char *query; int len; if (p->cnt <= n) return 1; len = strlen(p->stmt_list[n]->statement_name) + 14; /* "DEALLOCATE \"" + "\"" + '\0' */ query = malloc(len); if (query == NULL) { pool_error("send_deallocate: malloc failed: %s", strerror(errno)); exit(1); } sprintf(query, "DEALLOCATE \"%s\"", p->stmt_list[n]->statement_name); if (Query(NULL, backend, query) != POOL_CONTINUE) { free(query); return 1; } free(query); return 0; } static char *normalize_prepared_stmt_name(const char *name) { char *result; int i, len; len = strlen(name); if (name[0] != '"' && name[len-1] != '"') { result = strdup(name); if (result == NULL) return result; for (i = 0; i < len; i++) { if (isupper(result[i])) { result[i] += 32; /* convert to lower */ } } } else { result = malloc(len - 1); if (result == NULL) return result; result = memcpy(result, name+1, len-2); result[len-1] = '\0'; } return result; } static void query_ps_status(char *query, POOL_CONNECTION_POOL *backend) { StartupPacket *sp; char psbuf[1024]; int i; /* skip comment */ query = skip_comment(query); if (*query == '\0') return; sp = MASTER_CONNECTION(backend)->sp; i = snprintf(psbuf, sizeof(psbuf), "%s %s %s ", 
sp->user, sp->database, remote_ps_data); /* skip spaces */ while (*query && isspace(*query)) query++; for (; i< sizeof(psbuf); i++) { if (!*query || isspace(*query)) break; psbuf[i] = toupper(*query++); } psbuf[i] = '\0'; set_ps_display(psbuf, false); } static POOL_STATUS error_kind_mismatch(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, int kind, int kind1) { int sts; pool_error("pool_process_query: kind does not match between backends master(%c) secondary(%c)", kind, kind1); pool_send_error_message(frontend, MAJOR(backend), "XX000", "kind mismatch between backends", "", "check data consistency between master and secondary", __FILE__, __LINE__); /* health check */ sts = health_check(); if (sts == -1) { notice_backend_error(1); exit(1); } else if (sts == -2) { notice_backend_error(0); exit(1); } if (pool_config.replication_stop_on_mismatch) return POOL_FATAL; else return POOL_ERROR; } static int detect_deadlock_error(POOL_CONNECTION *master, int major) { int deadlock = 0; char kind; int readlen = 0, len; char *buf; char *p, *str; if ((buf = malloc(1024)) == NULL) { pool_error("detect_deadlock_error: malloc failed"); return -1; } if (pool_read(master, &kind, sizeof(kind))) return POOL_END; readlen += sizeof(kind); p = buf; memcpy(p, &kind, sizeof(kind)); p += sizeof(kind); if (kind == 'E') /* deadlock error? 
*/ { /* read actual query */ if (major == PROTO_MAJOR_V3) { char *error_code; if (pool_read(master, &len, sizeof(len)) < 0) return POOL_END; readlen += sizeof(len); memcpy(p, &len, sizeof(len)); p += sizeof(len); len = ntohl(len) - 4; str = malloc(len); pool_read(master, str, len); readlen += len; if (readlen > 1024) { buf = realloc(buf, readlen); if (buf == NULL) { pool_error("detect_deadlock_error: malloc failed"); return -1; } } memcpy(p, str, len); error_code = str; while (*error_code) { if (*error_code == 'C') { if (strcmp(error_code+1, DEADLOCK_ERROR_CODE) == 0) /* deadlock error */ { pool_debug("SimpleQuery: receive deadlock error from master node."); deadlock = 1; } break; } else error_code = error_code + strlen(error_code) + 1; } free(str); } else { str = pool_read_string(master, &len, 0); readlen += len; if (readlen > 1024) { buf = realloc(buf, readlen); if (buf == NULL) { pool_error("detect_deadlock_error: malloc failed"); return -1; } } memcpy(p, str, len); } } if (pool_unread(master, buf, readlen) != 0) deadlock = -1; free(buf); return deadlock; }