diff options
Diffstat (limited to 'src/protocol')
-rw-r--r-- | src/protocol/CommandComplete.c | 94 | ||||
-rw-r--r-- | src/protocol/child.c | 161 | ||||
-rw-r--r-- | src/protocol/pool_connection_pool.c | 32 | ||||
-rw-r--r-- | src/protocol/pool_pg_utils.c | 94 | ||||
-rw-r--r-- | src/protocol/pool_process_query.c | 318 | ||||
-rw-r--r-- | src/protocol/pool_proto2.c | 40 | ||||
-rw-r--r-- | src/protocol/pool_proto_modules.c | 451 |
7 files changed, 621 insertions, 569 deletions
diff --git a/src/protocol/CommandComplete.c b/src/protocol/CommandComplete.c index b71678155..ef144ca31 100644 --- a/src/protocol/CommandComplete.c +++ b/src/protocol/CommandComplete.c @@ -40,15 +40,15 @@ #include "utils/pool_stream.h" static int extract_ntuples(char *message); -static POOL_STATUS handle_mismatch_tuples(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend, char *packet, int packetlen, bool command_complete); -static int forward_command_complete(POOL_CONNECTION * frontend, char *packet, int packetlen); -static int forward_empty_query(POOL_CONNECTION * frontend, char *packet, int packetlen); -static int forward_packet_to_frontend(POOL_CONNECTION * frontend, char kind, char *packet, int packetlen); -static void process_clear_cache(POOL_CONNECTION_POOL * backend); +static POOL_STATUS handle_mismatch_tuples(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, char *packet, int packetlen, bool command_complete); +static int forward_command_complete(POOL_CONNECTION *frontend, char *packet, int packetlen); +static int forward_empty_query(POOL_CONNECTION *frontend, char *packet, int packetlen); +static int forward_packet_to_frontend(POOL_CONNECTION *frontend, char kind, char *packet, int packetlen); +static void process_clear_cache(POOL_CONNECTION_POOL *backend); static bool check_alter_role_statement(AlterRoleStmt *stmt); POOL_STATUS -CommandComplete(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend, bool command_complete) +CommandComplete(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, bool command_complete) { int len, len1; @@ -230,9 +230,9 @@ CommandComplete(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend, bool state = TSTATE(backend, MAIN_NODE_ID); /* - * If some rows have been fetched by an execute with non 0 row option, - * we do not create cache. - */ + * If some rows have been fetched by an execute with non 0 row + * option, we do not create cache. + */ pool_handle_query_cache(backend, query, node, state, session_context->query_context->partial_fetch); @@ -276,7 +276,7 @@ CommandComplete(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend, bool if (can_query_context_destroy(session_context->query_context)) { - POOL_SENT_MESSAGE * msg = pool_get_sent_message_by_query_context(session_context->query_context); + POOL_SENT_MESSAGE *msg = pool_get_sent_message_by_query_context(session_context->query_context); if (!msg || (msg && *msg->name == '\0')) { @@ -294,7 +294,7 @@ CommandComplete(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend, bool * Handle misc process which is necessary when query context exists. */ void -handle_query_context(POOL_CONNECTION_POOL * backend) +handle_query_context(POOL_CONNECTION_POOL *backend) { POOL_SESSION_CONTEXT *session_context; Node *node; @@ -344,13 +344,13 @@ handle_query_context(POOL_CONNECTION_POOL * backend) } /* - * JDBC driver sends "BEGIN" query internally if setAutoCommit(false). - * But it does not send Sync message after "BEGIN" query. In extended - * query protocol, PostgreSQL returns ReadyForQuery when a client sends - * Sync message. Problem is, pgpool can't know the transaction state - * without receiving ReadyForQuery. So we remember that we need to send - * Sync message internally afterward, whenever we receive BEGIN in - * extended protocol. + * JDBC driver sends "BEGIN" query internally if setAutoCommit(false). But + * it does not send Sync message after "BEGIN" query. In extended query + * protocol, PostgreSQL returns ReadyForQuery when a client sends Sync + * message. Problem is, pgpool can't know the transaction state without + * receiving ReadyForQuery. So we remember that we need to send Sync + * message internally afterward, whenever we receive BEGIN in extended + * protocol. */ else if (IsA(node, TransactionStmt)) { @@ -374,10 +374,10 @@ handle_query_context(POOL_CONNECTION_POOL * backend) pool_unset_failed_transaction(); pool_unset_transaction_isolation(); } - else if (stmt->kind == TRANS_STMT_COMMIT) + else if (stmt->kind == TRANS_STMT_COMMIT) { /* Commit ongoing CREATE/DROP temp table status */ - pool_temp_tables_commit_pending(); + pool_temp_tables_commit_pending(); /* Forget a transaction was started by multi statement query */ unset_tx_started_by_multi_statement_query(); @@ -412,12 +412,13 @@ handle_query_context(POOL_CONNECTION_POOL * backend) else if (IsA(node, CreateStmt)) { CreateStmt *stmt = (CreateStmt *) node; - POOL_TEMP_TABLE_STATE state; + POOL_TEMP_TABLE_STATE state; /* Is this a temporary table? */ if (stmt->relation->relpersistence == 't') { - if (TSTATE(backend, MAIN_NODE_ID ) == 'T') /* Are we inside a transaction? */ + if (TSTATE(backend, MAIN_NODE_ID) == 'T') /* Are we inside a + * transaction? */ { state = TEMP_TABLE_CREATING; } @@ -433,8 +434,8 @@ handle_query_context(POOL_CONNECTION_POOL * backend) } else if (IsA(node, DropStmt)) { - DropStmt *stmt = (DropStmt *) node; - POOL_TEMP_TABLE_STATE state; + DropStmt *stmt = (DropStmt *) node; + POOL_TEMP_TABLE_STATE state; if (stmt->removeType == OBJECT_TABLE) { @@ -442,7 +443,8 @@ handle_query_context(POOL_CONNECTION_POOL * backend) ListCell *cell; ListCell *next; - if (TSTATE(backend, MAIN_NODE_ID ) == 'T') /* Are we inside a transaction? */ + if (TSTATE(backend, MAIN_NODE_ID) == 'T') /* Are we inside a + * transaction? */ { state = TEMP_TABLE_DROPPING; } @@ -453,7 +455,8 @@ handle_query_context(POOL_CONNECTION_POOL * backend) for (cell = list_head(session_context->temp_tables); cell; cell = next) { - char *tablename = (char *)lfirst(cell); + char *tablename = (char *) lfirst(cell); + ereport(DEBUG1, (errmsg("Dropping temp table: %s", tablename))); pool_temp_tables_delete(tablename, state); @@ -478,7 +481,7 @@ handle_query_context(POOL_CONNECTION_POOL * backend) } else if (IsA(node, GrantStmt)) { - GrantStmt *stmt = (GrantStmt *) node; + GrantStmt *stmt = (GrantStmt *) node; /* REVOKE? */ if (stmt->is_grant) @@ -510,15 +513,15 @@ handle_query_context(POOL_CONNECTION_POOL * backend) static bool check_alter_role_statement(AlterRoleStmt *stmt) { - ListCell *l; + ListCell *l; foreach(l, stmt->options) { - DefElem *elm = (DefElem *) lfirst(l); + DefElem *elm = (DefElem *) lfirst(l); /* - * We want to detect other than ALTER ROLE foo WITH PASSWORD or - * WITH CONNECTION LIMIT case. It does not change any privilege of the + * We want to detect other than ALTER ROLE foo WITH PASSWORD or WITH + * CONNECTION LIMIT case. It does not change any privilege of the * role. */ if (strcmp(elm->defname, "password") && @@ -553,7 +556,8 @@ extract_ntuples(char *message) /* * Handle mismatch tuples */ -static POOL_STATUS handle_mismatch_tuples(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend, char *packet, int packetlen, bool command_complete) +static POOL_STATUS +handle_mismatch_tuples(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, char *packet, int packetlen, bool command_complete) { POOL_SESSION_CONTEXT *session_context; @@ -620,7 +624,7 @@ static POOL_STATUS handle_mismatch_tuples(POOL_CONNECTION * frontend, POOL_CONNE if (session_context->mismatch_ntuples) { - StringInfoData msg; + StringInfoData msg; initStringInfo(&msg); appendStringInfoString(&msg, "pgpool detected difference of the number of inserted, updated or deleted tuples. Possible last query was: \""); @@ -667,7 +671,7 @@ static POOL_STATUS handle_mismatch_tuples(POOL_CONNECTION * frontend, POOL_CONNE * Forward Command complete packet to frontend */ static int -forward_command_complete(POOL_CONNECTION * frontend, char *packet, int packetlen) +forward_command_complete(POOL_CONNECTION *frontend, char *packet, int packetlen) { return forward_packet_to_frontend(frontend, 'C', packet, packetlen); } @@ -676,7 +680,7 @@ forward_command_complete(POOL_CONNECTION * frontend, char *packet, int packetlen * Forward Empty query response to frontend */ static int -forward_empty_query(POOL_CONNECTION * frontend, char *packet, int packetlen) +forward_empty_query(POOL_CONNECTION *frontend, char *packet, int packetlen) { return forward_packet_to_frontend(frontend, 'I', packet, packetlen); } @@ -685,7 +689,7 @@ forward_empty_query(POOL_CONNECTION * frontend, char *packet, int packetlen) * Forward packet to frontend */ static int -forward_packet_to_frontend(POOL_CONNECTION * frontend, char kind, char *packet, int packetlen) +forward_packet_to_frontend(POOL_CONNECTION *frontend, char kind, char *packet, int packetlen) { int sendlen; @@ -705,7 +709,7 @@ forward_packet_to_frontend(POOL_CONNECTION * frontend, char kind, char *packet, * Process statements that need clearing query cache */ static void -process_clear_cache(POOL_CONNECTION_POOL * backend) +process_clear_cache(POOL_CONNECTION_POOL *backend) { /* Query cache enabled? */ if (!pool_config->memory_cache_enabled) @@ -717,15 +721,15 @@ process_clear_cache(POOL_CONNECTION_POOL * backend) /* * Are we inside a transaction? */ - if (TSTATE(backend, MAIN_NODE_ID ) == 'T') + if (TSTATE(backend, MAIN_NODE_ID) == 'T') { /* - * Disable query cache in this transaction. - * All query cache will be cleared at commit. + * Disable query cache in this transaction. All query cache will + * be cleared at commit. */ set_query_cache_disabled_tx(); } - else if (TSTATE(backend, MAIN_NODE_ID ) == 'I') /* outside transaction */ + else if (TSTATE(backend, MAIN_NODE_ID) == 'I') /* outside transaction */ { /* * Clear all the query cache. @@ -738,14 +742,14 @@ process_clear_cache(POOL_CONNECTION_POOL * backend) /* * Are we inside a transaction? */ - if (TSTATE(backend, MAIN_NODE_ID ) == 'T') + if (TSTATE(backend, MAIN_NODE_ID) == 'T') { /* Inside user started transaction? */ if (!INTERNAL_TRANSACTION_STARTED(backend, MAIN_NODE_ID)) { /* - * Disable query cache in this transaction. - * All query cache will be cleared at commit. + * Disable query cache in this transaction. All query cache + * will be cleared at commit. */ set_query_cache_disabled_tx(); } @@ -757,7 +761,7 @@ process_clear_cache(POOL_CONNECTION_POOL * backend) clear_query_cache(); } } - else if (TSTATE(backend, MAIN_NODE_ID ) == 'I') /* outside transaction */ + else if (TSTATE(backend, MAIN_NODE_ID) == 'I') /* outside transaction */ { /* * Clear all the query cache. diff --git a/src/protocol/child.c b/src/protocol/child.c index 1ef88910d..cf2161806 100644 --- a/src/protocol/child.c +++ b/src/protocol/child.c @@ -68,19 +68,19 @@ #include "auth/pool_passwd.h" #include "auth/pool_hba.h" -static StartupPacket *read_startup_packet(POOL_CONNECTION * cp); -static POOL_CONNECTION_POOL * connect_backend(StartupPacket *sp, POOL_CONNECTION * frontend); +static StartupPacket *read_startup_packet(POOL_CONNECTION *cp); +static POOL_CONNECTION_POOL *connect_backend(StartupPacket *sp, POOL_CONNECTION *frontend); static RETSIGTYPE die(int sig); static RETSIGTYPE close_idle_connection(int sig); static RETSIGTYPE wakeup_handler(int sig); static RETSIGTYPE reload_config_handler(int sig); static RETSIGTYPE authentication_timeout(int sig); -static void send_params(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend); +static void send_params(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend); static int connection_count_up(void); static void connection_count_down(void); -static bool connect_using_existing_connection(POOL_CONNECTION * frontend, - POOL_CONNECTION_POOL * backend, - StartupPacket *sp); +static bool connect_using_existing_connection(POOL_CONNECTION *frontend, + POOL_CONNECTION_POOL *backend, + StartupPacket *sp); static void check_restart_request(void); static void check_exit_request(void); static void enable_authentication_timeout(void); @@ -89,17 +89,17 @@ static int wait_for_new_connections(int *fds, SockAddr *saddr); static void check_config_reload(void); static void get_backends_status(unsigned int *valid_backends, unsigned int *down_backends); static void validate_backend_connectivity(int front_end_fd); -static POOL_CONNECTION * get_connection(int front_end_fd, SockAddr *saddr); -static POOL_CONNECTION_POOL * get_backend_connection(POOL_CONNECTION * frontend); +static POOL_CONNECTION *get_connection(int front_end_fd, SockAddr *saddr); +static POOL_CONNECTION_POOL *get_backend_connection(POOL_CONNECTION *frontend); static StartupPacket *StartupPacketCopy(StartupPacket *sp); static void log_disconnections(char *database, char *username); static void print_process_status(char *remote_host, char *remote_port); -static bool backend_cleanup(POOL_CONNECTION * volatile *frontend, POOL_CONNECTION_POOL * volatile backend, bool frontend_invalid); +static bool backend_cleanup(POOL_CONNECTION *volatile *frontend, POOL_CONNECTION_POOL *volatile backend, bool frontend_invalid); static void child_will_go_down(int code, Datum arg); -static int opt_sort(const void *a, const void *b); +static int opt_sort(const void *a, const void *b); -static bool unix_fds_not_isset(int* fds, int num_unix_fds, fd_set* opt); +static bool unix_fds_not_isset(int *fds, int num_unix_fds, fd_set *opt); /* * Non 0 means SIGTERM (smart shutdown) or SIGINT (fast shutdown) has arrived @@ -154,11 +154,12 @@ do_child(int *fds) sigjmp_buf local_sigjmp_buf; POOL_CONNECTION_POOL *volatile backend = NULL; - /* counter for child_max_connections. "volatile" declaration is necessary + /* + * counter for child_max_connections. "volatile" declaration is necessary * so that this is counted up even if long jump is issued due to * ereport(ERROR). */ - volatile int connections_count = 0; + volatile int connections_count = 0; char psbuf[NI_MAXHOST + 128]; @@ -355,7 +356,8 @@ do_child(int *fds) */ if (con_count > (pool_config->num_init_children - pool_config->reserved_connections)) { - POOL_CONNECTION * cp; + POOL_CONNECTION *cp; + cp = pool_open(front_end_fd, false); if (cp == NULL) { @@ -405,8 +407,8 @@ do_child(int *fds) pool_initialize_private_backend_status(); /* - * Connect to backend. Also do authentication between - * frontend <--> pgpool and pgpool <--> backend. + * Connect to backend. Also do authentication between frontend <--> + * pgpool and pgpool <--> backend. */ backend = get_backend_connection(child_frontend); if (!backend) @@ -513,7 +515,7 @@ do_child(int *fds) * return true if backend connection is cached */ static bool -backend_cleanup(POOL_CONNECTION * volatile *frontend, POOL_CONNECTION_POOL * volatile backend, bool frontend_invalid) +backend_cleanup(POOL_CONNECTION *volatile *frontend, POOL_CONNECTION_POOL *volatile backend, bool frontend_invalid) { StartupPacket *sp; bool cache_connection = false; @@ -600,18 +602,18 @@ backend_cleanup(POOL_CONNECTION * volatile *frontend, POOL_CONNECTION_POOL * vol * Read the startup packet and parse the contents. */ static StartupPacket * -read_startup_packet(POOL_CONNECTION * cp) +read_startup_packet(POOL_CONNECTION *cp) { StartupPacket *sp; StartupPacket_v2 *sp2; int protov; int len; char *p; - char **guc_options; - int opt_num = 0; - char *sp_sort; - char *tmpopt; - int i; + char **guc_options; + int opt_num = 0; + char *sp_sort; + char *tmpopt; + int i; sp = (StartupPacket *) palloc0(sizeof(*sp)); @@ -659,37 +661,38 @@ read_startup_packet(POOL_CONNECTION * cp) case PROTO_MAJOR_V3: /* V3 */ /* copy startup_packet */ sp_sort = palloc0(len); - memcpy(sp_sort,sp->startup_packet,len); + memcpy(sp_sort, sp->startup_packet, len); p = sp_sort; - p += sizeof(int); /* skip protocol version info */ + p += sizeof(int); /* skip protocol version info */ /* count the number of options */ while (*p) { - p += (strlen(p) + 1); /* skip option name */ - p += (strlen(p) + 1); /* skip option value */ - opt_num ++; + p += (strlen(p) + 1); /* skip option name */ + p += (strlen(p) + 1); /* skip option value */ + opt_num++; } - guc_options = (char **)palloc0(opt_num * sizeof(char *)); + guc_options = (char **) palloc0(opt_num * sizeof(char *)); /* get guc_option name list */ p = sp_sort + sizeof(int); for (i = 0; i < opt_num; i++) { guc_options[i] = p; - p += (strlen(p) + 1); /* skip option name */ - p += (strlen(p) + 1); /* skip option value */ + p += (strlen(p) + 1); /* skip option name */ + p += (strlen(p) + 1); /* skip option value */ } /* sort option name using quick sort */ - qsort( (void *)guc_options, opt_num, sizeof(char *), opt_sort ); + qsort((void *) guc_options, opt_num, sizeof(char *), opt_sort); - p = sp->startup_packet + sizeof(int); /* skip protocol version info */ + p = sp->startup_packet + sizeof(int); /* skip protocol version + * info */ for (i = 0; i < opt_num; i++) { tmpopt = guc_options[i]; - memcpy(p, tmpopt ,strlen(tmpopt) + 1); /* memcpy option name */ + memcpy(p, tmpopt, strlen(tmpopt) + 1); /* memcpy option name */ p += (strlen(tmpopt) + 1); tmpopt += (strlen(tmpopt) + 1); - memcpy(p, tmpopt ,strlen(tmpopt) + 1); /* memcpy option value */ + memcpy(p, tmpopt, strlen(tmpopt) + 1); /* memcpy option value */ p += (strlen(tmpopt) + 1); } @@ -733,7 +736,7 @@ read_startup_packet(POOL_CONNECTION * cp) { ereport(DEBUG1, (errmsg("reading startup packet"), - errdetail("guc name: %s value: %s", p, p+strlen(p)+1))); + errdetail("guc name: %s value: %s", p, p + strlen(p) + 1))); p += (strlen(p) + 1); } @@ -789,8 +792,8 @@ read_startup_packet(POOL_CONNECTION * cp) * Reuse existing connection */ static bool -connect_using_existing_connection(POOL_CONNECTION * frontend, - POOL_CONNECTION_POOL * backend, +connect_using_existing_connection(POOL_CONNECTION *frontend, + POOL_CONNECTION_POOL *backend, StartupPacket *sp) { int i, @@ -824,8 +827,8 @@ connect_using_existing_connection(POOL_CONNECTION * frontend, /* Reuse existing connection to backend */ frontend_auth_cxt = AllocSetContextCreate(CurrentMemoryContext, - "frontend_auth", - ALLOCSET_DEFAULT_SIZES); + "frontend_auth", + ALLOCSET_DEFAULT_SIZES); oldContext = MemoryContextSwitchTo(frontend_auth_cxt); pool_do_reauth(frontend, backend); @@ -905,7 +908,7 @@ connect_using_existing_connection(POOL_CONNECTION * frontend, * process cancel request */ void -cancel_request(CancelPacket * sp, int32 splen) +cancel_request(CancelPacket *sp, int32 splen) { int len; int fd; @@ -915,7 +918,7 @@ cancel_request(CancelPacket * sp, int32 splen) k; ConnectionInfo *c = NULL; bool found = false; - int32 keylen; /* cancel key length */ + int32 keylen; /* cancel key length */ if (pool_config->log_client_messages) ereport(LOG, @@ -958,7 +961,8 @@ cancel_request(CancelPacket * sp, int32 splen) errdetail("found pid:%d keylen:%d i:%d", ntohl(c->pid), c->keylen, i))); /* - * "c" is a pointer to i th child, j th pool, and 0 th backend. + * "c" is a pointer to i th child, j th pool, and 0 th + * backend. */ c = pool_coninfo(i, j, 0); found = true; @@ -978,12 +982,12 @@ found: /* * We are sending cancel request message to all backend groups. So some - * of query cancel requests may not work but it should not be a - * problem. They are just ignored by the backend. + * of query cancel requests may not work but it should not be a problem. + * They are just ignored by the backend. */ for (i = 0; i < NUM_BACKENDS; i++, c++) { - int32 cancel_request_code; + int32 cancel_request_code; if (!VALID_BACKEND(i)) continue; @@ -1006,9 +1010,10 @@ found: pool_set_db_node_id(con, i); - len = htonl(splen + sizeof(int32)); /* splen does not include packet length field */ - pool_write(con, &len, sizeof(len)); /* send cancel messages length */ - cancel_request_code = htonl(PG_PROTOCOL(1234,5678)); /* cancel request code */ + len = htonl(splen + sizeof(int32)); /* splen does not include packet + * length field */ + pool_write(con, &len, sizeof(len)); /* send cancel messages length */ + cancel_request_code = htonl(PG_PROTOCOL(1234, 5678)); /* cancel request code */ pool_write(con, &cancel_request_code, sizeof(int32)); pool_write(con, &c->pid, sizeof(int32)); /* send pid */ pool_write(con, c->key, keylen); /* send cancel key */ @@ -1068,11 +1073,12 @@ StartupPacketCopy(StartupPacket *sp) * Create a new connection to backend. * Authentication is performed if requested by backend. */ -static POOL_CONNECTION_POOL * connect_backend(StartupPacket *sp, POOL_CONNECTION * frontend) +static POOL_CONNECTION_POOL * +connect_backend(StartupPacket *sp, POOL_CONNECTION *frontend) { POOL_CONNECTION_POOL *backend; StartupPacket *volatile topmem_sp = NULL; - volatile bool topmem_sp_set = false; + volatile bool topmem_sp_set = false; int i; /* connect to the backend */ @@ -1122,8 +1128,8 @@ static POOL_CONNECTION_POOL * connect_backend(StartupPacket *sp, POOL_CONNECTION * do authentication stuff */ frontend_auth_cxt = AllocSetContextCreate(CurrentMemoryContext, - "frontend_auth", - ALLOCSET_DEFAULT_SIZES); + "frontend_auth", + ALLOCSET_DEFAULT_SIZES); oldContext = MemoryContextSwitchTo(frontend_auth_cxt); /* do authentication against backend */ @@ -1141,7 +1147,8 @@ static POOL_CONNECTION_POOL * connect_backend(StartupPacket *sp, POOL_CONNECTION } PG_END_TRY(); - /* At this point, we need to free previously allocated memory for the + /* + * At this point, we need to free previously allocated memory for the * startup packet if no backend is up. */ if (!topmem_sp_set && topmem_sp != NULL) @@ -1244,7 +1251,7 @@ static RETSIGTYPE close_idle_connection(int sig) if (CONNECTION_SLOT(p, main_node_id)->closetime > 0) /* idle connection? */ { - bool freed = false; + bool freed = false; pool_send_frontend_exits(p); @@ -1313,7 +1320,7 @@ disable_authentication_timeout(void) * Send parameter status message to frontend. */ static void -send_params(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend) +send_params(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend) { int index; char *name, @@ -1363,7 +1370,7 @@ child_will_go_down(int code, Datum arg) if (child_frontend) log_disconnections(child_frontend->database, child_frontend->username); else - log_disconnections("",""); + log_disconnections("", ""); } } @@ -1594,7 +1601,7 @@ wait_for_new_connections(int *fds, SockAddr *saddr) } else { - int sts; + int sts; for (;;) { @@ -1609,10 +1616,10 @@ wait_for_new_connections(int *fds, SockAddr *saddr) if (backend_timer_expired) { /* - * We add 10 seconds to connection_life_time so that there's - * enough margin. + * We add 10 seconds to connection_life_time so that + * there's enough margin. */ - int seconds = pool_config->connection_life_time + 10; + int seconds = pool_config->connection_life_time + 10; while (seconds-- > 0) { @@ -1627,7 +1634,7 @@ wait_for_new_connections(int *fds, SockAddr *saddr) } } } - else /* success or other error */ + else /* success or other error */ break; } } @@ -1661,7 +1668,7 @@ wait_for_new_connections(int *fds, SockAddr *saddr) numfds = select(nsocks, &rmask, NULL, NULL, timeout); - /* not timeout*/ + /* not timeout */ if (numfds != 0) break; @@ -1796,9 +1803,10 @@ retry_accept: } static bool -unix_fds_not_isset(int* fds, int num_unix_fds, fd_set* opt) +unix_fds_not_isset(int *fds, int num_unix_fds, fd_set *opt) { - int i; + int i; + for (i = 0; i < num_unix_fds; i++) { if (!FD_ISSET(fds[i], opt)) @@ -1900,7 +1908,7 @@ validate_backend_connectivity(int front_end_fd) error_hint, __FILE__, __LINE__); - + } PG_CATCH(); { @@ -1926,7 +1934,7 @@ static POOL_CONNECTION * get_connection(int front_end_fd, SockAddr *saddr) { POOL_CONNECTION *cp; - ProcessInfo *pi; + ProcessInfo *pi; ereport(DEBUG1, (errmsg("I am %d accept fd %d", getpid(), front_end_fd))); @@ -1993,7 +2001,7 @@ get_connection(int front_end_fd, SockAddr *saddr) * pgpool <--> backend. */ static POOL_CONNECTION_POOL * -get_backend_connection(POOL_CONNECTION * frontend) +get_backend_connection(POOL_CONNECTION *frontend) { int found = 0; StartupPacket *sp; @@ -2049,8 +2057,8 @@ retry_startup: * return if frontend was rejected; it simply terminates this process. */ MemoryContext frontend_auth_cxt = AllocSetContextCreate(CurrentMemoryContext, - "frontend_auth", - ALLOCSET_DEFAULT_SIZES); + "frontend_auth", + ALLOCSET_DEFAULT_SIZES); MemoryContext oldContext = MemoryContextSwitchTo(frontend_auth_cxt); /* @@ -2133,8 +2141,8 @@ retry_startup: if (backend == NULL) { /* - * Create a new connection to backend. - * Authentication is performed if requested by backend. + * Create a new connection to backend. Authentication is performed if + * requested by backend. */ backend = connect_backend(sp, frontend); } @@ -2153,7 +2161,7 @@ static void log_disconnections(char *database, char *username) { struct timeval endTime; - long diff; + long diff; long secs; int msecs, hours, @@ -2161,7 +2169,7 @@ log_disconnections(char *database, char *username) seconds; gettimeofday(&endTime, NULL); - diff = (long) ((endTime.tv_sec - startTime.tv_sec) * 1000000 + (endTime.tv_usec - startTime.tv_usec)); + diff = (long) ((endTime.tv_sec - startTime.tv_sec) * 1000000 + (endTime.tv_usec - startTime.tv_usec)); msecs = (int) (diff % 1000000) / 1000; secs = (long) (diff / 1000000); @@ -2233,9 +2241,10 @@ pg_frontend_exists(void) return 0; } -static int opt_sort(const void *a, const void *b) +static int +opt_sort(const void *a, const void *b) { - return strcmp( *(char **)a, *(char **)b); + return strcmp(*(char **) a, *(char **) b); } void diff --git a/src/protocol/pool_connection_pool.c b/src/protocol/pool_connection_pool.c index 666187216..c3b369dc2 100644 --- a/src/protocol/pool_connection_pool.c +++ b/src/protocol/pool_connection_pool.c @@ -63,8 +63,8 @@ volatile sig_atomic_t backend_timer_expired = 0; /* flag for connection * closed timer is expired */ volatile sig_atomic_t health_check_timer_expired; /* non 0 if health check * timer expired */ -static POOL_CONNECTION_POOL_SLOT * create_cp(POOL_CONNECTION_POOL_SLOT * cp, int slot); -static POOL_CONNECTION_POOL * new_connection(POOL_CONNECTION_POOL * p); +static POOL_CONNECTION_POOL_SLOT *create_cp(POOL_CONNECTION_POOL_SLOT *cp, int slot); +static POOL_CONNECTION_POOL *new_connection(POOL_CONNECTION_POOL *p); static int check_socket_status(int fd); static bool connect_with_timeout(int fd, struct addrinfo *walk, char *host, int port, bool retry); @@ -161,6 +161,7 @@ pool_get_cp(char *user, char *database, int protoMajor, int check_socket) ereport(LOG, (errmsg("connection closed."), errdetail("retry to create new connection pool"))); + /* * It is possible that one of backend just broke. sleep 1 * second to wait for failover occurres, then wait for the @@ -259,7 +260,7 @@ pool_create_cp(void) POOL_CONNECTION_POOL *oldestp; POOL_CONNECTION_POOL *ret; ConnectionInfo *info; - int main_node_id; + int main_node_id; POOL_CONNECTION_POOL *p = pool_connection_pool; @@ -297,7 +298,7 @@ pool_create_cp(void) { main_node_id = in_use_backend_id(p); if (main_node_id < 0) - elog(ERROR, "no in use backend found"); /* this should not happen */ + elog(ERROR, "no in use backend found"); /* this should not happen */ ereport(DEBUG1, (errmsg("creating connection pool"), @@ -318,7 +319,7 @@ pool_create_cp(void) p = oldestp; main_node_id = in_use_backend_id(p); if (main_node_id < 0) - elog(ERROR, "no in use backend found"); /* this should not happen */ + elog(ERROR, "no in use backend found"); /* this should not happen */ pool_send_frontend_exits(p); ereport(DEBUG1, @@ -358,7 +359,7 @@ pool_create_cp(void) * set backend connection close timer */ void -pool_connection_pool_timer(POOL_CONNECTION_POOL * backend) +pool_connection_pool_timer(POOL_CONNECTION_POOL *backend) { POOL_CONNECTION_POOL *p = pool_connection_pool; int i; @@ -782,7 +783,7 @@ connect_inet_domain_socket_by_port(char *host, int port, bool retry) struct addrinfo *res; struct addrinfo *walk; struct addrinfo hints; - int retry_cnt = 5; /* getaddrinfo() retry count in case EAI_AGAIN */ + int retry_cnt = 5; /* getaddrinfo() retry count in case EAI_AGAIN */ /* * getaddrinfo() requires a string because it also accepts service names, @@ -875,7 +876,8 @@ connect_inet_domain_socket_by_port(char *host, int port, bool retry) /* * create connection pool */ -static POOL_CONNECTION_POOL_SLOT * create_cp(POOL_CONNECTION_POOL_SLOT * cp, int slot) +static POOL_CONNECTION_POOL_SLOT * +create_cp(POOL_CONNECTION_POOL_SLOT *cp, int slot) { BackendInfo *b = &pool_config->backend_desc->backend_info[slot]; int fd; @@ -902,13 +904,14 @@ static POOL_CONNECTION_POOL_SLOT * create_cp(POOL_CONNECTION_POOL_SLOT * cp, int * Create actual connections to backends. * New connection resides in TopMemoryContext. */ -static POOL_CONNECTION_POOL * new_connection(POOL_CONNECTION_POOL * p) +static POOL_CONNECTION_POOL * +new_connection(POOL_CONNECTION_POOL *p) { POOL_CONNECTION_POOL_SLOT *s; int active_backend_count = 0; int i; bool status_changed = false; - volatile BACKEND_STATUS status; + volatile BACKEND_STATUS status; MemoryContext oldContext = MemoryContextSwitchTo(TopMemoryContext); @@ -1097,7 +1100,7 @@ close_all_backend_connections(void) for (i = 0; i < pool_config->max_pool; i++, p++) { - int backend_id = in_use_backend_id(p); + int backend_id = in_use_backend_id(p); if (backend_id < 0) continue; @@ -1120,9 +1123,10 @@ close_all_backend_connections(void) void update_pooled_connection_count(void) { - int i; - int count = 0; + int i; + int count = 0; POOL_CONNECTION_POOL *p = pool_connection_pool; + for (i = 0; i < pool_config->max_pool; i++, p++) { if (MAIN_CONNECTION(p)) @@ -1138,7 +1142,7 @@ update_pooled_connection_count(void) int in_use_backend_id(POOL_CONNECTION_POOL *pool) { - int i; + int i; for (i = 0; i < NUM_BACKENDS; i++) { diff --git a/src/protocol/pool_pg_utils.c b/src/protocol/pool_pg_utils.c index ccbe2b03c..2eebbbb3c 100644 --- a/src/protocol/pool_pg_utils.c +++ b/src/protocol/pool_pg_utils.c @@ -41,7 +41,7 @@ #include "pool_config_variables.h" static int choose_db_node_id(char *str); -static void free_persistent_db_connection_memory(POOL_CONNECTION_POOL_SLOT * cp); +static void free_persistent_db_connection_memory(POOL_CONNECTION_POOL_SLOT *cp); static void si_enter_critical_region(void); static void si_leave_critical_region(void); @@ -64,9 +64,9 @@ make_persistent_db_connection( { int protoVersion; char data[MAX_USER_AND_DATABASE]; - } StartupPacket_v3; + } StartupPacket_v3; - static StartupPacket_v3 * startup_packet; + static StartupPacket_v3 *startup_packet; int len, len1; @@ -200,8 +200,8 @@ make_persistent_db_connection_noerror( * receives an ERROR, it stops processing and terminates, which is not * good. This is problematic especially with pcp_node_info, since it * calls db_node_role(), and db_node_role() calls this function. So if - * the target PostgreSQL is down, EmitErrorReport() sends ERROR message - * to pcp frontend and it stops (see process_pcp_response() in + * the target PostgreSQL is down, EmitErrorReport() sends ERROR + * message to pcp frontend and it stops (see process_pcp_response() in * src/libs/pcp/pcp.c. To fix this, just eliminate calling * EmitErrorReport(). This will suppress ERROR message but as you can * see the comment in this function "does not ereports in case of an @@ -221,7 +221,7 @@ make_persistent_db_connection_noerror( * make_persistent_db_connection and discard_persistent_db_connection. */ static void -free_persistent_db_connection_memory(POOL_CONNECTION_POOL_SLOT * cp) +free_persistent_db_connection_memory(POOL_CONNECTION_POOL_SLOT *cp) { if (!cp) return; @@ -245,7 +245,7 @@ free_persistent_db_connection_memory(POOL_CONNECTION_POOL_SLOT * cp) * make_persistent_db_connection(). */ void -discard_persistent_db_connection(POOL_CONNECTION_POOL_SLOT * cp) +discard_persistent_db_connection(POOL_CONNECTION_POOL_SLOT *cp) { int len; @@ -274,7 +274,7 @@ discard_persistent_db_connection(POOL_CONNECTION_POOL_SLOT * cp) * send startup packet */ void -send_startup_packet(POOL_CONNECTION_POOL_SLOT * cp) +send_startup_packet(POOL_CONNECTION_POOL_SLOT *cp) { int len; @@ -319,10 +319,10 @@ select_load_balancing_node(void) int tmp; int no_load_balance_node_id = -2; uint64 lowest_delay; - int lowest_delay_nodes[NUM_BACKENDS]; + int lowest_delay_nodes[NUM_BACKENDS]; /* prng state data for load balancing */ - static pg_prng_state backsel_state; + static pg_prng_state backsel_state; /* * -2 indicates there's no database_redirect_preference_list. -1 indicates @@ -443,24 +443,27 @@ select_load_balancing_node(void) if (suggested_node_id >= 0) { /* - * If pgpool is running in Streaming Replication mode and delay_threshold - * and prefer_lower_delay_standby are true, we choose the least delayed - * node if suggested_node is standby and delayed over delay_threshold. + * If pgpool is running in Streaming Replication mode and + * delay_threshold and prefer_lower_delay_standby are true, we choose + * the least delayed node if suggested_node is standby and delayed + * over delay_threshold. */ if (STREAM && pool_config->prefer_lower_delay_standby && suggested_node_id != PRIMARY_NODE_ID && check_replication_delay(suggested_node_id) < 0) { ereport(DEBUG1, - (errmsg("selecting load balance node"), - errdetail("suggested backend %d is streaming delayed over delay_threshold", suggested_node_id))); + (errmsg("selecting load balance node"), + errdetail("suggested backend %d is streaming delayed over delay_threshold", suggested_node_id))); /* - * The new load balancing node is selected from the - * nodes which have the lowest delay. + * The new load balancing node is selected from the nodes which + * have the lowest delay. */ if (pool_config->delay_threshold_by_time > 0) - lowest_delay = pool_config->delay_threshold_by_time * 1000; /* convert from milli seconds to micro seconds */ + lowest_delay = pool_config->delay_threshold_by_time * 1000; /* convert from milli + * seconds to micro + * seconds */ else lowest_delay = pool_config->delay_threshold; @@ -484,7 +487,8 @@ select_load_balancing_node(void) } else if (lowest_delay > BACKEND_INFO(i).standby_delay) { - int ii; + int ii; + lowest_delay = BACKEND_INFO(i).standby_delay; for (ii = 0; ii < NUM_BACKENDS; ii++) { @@ -628,7 +632,8 @@ select_load_balancing_node(void) } else if (lowest_delay > BACKEND_INFO(i).standby_delay) { - int ii; + int ii; + lowest_delay = BACKEND_INFO(i).standby_delay; for (ii = 0; ii < NUM_BACKENDS; ii++) { @@ -679,13 +684,13 @@ static void initialize_prng(pg_prng_state *state) { static bool prng_seed_set = false; - uint64 seed; + uint64 seed; if (unlikely(!prng_seed_set)) { /* initialize prng */ if (!pg_strong_random(&seed, sizeof(seed))) - seed = UINT64CONST(1); /* Pick a value, as long as it spreads */ + seed = UINT64CONST(1); /* Pick a value, as long as it spreads */ pg_prng_seed(state, seed); prng_seed_set = true; } @@ -702,17 +707,17 @@ initialize_prng(pg_prng_state *state) * */ PGVersion * -Pgversion(POOL_CONNECTION_POOL * backend) +Pgversion(POOL_CONNECTION_POOL *backend) { #define VERSION_BUF_SIZE 10 - static PGVersion pgversion; - static POOL_RELCACHE *relcache; - char *result; - char *p; - char buf[VERSION_BUF_SIZE]; - int i; - int major; - int minor; + static PGVersion pgversion; + static POOL_RELCACHE *relcache; + char *result; + char *p; + char buf[VERSION_BUF_SIZE]; + int i; + int major; + int minor; /* * First, check local cache. If cache is set, just return it. @@ -743,7 +748,7 @@ Pgversion(POOL_CONNECTION_POOL * backend) /* * Search relcache. */ - result = (char *)pool_search_relcache(relcache, backend, "version"); + result = (char *) pool_search_relcache(relcache, backend, "version"); if (result == 0) { ereport(FATAL, @@ -801,7 +806,7 @@ Pgversion(POOL_CONNECTION_POOL * backend) { p++; i = 0; - while (i < VERSION_BUF_SIZE -1 && p && *p != '.' && *p != ' ') + while (i < VERSION_BUF_SIZE - 1 && p && *p != '.' && *p != ' ') { buf[i++] = *p++; } @@ -817,7 +822,7 @@ Pgversion(POOL_CONNECTION_POOL * backend) */ p++; i = 0; - while (i < VERSION_BUF_SIZE -1 && p && *p != '.' && *p != ' ') + while (i < VERSION_BUF_SIZE - 1 && p && *p != '.' && *p != ' ') { buf[i++] = *p++; } @@ -879,6 +884,7 @@ choose_db_node_id(char *str) } return node_id; } + /* *--------------------------------------------------------------------------------- * Snapshot Isolation modules @@ -1003,7 +1009,7 @@ void si_snapshot_acquired(void) { POOL_SESSION_CONTEXT *session; - int i; + int i; session = pool_get_session_context(true); @@ -1018,9 +1024,10 @@ si_snapshot_acquired(void) if (si_manage_info->snapshot_counter == 0) { /* wakeup all waiting children */ - for (i = 0; i < pool_config->num_init_children ; i++) + for (i = 0; i < pool_config->num_init_children; i++) { - pid_t pid = si_manage_info->snapshot_waiting_children[i]; + pid_t pid = si_manage_info->snapshot_waiting_children[i]; + if (pid > 0) { elog(SI_DEBUG_LOG_LEVEL, "si_snapshot_acquired: send SIGUSR2 to %d", pid); @@ -1076,7 +1083,7 @@ void si_commit_done(void) { POOL_SESSION_CONTEXT *session; - int i; + int i; session = pool_get_session_context(true); @@ -1092,9 +1099,10 @@ si_commit_done(void) if (si_manage_info->commit_counter == 0) { /* wakeup all waiting children */ - for (i = 0; i < pool_config->num_init_children ; i++) + for (i = 0; i < pool_config->num_init_children; i++) { - pid_t pid = si_manage_info->commit_waiting_children[i]; + pid_t pid = si_manage_info->commit_waiting_children[i]; + if (pid > 0) { elog(SI_DEBUG_LOG_LEVEL, "si_commit_done: send SIGUSR2 to %d", pid); @@ -1117,7 +1125,8 @@ si_commit_done(void) * -1: delay exceeds delay_threshold_by_time * -2: delay exceeds delay_threshold */ -int check_replication_delay(int node_id) +int +check_replication_delay(int node_id) { BackendInfo *bkinfo; @@ -1132,7 +1141,7 @@ int check_replication_delay(int node_id) * to multiply delay_threshold_by_time by 1000 to normalize. */ if (pool_config->delay_threshold_by_time > 0 && - bkinfo->standby_delay > pool_config->delay_threshold_by_time*1000) + bkinfo->standby_delay > pool_config->delay_threshold_by_time * 1000) return -1; /* @@ -1144,4 +1153,3 @@ int check_replication_delay(int node_id) return 0; } - diff --git a/src/protocol/pool_process_query.c b/src/protocol/pool_process_query.c index 5a6b97ba1..b69cb3d52 100644 --- a/src/protocol/pool_process_query.c +++ b/src/protocol/pool_process_query.c @@ -80,28 +80,28 @@ #define IDLE_IN_TRANSACTION_SESSION_TIMEOUT_ERROR_CODE "25P03" #define IDLE_SESSION_TIMEOUT_ERROR_CODE "57P05" -static int reset_backend(POOL_CONNECTION_POOL * backend, int qcnt); +static int reset_backend(POOL_CONNECTION_POOL *backend, int qcnt); static char *get_insert_command_table_name(InsertStmt *node); -static bool is_cache_empty(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend); +static bool is_cache_empty(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend); static bool is_panic_or_fatal_error(char *message, int major); -static int extract_message(POOL_CONNECTION * backend, char *error_code, int major, char class, bool unread); -static int detect_postmaster_down_error(POOL_CONNECTION * backend, int major); +static int extract_message(POOL_CONNECTION *backend, char *error_code, int major, char class, bool unread); +static int detect_postmaster_down_error(POOL_CONNECTION *backend, int major); static bool is_internal_transaction_needed(Node *node); static bool pool_has_insert_lock(void); -static POOL_STATUS add_lock_target(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend, char *table); -static bool has_lock_target(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend, char *table, bool for_update); -static POOL_STATUS insert_oid_into_insert_lock(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend, char *table); -static POOL_STATUS read_packets_and_process(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend, int reset_request, int *state, short *num_fields, bool *cont); +static POOL_STATUS add_lock_target(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, char *table); +static bool has_lock_target(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, char *table, bool for_update); +static POOL_STATUS insert_oid_into_insert_lock(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, char *table); +static POOL_STATUS read_packets_and_process(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, int reset_request, int *state, short *num_fields, bool *cont); static bool is_all_standbys_command_complete(unsigned char *kind_list, int num_backends, int main_node); -static bool pool_process_notice_message_from_one_backend(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend, int backend_idx, char kind); +static bool pool_process_notice_message_from_one_backend(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, int backend_idx, char kind); /* * Main module for query processing * reset_request: if non 0, call reset_backend to execute reset queries */ POOL_STATUS -pool_process_query(POOL_CONNECTION * frontend, - POOL_CONNECTION_POOL * backend, +pool_process_query(POOL_CONNECTION *frontend, + POOL_CONNECTION_POOL *backend, int reset_request) { short num_fields = 0; /* the number of fields in a row (V2 protocol) */ @@ -135,8 +135,8 @@ pool_process_query(POOL_CONNECTION * frontend, } /* - * Reset error flag while processing reset queries. - * The flag is set to on inside pool_send_and_wait(). + * Reset error flag while processing reset queries. The flag is set to on + * inside pool_send_and_wait(). */ reset_query_error = false; @@ -306,8 +306,7 @@ pool_process_query(POOL_CONNECTION * frontend, else { /* - * If we have pending data in main, we need to process - * it + * If we have pending data in main, we need to process it */ if (pool_ssl_pending(MAIN(backend)) || !pool_read_buffer_is_empty(MAIN(backend))) @@ -327,8 +326,8 @@ pool_process_query(POOL_CONNECTION * frontend, !pool_read_buffer_is_empty(CONNECTION(backend, i))) { /* - * If we have pending data in main, we need - * to process it + * If we have pending data in main, we need to + * process it */ if (IS_MAIN_NODE_ID(i)) { @@ -344,9 +343,8 @@ pool_process_query(POOL_CONNECTION * frontend, char *string; /* - * If main does not have pending data, - * we discard one packet from other - * backend + * If main does not have pending data, we + * discard one packet from other backend */ pool_read_with_error(CONNECTION(backend, i), &kind, sizeof(kind), "reading message kind from backend"); @@ -358,22 +356,22 @@ pool_process_query(POOL_CONNECTION * frontend, int sendlen; /* - * In native replication mode we may - * send the query to the standby - * node and the NOTIFY comes back - * only from primary node. But - * since we have sent the query to - * the standby, so the current - * MAIN_NODE_ID will be pointing - * to the standby node. And we - * will get stuck if we keep - * waiting for the current main - * node (standby) in this case to - * send us the NOTIFY message. see - * "0000116: LISTEN Notifications - * Not Reliably Delivered Using - * JDBC4 Demonstrator" for the - * scenario + * In native replication mode we + * may send the query to the + * standby node and the NOTIFY + * comes back only from primary + * node. But since we have sent + * the query to the standby, so + * the current MAIN_NODE_ID will + * be pointing to the standby + * node. And we will get stuck if + * we keep waiting for the current + * main node (standby) in this + * case to send us the NOTIFY + * message. see "0000116: LISTEN + * Notifications Not Reliably + * Delivered Using JDBC4 + * Demonstrator" for the scenario */ pool_read_with_error(CONNECTION(backend, i), &len, sizeof(len), "reading message length from backend"); @@ -396,11 +394,11 @@ pool_process_query(POOL_CONNECTION * frontend, * sent to all backends. However * the order of arrival of * 'Notification response' is not - * necessarily the main first - * and then standbys. So if it - * arrives standby first, we should - * try to read from main, rather - * than just discard it. + * necessarily the main first and + * then standbys. So if it arrives + * standby first, we should try to + * read from main, rather than + * just discard it. */ pool_unread(CONNECTION(backend, i), &kind, sizeof(kind)); ereport(LOG, @@ -473,6 +471,7 @@ pool_process_query(POOL_CONNECTION * frontend, if (pool_config->memory_cache_enabled) { volatile bool invalidate_request = Req_info->query_cache_invalidate_request; + if (invalidate_request) { /* @@ -489,7 +488,7 @@ pool_process_query(POOL_CONNECTION * frontend, * send simple query message to a node. */ void -send_simplequery_message(POOL_CONNECTION * backend, int len, char *string, int major) +send_simplequery_message(POOL_CONNECTION *backend, int len, char *string, int major) { /* forward the query to the backend */ pool_write(backend, "Q", 1); @@ -512,7 +511,7 @@ send_simplequery_message(POOL_CONNECTION * backend, int len, char *string, int m */ void -wait_for_query_response_with_trans_cleanup(POOL_CONNECTION * frontend, POOL_CONNECTION * backend, +wait_for_query_response_with_trans_cleanup(POOL_CONNECTION *frontend, POOL_CONNECTION *backend, int protoVersion, int pid, char *key, int keylen) { PG_TRY(); @@ -545,7 +544,7 @@ wait_for_query_response_with_trans_cleanup(POOL_CONNECTION * frontend, POOL_CONN * response. */ POOL_STATUS -wait_for_query_response(POOL_CONNECTION * frontend, POOL_CONNECTION * backend, int protoVersion) +wait_for_query_response(POOL_CONNECTION *frontend, POOL_CONNECTION *backend, int protoVersion) { #define DUMMY_PARAMETER "pgpool_dummy_param" #define DUMMY_VALUE "pgpool_dummy_value" @@ -632,7 +631,7 @@ wait_for_query_response(POOL_CONNECTION * frontend, POOL_CONNECTION * backend, i * Extended query protocol has to send Flush message. */ POOL_STATUS -send_extended_protocol_message(POOL_CONNECTION_POOL * backend, +send_extended_protocol_message(POOL_CONNECTION_POOL *backend, int node_id, char *kind, int len, char *string) { @@ -665,7 +664,7 @@ send_extended_protocol_message(POOL_CONNECTION_POOL * backend, * wait until read data is ready */ int -synchronize(POOL_CONNECTION * cp) +synchronize(POOL_CONNECTION *cp) { return pool_check_fd(cp); } @@ -678,7 +677,7 @@ synchronize(POOL_CONNECTION * cp) * valid backends might be changed by failover/failback. */ void -pool_send_frontend_exits(POOL_CONNECTION_POOL * backend) +pool_send_frontend_exits(POOL_CONNECTION_POOL *backend) { int len; int i; @@ -720,8 +719,8 @@ pool_send_frontend_exits(POOL_CONNECTION_POOL * backend) */ POOL_STATUS -SimpleForwardToFrontend(char kind, POOL_CONNECTION * frontend, - POOL_CONNECTION_POOL * backend) +SimpleForwardToFrontend(char kind, POOL_CONNECTION *frontend, + POOL_CONNECTION_POOL *backend) { int len, len1 = 0; @@ -786,13 +785,13 @@ SimpleForwardToFrontend(char kind, POOL_CONNECTION * frontend, /* * Optimization for other than "Command Complete", "Ready For query", - * "Error response" ,"Notice message", "Notification response", - * "Row description", "No data" and "Close Complete". - * messages. Especially, since it is too often to receive and forward - * "Data Row" message, we do not flush the message to frontend now. We - * expect that "Command Complete" message (or "Error response" or "Notice - * response" message) follows the stream of data row message anyway, so - * flushing will be done at that time. + * "Error response" ,"Notice message", "Notification response", "Row + * description", "No data" and "Close Complete". messages. Especially, + * since it is too often to receive and forward "Data Row" message, we do + * not flush the message to frontend now. We expect that "Command + * Complete" message (or "Error response" or "Notice response" message) + * follows the stream of data row message anyway, so flushing will be done + * at that time. * * Same thing can be said to CopyData message. Tremendous number of * CopyData messages are sent to frontend (typical use case is pg_dump). @@ -851,8 +850,8 @@ SimpleForwardToFrontend(char kind, POOL_CONNECTION * frontend, } POOL_STATUS -SimpleForwardToBackend(char kind, POOL_CONNECTION * frontend, - POOL_CONNECTION_POOL * backend, +SimpleForwardToBackend(char kind, POOL_CONNECTION *frontend, + POOL_CONNECTION_POOL *backend, int len, char *contents) { int sendlen; @@ -912,7 +911,7 @@ SimpleForwardToBackend(char kind, POOL_CONNECTION * frontend, * Handle parameter status message */ POOL_STATUS -ParameterStatus(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend) +ParameterStatus(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend) { int len, len1 = 0; @@ -922,7 +921,8 @@ ParameterStatus(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend) char *name; char *value; POOL_STATUS status; - char *parambuf = NULL; /* pointer to parameter + value string buffer */ + char *parambuf = NULL; /* pointer to parameter + value string + * buffer */ int i; pool_write(frontend, "S", 1); @@ -959,15 +959,16 @@ ParameterStatus(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend) if (IS_MAIN_NODE_ID(i)) { - int pos; + int pos; len1 = len; + /* * To suppress Coverity false positive warning. Actually * being IS_MAIN_NODE_ID(i)) true only happens in a loop. So * we don't need to worry about to leak memory previously - * allocated in parambuf. But Coverity is not smart enough - * to realize it. + * allocated in parambuf. But Coverity is not smart enough to + * realize it. */ if (parambuf) pfree(parambuf); @@ -984,7 +985,8 @@ ParameterStatus(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend) else { /* - * Except "in_hot_standby" parameter, complain the message length difference. + * Except "in_hot_standby" parameter, complain the message + * length difference. */ if (strcmp(name, "in_hot_standby")) { @@ -1037,7 +1039,7 @@ reset_connection(void) * 0: no query was issued 1: a query was issued 2: no more queries remain -1: error */ static int -reset_backend(POOL_CONNECTION_POOL * backend, int qcnt) +reset_backend(POOL_CONNECTION_POOL *backend, int qcnt) { char *query; int qn; @@ -1129,7 +1131,7 @@ reset_backend(POOL_CONNECTION_POOL * backend, int qcnt) bool is_select_query(Node *node, char *sql) { - bool prepare = false; + bool prepare = false; if (node == NULL) return false; @@ -1154,6 +1156,7 @@ is_select_query(Node *node, char *sql) if (IsA(node, PrepareStmt)) { PrepareStmt *prepare_statement = (PrepareStmt *) node; + prepare = true; node = (Node *) (prepare_statement->query); } @@ -1183,15 +1186,15 @@ is_select_query(Node *node, char *sql) } /* - * If SQL comment is not allowed, the query must start with certain characters. - * However if it's PREPARE, we should skip the check. + * If SQL comment is not allowed, the query must start with certain + * characters. However if it's PREPARE, we should skip the check. */ if (!pool_config->allow_sql_comments) /* '\0' and ';' signify empty query */ return (*sql == 's' || *sql == 'S' || *sql == '(' || *sql == 'w' || *sql == 'W' || *sql == 't' || *sql == 'T' || *sql == '\0' || *sql == ';' || - prepare); + prepare); else return true; } @@ -1325,7 +1328,7 @@ is_rollback_to_query(Node *node) * send error message to frontend */ void -pool_send_error_message(POOL_CONNECTION * frontend, int protoMajor, +pool_send_error_message(POOL_CONNECTION *frontend, int protoMajor, char *code, char *message, char *detail, @@ -1340,7 +1343,7 @@ pool_send_error_message(POOL_CONNECTION * frontend, int protoMajor, * send fatal message to frontend */ void -pool_send_fatal_message(POOL_CONNECTION * frontend, int protoMajor, +pool_send_fatal_message(POOL_CONNECTION *frontend, int protoMajor, char *code, char *message, char *detail, @@ -1355,7 +1358,7 @@ pool_send_fatal_message(POOL_CONNECTION * frontend, int protoMajor, * send severity message to frontend */ void -pool_send_severity_message(POOL_CONNECTION * frontend, int protoMajor, +pool_send_severity_message(POOL_CONNECTION *frontend, int protoMajor, char *code, char *message, char *detail, @@ -1460,7 +1463,7 @@ pool_send_severity_message(POOL_CONNECTION * frontend, int protoMajor, } void -pool_send_readyforquery(POOL_CONNECTION * frontend) +pool_send_readyforquery(POOL_CONNECTION *frontend) { int len; @@ -1481,7 +1484,7 @@ pool_send_readyforquery(POOL_CONNECTION * frontend) * length for ReadyForQuery. This mode is necessary when called from ReadyForQuery(). */ POOL_STATUS -do_command(POOL_CONNECTION * frontend, POOL_CONNECTION * backend, +do_command(POOL_CONNECTION *frontend, POOL_CONNECTION *backend, char *query, int protoMajor, int pid, char *key, int keylen, int no_ready_for_query) { int len; @@ -1496,8 +1499,8 @@ do_command(POOL_CONNECTION * frontend, POOL_CONNECTION * backend, send_simplequery_message(backend, strlen(query) + 1, query, protoMajor); /* - * Wait for response from backend while polling frontend connection is - * ok. If not, cancel the transaction. + * Wait for response from backend while polling frontend connection is ok. + * If not, cancel the transaction. */ wait_for_query_response_with_trans_cleanup(frontend, backend, @@ -1696,7 +1699,7 @@ retry_read_packet: * If SELECT is error, we must abort transaction on other nodes. */ void -do_error_command(POOL_CONNECTION * backend, int major) +do_error_command(POOL_CONNECTION *backend, int major) { char *error_query = POOL_ERROR_QUERY; int len; @@ -1753,7 +1756,7 @@ do_error_command(POOL_CONNECTION * backend, int major) * than main node to ket them go into abort status. */ void -do_error_execute_command(POOL_CONNECTION_POOL * backend, int node_id, int major) +do_error_execute_command(POOL_CONNECTION_POOL *backend, int node_id, int major) { char kind; char *string; @@ -1852,7 +1855,7 @@ do_error_execute_command(POOL_CONNECTION_POOL * backend, int node_id, int major) * Free POOL_SELECT_RESULT object */ void -free_select_result(POOL_SELECT_RESULT * result) +free_select_result(POOL_SELECT_RESULT *result) { int i, j; @@ -1905,7 +1908,7 @@ free_select_result(POOL_SELECT_RESULT * result) * to void. and now ereport is thrown in case of error occurred within the function */ void -do_query(POOL_CONNECTION * backend, char *query, POOL_SELECT_RESULT * *result, int major) +do_query(POOL_CONNECTION *backend, char *query, POOL_SELECT_RESULT **result, int major) { #define DO_QUERY_ALLOC_NUM 1024 /* memory allocation unit for * POOL_SELECT_RESULT */ @@ -1970,10 +1973,9 @@ do_query(POOL_CONNECTION * backend, char *query, POOL_SELECT_RESULT * *result, i /* * Send a query to the backend. We use extended query protocol with named - * statement/portal if we are processing extended query since simple - * query breaks unnamed statements/portals. The name of named - * statement/unnamed statement are "pgpool_PID" where PID is the process id - * of itself. + * statement/portal if we are processing extended query since simple query + * breaks unnamed statements/portals. The name of named statement/unnamed + * statement are "pgpool_PID" where PID is the process id of itself. */ if (pool_get_session_context(true) && pool_is_doing_extended_query_message()) { @@ -2140,7 +2142,8 @@ do_query(POOL_CONNECTION * backend, char *query, POOL_SELECT_RESULT * *result, i if (pool_extract_error_message(false, backend, major, true, &message) == 1) { - int etype; + int etype; + /* * This is fatal. Because: If we operate extended query, * backend would not accept subsequent commands until "sync" @@ -2150,9 +2153,8 @@ do_query(POOL_CONNECTION * backend, char *query, POOL_SELECT_RESULT * *result, i * transaction is aborted, and subsequent query would not * accepted. In summary there's no transparent way for * frontend to handle error case. The only way is closing this - * session. - * However if the process type is main process, we should not - * exit the process. + * session. However if the process type is main process, we + * should not exit the process. */ if (processType == PT_WORKER) { @@ -2499,7 +2501,7 @@ do_query(POOL_CONNECTION * backend, char *query, POOL_SELECT_RESULT * *result, i * 3: row lock against insert_lock table is required */ int -need_insert_lock(POOL_CONNECTION_POOL * backend, char *query, Node *node) +need_insert_lock(POOL_CONNECTION_POOL *backend, char *query, Node *node) { /* * Query to know if the target table has SERIAL column or not. @@ -2519,7 +2521,7 @@ need_insert_lock(POOL_CONNECTION_POOL * backend, char *query, Node *node) char *table; int result; - static POOL_RELCACHE * relcache; + static POOL_RELCACHE *relcache; /* INSERT statement? */ if (!IsA(node, InsertStmt)) @@ -2619,7 +2621,7 @@ need_insert_lock(POOL_CONNECTION_POOL * backend, char *query, Node *node) * [ADMIN] 'SGT DETAIL: Could not open file "pg_clog/05DC": ... */ POOL_STATUS -insert_lock(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend, char *query, InsertStmt *node, int lock_kind) +insert_lock(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, char *query, InsertStmt *node, int lock_kind) { char *table; int len = 0; @@ -2631,7 +2633,7 @@ insert_lock(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend, char *qu regex_t preg; size_t nmatch = 2; regmatch_t pmatch[nmatch]; - static POOL_RELCACHE * relcache; + static POOL_RELCACHE *relcache; POOL_SELECT_RESULT *result; POOL_STATUS status = POOL_CONTINUE; @@ -2827,7 +2829,7 @@ insert_lock(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend, char *qu else { status = do_command(frontend, MAIN(backend), qbuf, MAJOR(backend), MAIN_CONNECTION(backend)->pid, - MAIN_CONNECTION(backend)->key, MAIN_CONNECTION(backend)->keylen ,0); + MAIN_CONNECTION(backend)->key, MAIN_CONNECTION(backend)->keylen, 0); } } } @@ -2895,7 +2897,7 @@ pool_has_insert_lock(void) */ bool result; - static POOL_RELCACHE * relcache; + static POOL_RELCACHE *relcache; POOL_CONNECTION_POOL *backend; backend = pool_get_session_context(false)->backend; @@ -2923,7 +2925,8 @@ pool_has_insert_lock(void) * Return POOL_CONTINUE if the row is inserted successfully * or the row already exists, the others return POOL_ERROR. */ -static POOL_STATUS add_lock_target(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend, char *table) +static POOL_STATUS +add_lock_target(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, char *table) { /* * lock the row where reloid is 0 to avoid "duplicate key violates..." @@ -2984,8 +2987,8 @@ static POOL_STATUS add_lock_target(POOL_CONNECTION * frontend, POOL_CONNECTION_P * If lock is true, this function locks the row of the table oid. */ static bool -has_lock_target(POOL_CONNECTION * frontend, - POOL_CONNECTION_POOL * backend, +has_lock_target(POOL_CONNECTION *frontend, + POOL_CONNECTION_POOL *backend, char *table, bool lock) { char *suffix; @@ -3025,9 +3028,10 @@ has_lock_target(POOL_CONNECTION * frontend, /* * Insert the oid of the specified table into insert_lock table. */ -static POOL_STATUS insert_oid_into_insert_lock(POOL_CONNECTION * frontend, - POOL_CONNECTION_POOL * backend, - char *table) +static POOL_STATUS +insert_oid_into_insert_lock(POOL_CONNECTION *frontend, + POOL_CONNECTION_POOL *backend, + char *table) { char qbuf[QUERY_STRING_BUFFER_LEN]; POOL_STATUS status; @@ -3093,7 +3097,7 @@ is_drop_database(Node *node) * check if any pending data remains in backend. */ bool -is_backend_cache_empty(POOL_CONNECTION_POOL * backend) +is_backend_cache_empty(POOL_CONNECTION_POOL *backend) { int i; @@ -3120,14 +3124,14 @@ is_backend_cache_empty(POOL_CONNECTION_POOL * backend) * check if any pending data remains. */ static bool -is_cache_empty(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend) +is_cache_empty(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend) { /* Are we suspending reading from frontend? */ if (!pool_is_suspend_reading_from_frontend()) { /* - * If SSL is enabled, we need to check SSL internal buffer is empty or not - * first. + * If SSL is enabled, we need to check SSL internal buffer is empty or + * not first. */ if (pool_ssl_pending(frontend)) return false; @@ -3228,7 +3232,7 @@ check_copy_from_stdin(Node *node) * read kind from one backend */ void -read_kind_from_one_backend(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend, char *kind, int node) +read_kind_from_one_backend(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, char *kind, int node) { if (VALID_BACKEND(node)) { @@ -3275,7 +3279,7 @@ is_all_standbys_command_complete(unsigned char *kind_list, int num_backends, int * this function uses "decide by majority" method if kinds from all backends do not agree. */ void -read_kind_from_backend(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend, char *decided_kind) +read_kind_from_backend(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, char *decided_kind) { int i; unsigned char kind_list[MAX_NUM_BACKENDS]; /* records each backend's kind */ @@ -3378,9 +3382,9 @@ read_kind_from_backend(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backen read_kind_from_one_backend(frontend, backend, (char *) &kind, MAIN_NODE_ID); /* - * If we received a notification message in native replication mode, other - * backends will not receive the message. So we should skip other - * nodes otherwise we will hang in pool_read. + * If we received a notification message in native replication mode, + * other backends will not receive the message. So we should skip + * other nodes otherwise we will hang in pool_read. */ if (kind == 'A') { @@ -3454,7 +3458,7 @@ read_kind_from_backend(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backen */ else if (kind == 'S') { - int len2; + int len2; pool_read(CONNECTION(backend, i), &len, sizeof(len)); len2 = len; @@ -3469,7 +3473,8 @@ read_kind_from_backend(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backen if (IS_MAIN_NODE_ID(i)) { - int pos; + int pos; + pool_add_param(&CONNECTION(backend, i)->params, p, value); if (!strcmp("application_name", p)) @@ -3525,16 +3530,16 @@ read_kind_from_backend(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backen */ for (i = 0; i < NUM_BACKENDS; i++) { - int unread_len; - char *unread_p; - char *p; - int len; + int unread_len; + char *unread_p; + char *p; + int len; if (VALID_BACKEND(i)) { if (kind_list[i] == 'E') { - int major = MAJOR(CONNECTION(backend, i)); + int major = MAJOR(CONNECTION(backend, i)); if (major == PROTO_MAJOR_V3) { @@ -3618,8 +3623,8 @@ read_kind_from_backend(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backen * cases it is possible that similar issue could happen since returned * messages do not follow the sequence recorded in the pending * messages because the backend ignores requests till sync message is - * received. In this case we need to re-sync either primary or standby. - * So we check not only the standby but primary node. + * received. In this case we need to re-sync either primary or + * standby. So we check not only the standby but primary node. */ if (session_context->load_balance_node_id != MAIN_NODE_ID && (kind_list[MAIN_NODE_ID] == 'Z' || @@ -3697,8 +3702,8 @@ read_kind_from_backend(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backen /* * In main/replica mode, if primary gets an error at commit, while - * other standbys are normal at commit, we don't need to degenerate any - * backend because it is likely that the error was caused by a + * other standbys are normal at commit, we don't need to degenerate + * any backend because it is likely that the error was caused by a * deferred trigger. */ else if (MAIN_REPLICA && query_context->parse_tree && @@ -3712,7 +3717,7 @@ read_kind_from_backend(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backen errdetail("do not degenerate because it is likely caused by a delayed commit"))); if (SL_MODE && pool_is_doing_extended_query_message() && msg) - pool_pending_message_free_pending_message(msg); + pool_pending_message_free_pending_message(msg); return; } else if (max_count <= NUM_BACKENDS / 2.0) @@ -3755,7 +3760,7 @@ read_kind_from_backend(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backen if (degenerate_node_num) { int retcode = 2; - StringInfoData msg; + StringInfoData msg; initStringInfo(&msg); appendStringInfoString(&msg, "kind mismatch among backends. "); @@ -3948,7 +3953,7 @@ parse_copy_data(char *buf, int len, char delimiter, int col_id) } void -query_ps_status(char *query, POOL_CONNECTION_POOL * backend) +query_ps_status(char *query, POOL_CONNECTION_POOL *backend) { StartupPacket *sp; char psbuf[1024]; @@ -4128,7 +4133,7 @@ is_internal_transaction_needed(Node *node) * Start an internal transaction if necessary. */ POOL_STATUS -start_internal_transaction(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend, Node *node) +start_internal_transaction(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, Node *node) { int i; @@ -4172,7 +4177,7 @@ start_internal_transaction(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * ba * that satisfy VALID_BACKEND macro. */ POOL_STATUS -end_internal_transaction(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend) +end_internal_transaction(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend) { int i; int len; @@ -4253,10 +4258,9 @@ end_internal_transaction(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * back if (MAJOR(backend) == PROTO_MAJOR_V3 && VALID_BACKEND(MAIN_NODE_ID)) { /* - * Skip rest of Ready for Query packet for backends - * satisfying VALID_BACKEND macro because they should have - * been already received the data, which is not good for - * do_command(). + * Skip rest of Ready for Query packet for backends satisfying + * VALID_BACKEND macro because they should have been already + * received the data, which is not good for do_command(). */ pool_read(CONNECTION(backend, MAIN_NODE_ID), &len, sizeof(len)); pool_read(CONNECTION(backend, MAIN_NODE_ID), &tstate, sizeof(tstate)); @@ -4291,8 +4295,8 @@ end_internal_transaction(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * back if (MAJOR(backend) == PROTO_MAJOR_V3 && !VALID_BACKEND(MAIN_NODE_ID)) { /* - * Skip rest of Ready for Query packet for the backend - * that does not satisfy VALID_BACKEND. + * Skip rest of Ready for Query packet for the backend that + * does not satisfy VALID_BACKEND. */ pool_read(CONNECTION(backend, MAIN_NODE_ID), &len, sizeof(len)); pool_read(CONNECTION(backend, MAIN_NODE_ID), &tstate, sizeof(tstate)); @@ -4317,7 +4321,7 @@ end_internal_transaction(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * back static bool is_panic_or_fatal_error(char *message, int major) { - char *str; + char *str; str = extract_error_kind(message, major); @@ -4335,7 +4339,7 @@ is_panic_or_fatal_error(char *message, int major) char * extract_error_kind(char *message, int major) { - char *ret = "unknown"; + char *ret = "unknown"; if (major == PROTO_MAJOR_V3) { @@ -4370,7 +4374,7 @@ extract_error_kind(char *message, int major) } static int -detect_postmaster_down_error(POOL_CONNECTION * backend, int major) +detect_postmaster_down_error(POOL_CONNECTION *backend, int major) { int r = extract_message(backend, ADMIN_SHUTDOWN_ERROR_CODE, major, 'E', false); @@ -4410,7 +4414,7 @@ detect_postmaster_down_error(POOL_CONNECTION * backend, int major) } int -detect_active_sql_transaction_error(POOL_CONNECTION * backend, int major) +detect_active_sql_transaction_error(POOL_CONNECTION *backend, int major) { int r = extract_message(backend, ACTIVE_SQL_TRANSACTION_ERROR_CODE, major, 'E', true); @@ -4424,7 +4428,7 @@ detect_active_sql_transaction_error(POOL_CONNECTION * backend, int major) } int -detect_deadlock_error(POOL_CONNECTION * backend, int major) +detect_deadlock_error(POOL_CONNECTION *backend, int major) { int r = extract_message(backend, DEADLOCK_ERROR_CODE, major, 'E', true); @@ -4436,7 +4440,7 @@ detect_deadlock_error(POOL_CONNECTION * backend, int major) } int -detect_serialization_error(POOL_CONNECTION * backend, int major, bool unread) +detect_serialization_error(POOL_CONNECTION *backend, int major, bool unread) { int r = extract_message(backend, SERIALIZATION_FAIL_ERROR_CODE, major, 'E', unread); @@ -4448,7 +4452,7 @@ detect_serialization_error(POOL_CONNECTION * backend, int major, bool unread) } int -detect_query_cancel_error(POOL_CONNECTION * backend, int major) +detect_query_cancel_error(POOL_CONNECTION *backend, int major) { int r = extract_message(backend, QUERY_CANCEL_ERROR_CODE, major, 'E', true); @@ -4461,7 +4465,7 @@ detect_query_cancel_error(POOL_CONNECTION * backend, int major) int -detect_idle_in_transaction_session_timeout_error(POOL_CONNECTION * backend, int major) +detect_idle_in_transaction_session_timeout_error(POOL_CONNECTION *backend, int major) { int r = extract_message(backend, IDLE_IN_TRANSACTION_SESSION_TIMEOUT_ERROR_CODE, major, 'E', true); @@ -4473,7 +4477,7 @@ detect_idle_in_transaction_session_timeout_error(POOL_CONNECTION * backend, int } int -detect_idle_session_timeout_error(POOL_CONNECTION * backend, int major) +detect_idle_session_timeout_error(POOL_CONNECTION *backend, int major) { int r = extract_message(backend, IDLE_SESSION_TIMEOUT_ERROR_CODE, major, 'E', true); @@ -4490,13 +4494,13 @@ detect_idle_session_timeout_error(POOL_CONNECTION * backend, int major) * throw an ereport for all other errors. */ static int -extract_message(POOL_CONNECTION * backend, char *error_code, int major, char class, bool unread) +extract_message(POOL_CONNECTION *backend, char *error_code, int major, char class, bool unread) { int is_error = 0; char kind; int len; int nlen = 0; - char *str = NULL; + char *str = NULL; if (pool_read(backend, &kind, sizeof(kind))) return -1; @@ -4567,7 +4571,7 @@ extract_message(POOL_CONNECTION * backend, char *error_code, int major, char cla */ static bool -pool_process_notice_message_from_one_backend(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend, int backend_idx, char kind) +pool_process_notice_message_from_one_backend(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, int backend_idx, char kind) { int major = MAJOR(backend); POOL_CONNECTION *backend_conn = CONNECTION(backend, backend_idx); @@ -4659,7 +4663,7 @@ pool_process_notice_message_from_one_backend(POOL_CONNECTION * frontend, POOL_CO * -1: error */ int -pool_extract_error_message(bool read_kind, POOL_CONNECTION * backend, int major, bool unread, char **message) +pool_extract_error_message(bool read_kind, POOL_CONNECTION *backend, int major, bool unread, char **message) { char kind; int ret = 1; @@ -4764,7 +4768,7 @@ pool_extract_error_message(bool read_kind, POOL_CONNECTION * backend, int major, * read message kind and rest of the packet then discard it */ POOL_STATUS -pool_discard_packet(POOL_CONNECTION_POOL * cp) +pool_discard_packet(POOL_CONNECTION_POOL *cp) { int i; char kind; @@ -4792,7 +4796,7 @@ pool_discard_packet(POOL_CONNECTION_POOL * cp) * read message length and rest of the packet then discard it */ POOL_STATUS -pool_discard_packet_contents(POOL_CONNECTION_POOL * cp) +pool_discard_packet_contents(POOL_CONNECTION_POOL *cp) { int len, i; @@ -4834,7 +4838,8 @@ pool_discard_packet_contents(POOL_CONNECTION_POOL * cp) /* * Read packet from either frontend or backend and process it. */ -static POOL_STATUS read_packets_and_process(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend, int reset_request, int *state, short *num_fields, bool *cont) +static POOL_STATUS +read_packets_and_process(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, int reset_request, int *state, short *num_fields, bool *cont) { fd_set readmask; fd_set writemask; @@ -4982,7 +4987,7 @@ SELECT_RETRY: ereport(LOG, (errmsg("error occurred while reading and processing packets"), errdetail("FATAL ERROR: VALID_BACKEND returns non 0 but connection slot is empty. backend id:%d RAW_MODE:%d LOAD_BALANCE_STATUS:%d status:%d", - i, RAW_MODE, LOAD_BALANCE_STATUS(i), BACKEND_INFO(i).backend_status))); + i, RAW_MODE, LOAD_BALANCE_STATUS (i), BACKEND_INFO(i).backend_status))); was_error = 1; break; } @@ -5006,7 +5011,8 @@ SELECT_RETRY: } /* - * connection was terminated due to idle_in_transaction_session_timeout expired + * connection was terminated due to + * idle_in_transaction_session_timeout expired */ r = detect_idle_in_transaction_session_timeout_error(CONNECTION(backend, i), MAJOR(backend)); if (r == SPECIFIED_ERROR) @@ -5017,7 +5023,8 @@ SELECT_RETRY: } /* - * connection was terminated due to idle_session_timeout expired + * connection was terminated due to idle_session_timeout + * expired */ r = detect_idle_session_timeout_error(CONNECTION(backend, i), MAJOR(backend)); if (r == SPECIFIED_ERROR) @@ -5167,7 +5174,7 @@ pool_dump_valid_backend(int backend_id) * Returns true if data was actually pushed. */ bool -pool_push_pending_data(POOL_CONNECTION * backend) +pool_push_pending_data(POOL_CONNECTION *backend) { POOL_SESSION_CONTEXT *session_context; int len; @@ -5175,8 +5182,8 @@ pool_push_pending_data(POOL_CONNECTION * backend) bool pending_data_existed = false; static char random_statement[] = "pgpool_non_existent"; - int num_pending_messages; - int num_pushed_messages; + int num_pending_messages; + int num_pushed_messages; if (!pool_get_session_context(true) || !pool_is_doing_extended_query_message()) return false; @@ -5194,7 +5201,8 @@ pool_push_pending_data(POOL_CONNECTION * backend) * In streaming replication mode, send a Close message for none existing * prepared statement and flush message before going any further to * retrieve and save any pending response packet from backend. This - * ensures that at least "close complete" message is returned from backend. + * ensures that at least "close complete" message is returned from + * backend. * * The saved packets will be popped up before returning to caller. This * preserves the user's expectation of packet sequence. @@ -5254,7 +5262,7 @@ pool_push_pending_data(POOL_CONNECTION * backend) len = ntohl(len); len -= sizeof(len); buf = NULL; - if (len > 0) + if (len > 0) { buf = palloc(len); pool_read(backend, buf, len); diff --git a/src/protocol/pool_proto2.c b/src/protocol/pool_proto2.c index ffca996c3..bd09a4494 100644 --- a/src/protocol/pool_proto2.c +++ b/src/protocol/pool_proto2.c @@ -34,8 +34,8 @@ #include "utils/elog.h" POOL_STATUS -AsciiRow(POOL_CONNECTION * frontend, - POOL_CONNECTION_POOL * backend, +AsciiRow(POOL_CONNECTION *frontend, + POOL_CONNECTION_POOL *backend, short num_fields) { static char nullmap[8192], @@ -161,8 +161,8 @@ AsciiRow(POOL_CONNECTION * frontend, } POOL_STATUS -BinaryRow(POOL_CONNECTION * frontend, - POOL_CONNECTION_POOL * backend, +BinaryRow(POOL_CONNECTION *frontend, + POOL_CONNECTION_POOL *backend, short num_fields) { static char nullmap[8192], @@ -273,8 +273,8 @@ BinaryRow(POOL_CONNECTION * frontend, } POOL_STATUS -CompletedResponse(POOL_CONNECTION * frontend, - POOL_CONNECTION_POOL * backend) +CompletedResponse(POOL_CONNECTION *frontend, + POOL_CONNECTION_POOL *backend) { int i; char *string = NULL; @@ -339,8 +339,8 @@ CompletedResponse(POOL_CONNECTION * frontend, } POOL_STATUS -CursorResponse(POOL_CONNECTION * frontend, - POOL_CONNECTION_POOL * backend) +CursorResponse(POOL_CONNECTION *frontend, + POOL_CONNECTION_POOL *backend) { char *string = NULL; char *string1 = NULL; @@ -387,8 +387,8 @@ CursorResponse(POOL_CONNECTION * frontend, } void -EmptyQueryResponse(POOL_CONNECTION * frontend, - POOL_CONNECTION_POOL * backend) +EmptyQueryResponse(POOL_CONNECTION *frontend, + POOL_CONNECTION_POOL *backend) { char c; int i; @@ -406,8 +406,8 @@ EmptyQueryResponse(POOL_CONNECTION * frontend, } POOL_STATUS -ErrorResponse(POOL_CONNECTION * frontend, - POOL_CONNECTION_POOL * backend) +ErrorResponse(POOL_CONNECTION *frontend, + POOL_CONNECTION_POOL *backend) { char *string = ""; int len = 0; @@ -448,8 +448,8 @@ ErrorResponse(POOL_CONNECTION * frontend, } POOL_STATUS -FunctionResultResponse(POOL_CONNECTION * frontend, - POOL_CONNECTION_POOL * backend) +FunctionResultResponse(POOL_CONNECTION *frontend, + POOL_CONNECTION_POOL *backend) { char dummy; int len; @@ -517,8 +517,8 @@ FunctionResultResponse(POOL_CONNECTION * frontend, } void -NoticeResponse(POOL_CONNECTION * frontend, - POOL_CONNECTION_POOL * backend) +NoticeResponse(POOL_CONNECTION *frontend, + POOL_CONNECTION_POOL *backend) { char *string = NULL; int len = 0; @@ -550,8 +550,8 @@ NoticeResponse(POOL_CONNECTION * frontend, } POOL_STATUS -NotificationResponse(POOL_CONNECTION * frontend, - POOL_CONNECTION_POOL * backend) +NotificationResponse(POOL_CONNECTION *frontend, + POOL_CONNECTION_POOL *backend) { int pid, pid1; @@ -593,8 +593,8 @@ NotificationResponse(POOL_CONNECTION * frontend, } int -RowDescription(POOL_CONNECTION * frontend, - POOL_CONNECTION_POOL * backend, +RowDescription(POOL_CONNECTION *frontend, + POOL_CONNECTION_POOL *backend, short *result) { short num_fields, diff --git a/src/protocol/pool_proto_modules.c b/src/protocol/pool_proto_modules.c index c2802998a..6fad3353c 100644 --- a/src/protocol/pool_proto_modules.c +++ b/src/protocol/pool_proto_modules.c @@ -86,27 +86,27 @@ int is_select_for_update = 0; /* 1 if SELECT INTO or SELECT FOR */ char query_string_buffer[QUERY_STRING_BUFFER_LEN]; -static int check_errors(POOL_CONNECTION_POOL * backend, int backend_id); +static int check_errors(POOL_CONNECTION_POOL *backend, int backend_id); static void generate_error_message(char *prefix, int specific_error, char *query); -static POOL_STATUS parse_before_bind(POOL_CONNECTION * frontend, - POOL_CONNECTION_POOL * backend, - POOL_SENT_MESSAGE * message, - POOL_SENT_MESSAGE * bind_message); -static POOL_STATUS send_prepare(POOL_CONNECTION * frontend, - POOL_CONNECTION_POOL * backend, - POOL_SENT_MESSAGE * message); +static POOL_STATUS parse_before_bind(POOL_CONNECTION *frontend, + POOL_CONNECTION_POOL *backend, + POOL_SENT_MESSAGE *message, + POOL_SENT_MESSAGE *bind_message); +static POOL_STATUS send_prepare(POOL_CONNECTION *frontend, + POOL_CONNECTION_POOL *backend, + POOL_SENT_MESSAGE *message); static int *find_victim_nodes(int *ntuples, int nmembers, int main_node, int *number_of_nodes); -static POOL_STATUS close_standby_transactions(POOL_CONNECTION * frontend, - POOL_CONNECTION_POOL * backend); +static POOL_STATUS close_standby_transactions(POOL_CONNECTION *frontend, + POOL_CONNECTION_POOL *backend); static char *flatten_set_variable_args(const char *name, List *args); static bool - process_pg_terminate_backend_func(POOL_QUERY_CONTEXT * query_context); -static void pool_discard_except_sync_and_ready_for_query(POOL_CONNECTION * frontend, - POOL_CONNECTION_POOL * backend); -static void si_get_snapshot(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend, Node *node, bool tstate_check); + process_pg_terminate_backend_func(POOL_QUERY_CONTEXT *query_context); +static void pool_discard_except_sync_and_ready_for_query(POOL_CONNECTION *frontend, + POOL_CONNECTION_POOL *backend); +static void si_get_snapshot(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, Node *node, bool tstate_check); -static bool check_transaction_state_and_abort(char *query, Node *node, POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend); +static bool check_transaction_state_and_abort(char *query, Node *node, POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend); static bool multi_statement_query(char *buf); @@ -137,7 +137,7 @@ static POOL_QUERY_CONTEXT *create_dummy_query_context(void); * */ static bool -process_pg_terminate_backend_func(POOL_QUERY_CONTEXT * query_context) +process_pg_terminate_backend_func(POOL_QUERY_CONTEXT *query_context) { /* * locate pg_terminate_backend and get the pid argument, if @@ -184,8 +184,8 @@ process_pg_terminate_backend_func(POOL_QUERY_CONTEXT * query_context) * If frontend == NULL, we are called in case of reset queries. */ POOL_STATUS -SimpleQuery(POOL_CONNECTION * frontend, - POOL_CONNECTION_POOL * backend, int len, char *contents) +SimpleQuery(POOL_CONNECTION *frontend, + POOL_CONNECTION_POOL *backend, int len, char *contents) { static char *sq_config = "pool_status"; static char *sq_pools = "pool_pools"; @@ -249,7 +249,7 @@ SimpleQuery(POOL_CONNECTION * frontend, * Fetch memory cache if possible */ if (pool_config->memory_cache_enabled) - is_likely_select = pool_is_likely_select(contents); + is_likely_select = pool_is_likely_select(contents); /* * If memory query cache enabled and the query seems to be a SELECT use @@ -261,8 +261,8 @@ SimpleQuery(POOL_CONNECTION * frontend, * transaction, but it will need parsing query and accessing to system * catalog, which will add significant overhead. Moreover if we are in * aborted transaction, commands should be ignored, so we should not use + * query cache. Also query cache is disabled, we should not fetch from * query cache. - * Also query cache is disabled, we should not fetch from query cache. */ if (pool_config->memory_cache_enabled && is_likely_select && !pool_is_writing_transaction() && @@ -308,6 +308,7 @@ SimpleQuery(POOL_CONNECTION * frontend, else { query_context->is_multi_statement = false; + /* * Do not use minimal parser if we are in native replication or * snapshot isolation mode. @@ -355,8 +356,8 @@ SimpleQuery(POOL_CONNECTION * frontend, * were an DELETE command. Note that the DELETE command does not * execute, instead the original query will be sent to backends, * which may or may not cause an actual syntax errors. The command - * will be sent to all backends in replication mode or - * primary in native replication mode. + * will be sent to all backends in replication mode or primary in + * native replication mode. */ if (!strcmp(remote_host, "[local]")) { @@ -438,7 +439,7 @@ SimpleQuery(POOL_CONNECTION * frontend, (errmsg("DB's oid to discard its cache directory: dboid = %d", query_context->dboid))); } } - + /* * check COPY FROM STDIN if true, set copy_* variable */ @@ -669,13 +670,13 @@ SimpleQuery(POOL_CONNECTION * frontend, struct timeval stime; stime.tv_usec = 0; - stime.tv_sec = 5; /* XXX give arbitrary time to allow - * closing idle connections */ + stime.tv_sec = 5; /* XXX give arbitrary time to allow closing + * idle connections */ ereport(DEBUG1, (errmsg("Query: sending SIGUSR1 signal to parent"))); - ignore_sigusr1 = 1; /* disable SIGUSR1 handler */ + ignore_sigusr1 = 1; /* disable SIGUSR1 handler */ close_idle_connections(); /* @@ -684,7 +685,7 @@ SimpleQuery(POOL_CONNECTION * frontend, */ for (;;) { - int sts; + int sts; errno = 0; sts = select(0, NULL, NULL, NULL, &stime); @@ -824,8 +825,9 @@ SimpleQuery(POOL_CONNECTION * frontend, /* * If the query is BEGIN READ WRITE or BEGIN ... SERIALIZABLE * in streaming replication mode, we send BEGIN to standbys - * instead. The original_query which is BEGIN READ WRITE is sent - * to primary. The rewritten_query BEGIN is sent to standbys. + * instead. The original_query which is BEGIN READ WRITE is + * sent to primary. The rewritten_query BEGIN is sent to + * standbys. */ if (pool_need_to_treat_as_if_default_transaction(query_context)) { @@ -898,8 +900,8 @@ SimpleQuery(POOL_CONNECTION * frontend, } /* - * Send "COMMIT" or "ROLLBACK" to only main node if query is - * "COMMIT" or "ROLLBACK" + * Send "COMMIT" or "ROLLBACK" to only main node if query is "COMMIT" + * or "ROLLBACK" */ if (commit) { @@ -930,7 +932,7 @@ SimpleQuery(POOL_CONNECTION * frontend, * process EXECUTE (V3 only) */ POOL_STATUS -Execute(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend, +Execute(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, int len, char *contents) { int commit = 0; @@ -942,7 +944,7 @@ Execute(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend, POOL_SENT_MESSAGE *bind_msg; bool foundp = false; int num_rows; - char *p; + char *p; /* Get session context */ session_context = pool_get_session_context(false); @@ -1002,7 +1004,7 @@ Execute(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend, elog(DEBUG1, "set partial_fetch in execute"); } elog(DEBUG1, "execute: partial_fetch: %d", query_context->partial_fetch); - + strlcpy(query_string_buffer, query, sizeof(query_string_buffer)); ereport(DEBUG2, (errmsg("Execute: query string = <%s>", query))); @@ -1017,8 +1019,8 @@ Execute(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend, /* * Fetch memory cache if possible. Also if atEnd is false or the execute - * message has 0 row argument, we maybe able to use cache. - * If partial_fetch is true, cannot use cache. + * message has 0 row argument, we maybe able to use cache. If + * partial_fetch is true, cannot use cache. */ if (pool_config->memory_cache_enabled && !pool_is_writing_transaction() && (TSTATE(backend, MAIN_REPLICA ? PRIMARY_NODE_ID : REAL_MAIN_NODE_ID) != 'E') @@ -1071,10 +1073,10 @@ Execute(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend, } /* - * If bind message is sent again to an existing prepared statement, - * it is possible that query_w_hex remains. Before setting newly - * allocated query_w_hex's pointer to the query context, free the - * previously allocated memory. + * If bind message is sent again to an existing prepared + * statement, it is possible that query_w_hex remains. Before + * setting newly allocated query_w_hex's pointer to the query + * context, free the previously allocated memory. */ if (query_context->query_w_hex) { @@ -1203,8 +1205,8 @@ Execute(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend, } /* - * send "COMMIT" or "ROLLBACK" to only main node if query is - * "COMMIT" or "ROLLBACK" + * send "COMMIT" or "ROLLBACK" to only main node if query is "COMMIT" + * or "ROLLBACK" */ if (commit) { @@ -1245,8 +1247,8 @@ Execute(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend, * Take care of "writing transaction" flag. */ if ((!is_select_query(node, query) || pool_has_function_call(node)) && - !is_start_transaction_query(node) && - !is_commit_or_rollback_query(node)) + !is_start_transaction_query(node) && + !is_commit_or_rollback_query(node)) { ereport(DEBUG1, (errmsg("Execute: TSTATE:%c", @@ -1280,7 +1282,7 @@ Execute(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend, * process Parse (V3 only) */ POOL_STATUS -Parse(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend, +Parse(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, int len, char *contents) { int deadlock_detected = 0; @@ -1316,7 +1318,7 @@ Parse(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend, /* parse SQL string */ MemoryContext old_context = MemoryContextSwitchTo(query_context->memory_context); - parse_tree_list = raw_parser(stmt, RAW_PARSE_DEFAULT, strlen(stmt),&error,!REPLICATION); + parse_tree_list = raw_parser(stmt, RAW_PARSE_DEFAULT, strlen(stmt), &error, !REPLICATION); if (parse_tree_list == NIL) { @@ -1338,8 +1340,8 @@ Parse(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend, * were an DELETE command. Note that the DELETE command does not * execute, instead the original query will be sent to backends, * which may or may not cause an actual syntax errors. The command - * will be sent to all backends in replication mode or - * primary in native replication mode. + * will be sent to all backends in replication mode or primary in + * native replication mode. */ if (!strcmp(remote_host, "[local]")) { @@ -1505,9 +1507,9 @@ Parse(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend, /* * If the query is BEGIN READ WRITE in main replica mode, we send - * BEGIN instead of it to standbys. original_query which is - * BEGIN READ WRITE is sent to primary. rewritten_query which is BEGIN - * is sent to standbys. + * BEGIN instead of it to standbys. original_query which is BEGIN READ + * WRITE is sent to primary. rewritten_query which is BEGIN is sent to + * standbys. */ if (is_start_transaction_query(query_context->parse_tree) && is_read_write((TransactionStmt *) query_context->parse_tree) && @@ -1518,7 +1520,8 @@ Parse(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend, } /* - * If not in streaming or logical replication mode, send "SYNC" message if not in a transaction. + * If not in streaming or logical replication mode, send "SYNC" message if + * not in a transaction. */ if (!SL_MODE) { @@ -1565,10 +1568,10 @@ Parse(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend, kind))); } else - ereport(ERROR, - (errmsg("unable to parse the query"), - errdetail("invalid read kind \"%c\" returned from backend after Sync message sent", - kind))); + ereport(ERROR, + (errmsg("unable to parse the query"), + errdetail("invalid read kind \"%c\" returned from backend after Sync message sent", + kind))); } /* @@ -1687,7 +1690,7 @@ Parse(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend, } POOL_STATUS -Bind(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend, +Bind(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, int len, char *contents) { char *pstmt_name; @@ -1786,7 +1789,7 @@ Bind(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend, TSTATE(backend, MAIN_REPLICA ? PRIMARY_NODE_ID : REAL_MAIN_NODE_ID) == 'T') { pool_where_to_send(query_context, query_context->original_query, - query_context->parse_tree); + query_context->parse_tree); } /* @@ -1812,7 +1815,7 @@ Bind(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend, &oids); /* Save to oid buffer */ for (i = 0; i < num_oids; i++) - pool_add_dml_table_oid(oids[i]); + pool_add_dml_table_oid(oids[i]); } } @@ -1883,7 +1886,7 @@ Bind(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend, } POOL_STATUS -Describe(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend, +Describe(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, int len, char *contents) { POOL_SENT_MESSAGE *msg; @@ -1976,7 +1979,7 @@ Describe(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend, POOL_STATUS -Close(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend, +Close(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, int len, char *contents) { POOL_SENT_MESSAGE *msg; @@ -2012,10 +2015,9 @@ Close(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend, errmsg("unable to execute close, invalid message"))); /* - * For PostgreSQL, calling close on non existing portals or - * statements is not an error. So on the same footings we will ignore all - * such calls and return the close complete message to clients with out - * going to backend + * For PostgreSQL, calling close on non existing portals or statements is + * not an error. So on the same footings we will ignore all such calls and + * return the close complete message to clients with out going to backend */ if (!msg) { @@ -2107,7 +2109,7 @@ Close(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend, POOL_STATUS -FunctionCall3(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend, +FunctionCall3(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, int len, char *contents) { /* @@ -2138,8 +2140,8 @@ FunctionCall3(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend, * - internal transaction is closed */ POOL_STATUS -ReadyForQuery(POOL_CONNECTION * frontend, - POOL_CONNECTION_POOL * backend, bool send_ready, bool cache_commit) +ReadyForQuery(POOL_CONNECTION *frontend, + POOL_CONNECTION_POOL *backend, bool send_ready, bool cache_commit) { int i; int len; @@ -2189,7 +2191,7 @@ ReadyForQuery(POOL_CONNECTION * frontend, if (victim_nodes) { int i; - StringInfoData msg; + StringInfoData msg; initStringInfo(&msg); appendStringInfoString(&msg, "ReadyForQuery: Degenerate backends:"); @@ -2280,7 +2282,7 @@ ReadyForQuery(POOL_CONNECTION * frontend, /* if (pool_is_query_in_progress() && allow_close_transaction) */ if (REPLICATION && allow_close_transaction) { - bool internal_transaction_started = INTERNAL_TRANSACTION_STARTED(backend, MAIN_NODE_ID); + bool internal_transaction_started = INTERNAL_TRANSACTION_STARTED(backend, MAIN_NODE_ID); /* * If we are running in snapshot isolation mode and started an @@ -2332,10 +2334,11 @@ ReadyForQuery(POOL_CONNECTION * frontend, TSTATE(backend, i) = kind; ereport(DEBUG5, (errmsg("processing ReadyForQuery"), - errdetail("transaction state of node %d '%c'(%02x)", i, kind , kind))); + errdetail("transaction state of node %d '%c'(%02x)", i, kind, kind))); /* - * The transaction state to be returned to frontend is main node's. + * The transaction state to be returned to frontend is main + * node's. */ if (i == (MAIN_REPLICA ? PRIMARY_NODE_ID : REAL_MAIN_NODE_ID)) { @@ -2475,8 +2478,9 @@ ReadyForQuery(POOL_CONNECTION * frontend, /* * Close running transactions on standbys. */ -static POOL_STATUS close_standby_transactions(POOL_CONNECTION * frontend, - POOL_CONNECTION_POOL * backend) +static POOL_STATUS +close_standby_transactions(POOL_CONNECTION *frontend, + POOL_CONNECTION_POOL *backend) { int i; @@ -2500,7 +2504,7 @@ static POOL_STATUS close_standby_transactions(POOL_CONNECTION * frontend, } POOL_STATUS -ParseComplete(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend) +ParseComplete(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend) { POOL_SESSION_CONTEXT *session_context; @@ -2524,7 +2528,7 @@ ParseComplete(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend) } POOL_STATUS -BindComplete(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend) +BindComplete(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend) { POOL_SESSION_CONTEXT *session_context; @@ -2548,7 +2552,7 @@ BindComplete(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend) } POOL_STATUS -CloseComplete(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend) +CloseComplete(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend) { POOL_SESSION_CONTEXT *session_context; POOL_STATUS status; @@ -2622,8 +2626,8 @@ CloseComplete(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend) } POOL_STATUS -ParameterDescription(POOL_CONNECTION * frontend, - POOL_CONNECTION_POOL * backend) +ParameterDescription(POOL_CONNECTION *frontend, + POOL_CONNECTION_POOL *backend) { int len, len1 = 0; @@ -2706,8 +2710,8 @@ ParameterDescription(POOL_CONNECTION * frontend, } POOL_STATUS -ErrorResponse3(POOL_CONNECTION * frontend, - POOL_CONNECTION_POOL * backend) +ErrorResponse3(POOL_CONNECTION *frontend, + POOL_CONNECTION_POOL *backend) { POOL_STATUS ret; @@ -2722,8 +2726,8 @@ ErrorResponse3(POOL_CONNECTION * frontend, } POOL_STATUS -FunctionCall(POOL_CONNECTION * frontend, - POOL_CONNECTION_POOL * backend) +FunctionCall(POOL_CONNECTION *frontend, + POOL_CONNECTION_POOL *backend) { char dummy[2]; int oid; @@ -2818,8 +2822,8 @@ FunctionCall(POOL_CONNECTION * frontend, } POOL_STATUS -ProcessFrontendResponse(POOL_CONNECTION * frontend, - POOL_CONNECTION_POOL * backend) +ProcessFrontendResponse(POOL_CONNECTION *frontend, + POOL_CONNECTION_POOL *backend) { char fkind; char *bufp = NULL; @@ -3077,8 +3081,8 @@ ProcessFrontendResponse(POOL_CONNECTION * frontend, } POOL_STATUS -ProcessBackendResponse(POOL_CONNECTION * frontend, - POOL_CONNECTION_POOL * backend, +ProcessBackendResponse(POOL_CONNECTION *frontend, + POOL_CONNECTION_POOL *backend, int *state, short *num_fields) { int status = POOL_CONTINUE; @@ -3226,10 +3230,12 @@ ProcessBackendResponse(POOL_CONNECTION * frontend, case 'E': /* ErrorResponse */ if (pool_is_doing_extended_query_message()) { - char *message; + char *message; - /* Log the error message which was possibly missed till - * a sync message was sent */ + /* + * Log the error message which was possibly missed till a + * sync message was sent + */ if (pool_extract_error_message(false, MAIN(backend), PROTO_MAJOR_V3, true, &message) == 1) { @@ -3396,8 +3402,8 @@ ProcessBackendResponse(POOL_CONNECTION * frontend, } POOL_STATUS -CopyInResponse(POOL_CONNECTION * frontend, - POOL_CONNECTION_POOL * backend) +CopyInResponse(POOL_CONNECTION *frontend, + POOL_CONNECTION_POOL *backend) { POOL_STATUS status; @@ -3415,8 +3421,8 @@ CopyInResponse(POOL_CONNECTION * frontend, } POOL_STATUS -CopyOutResponse(POOL_CONNECTION * frontend, - POOL_CONNECTION_POOL * backend) +CopyOutResponse(POOL_CONNECTION *frontend, + POOL_CONNECTION_POOL *backend) { POOL_STATUS status; @@ -3434,8 +3440,8 @@ CopyOutResponse(POOL_CONNECTION * frontend, } POOL_STATUS -CopyDataRows(POOL_CONNECTION * frontend, - POOL_CONNECTION_POOL * backend, int copyin) +CopyDataRows(POOL_CONNECTION *frontend, + POOL_CONNECTION_POOL *backend, int copyin) { char *string = NULL; int len; @@ -3476,6 +3482,7 @@ CopyDataRows(POOL_CONNECTION * frontend, copy_count++; continue; } + /* * Flush (H) or Sync (S) messages should be ignored while in * the COPY IN mode. @@ -3607,7 +3614,7 @@ CopyDataRows(POOL_CONNECTION * frontend, * transaction state. */ void -raise_intentional_error_if_need(POOL_CONNECTION_POOL * backend) +raise_intentional_error_if_need(POOL_CONNECTION_POOL *backend) { int i; POOL_SESSION_CONTEXT *session_context; @@ -3688,7 +3695,7 @@ raise_intentional_error_if_need(POOL_CONNECTION_POOL * backend) *--------------------------------------------------- */ static int -check_errors(POOL_CONNECTION_POOL * backend, int backend_id) +check_errors(POOL_CONNECTION_POOL *backend, int backend_id) { /* @@ -3760,7 +3767,7 @@ generate_error_message(char *prefix, int specific_error, char *query) "received query cancel error message from main node. query: %s" }; - StringInfoData msg; + StringInfoData msg; session_context = pool_get_session_context(true); if (!session_context) @@ -3788,15 +3795,15 @@ generate_error_message(char *prefix, int specific_error, char *query) * Make per DB node statement log */ void -per_node_statement_log(POOL_CONNECTION_POOL * backend, int node_id, char *query) +per_node_statement_log(POOL_CONNECTION_POOL *backend, int node_id, char *query) { - ProcessInfo *pi = pool_get_my_process_info(); + ProcessInfo *pi = pool_get_my_process_info(); POOL_CONNECTION_POOL_SLOT *slot = backend->slots[node_id]; if (pool_config->log_per_node_statement) ereport(LOG, (errmsg("DB node id: %d backend pid: %d statement: %s", node_id, ntohl(slot->pid), query))); - + pi_set(node_id); StrNCpy(pi->statement, query, MAXSTMTLEN); } @@ -3807,7 +3814,7 @@ per_node_statement_log(POOL_CONNECTION_POOL * backend, int node_id, char *query) void init_pi_set(void) { - ProcessInfo *pi = pool_get_my_process_info(); + ProcessInfo *pi = pool_get_my_process_info(); memset(pi->node_ids, 0, sizeof(pi->node_ids)); pi->statement[0] = '\0'; @@ -3819,7 +3826,7 @@ init_pi_set(void) void pi_set(int node_id) { - ProcessInfo *pi = pool_get_my_process_info(); + ProcessInfo *pi = pool_get_my_process_info(); if (node_id < BITS_PER_TYPE(uint64)) pi->node_ids[0] |= (1 << node_id); @@ -3833,7 +3840,7 @@ pi_set(int node_id) bool is_pi_set(uint64 *node_ids, int node_id) { - int set; + int set; if (node_id < BITS_PER_TYPE(uint64)) set = node_ids[0] & (1 << node_id); @@ -3846,7 +3853,7 @@ is_pi_set(uint64 *node_ids, int node_id) * Make per DB node statement notice message */ void -per_node_statement_notice(POOL_CONNECTION_POOL * backend, int node_id, char *query) +per_node_statement_notice(POOL_CONNECTION_POOL *backend, int node_id, char *query) { if (pool_config->notice_per_node_statement) ereport(NOTICE, @@ -3856,56 +3863,57 @@ per_node_statement_notice(POOL_CONNECTION_POOL * backend, int node_id, char *que /* * Make backend message log when log_backend_messages is on. */ -void log_backend_messages(unsigned char kind, int backend_id) +void +log_backend_messages(unsigned char kind, int backend_id) { /* * Map table for message kind and message label */ typedef struct { - unsigned char kind; /* message kind */ - char *label; /* message label */ - } BackendMessage; - + unsigned char kind; /* message kind */ + char *label; /* message label */ + } BackendMessage; + static BackendMessage message_label[] = - { - {'1', "ParseComplete"}, - {'2', "BindComplete"}, - {'3', "CloseComplete"}, - {'A', "NotificationResponse"}, - {'C', "CommandComplete"}, - {'D', "DataRow"}, - {'E', "ErrorResponse"}, - {'G', "CopyInResponse"}, - {'H', "CopyOutResponse"}, - {'I', "EmptyQueryResponse"}, - {'K', "BackendKeyData"}, - {'N', "NoticeResponse"}, - {'R', "AuthenticationRequest"}, - {'S', "ParameterStatus"}, - {'T', "RowDescription"}, - {'V', "FunctionCallResponse"}, - {'W', "CopyBothResponse"}, - {'Z', "ReadyForQuery"}, - {'n', "NoData"}, - {'s', "PortalSuspended"}, - {'t', "ParameterDescription"}, - {'v', "NegotiateProtocolVersion"}, - {'c', "CopyDone"}, - {'d', "CopyData"}, - }; - + { + {'1', "ParseComplete"}, + {'2', "BindComplete"}, + {'3', "CloseComplete"}, + {'A', "NotificationResponse"}, + {'C', "CommandComplete"}, + {'D', "DataRow"}, + {'E', "ErrorResponse"}, + {'G', "CopyInResponse"}, + {'H', "CopyOutResponse"}, + {'I', "EmptyQueryResponse"}, + {'K', "BackendKeyData"}, + {'N', "NoticeResponse"}, + {'R', "AuthenticationRequest"}, + {'S', "ParameterStatus"}, + {'T', "RowDescription"}, + {'V', "FunctionCallResponse"}, + {'W', "CopyBothResponse"}, + {'Z', "ReadyForQuery"}, + {'n', "NoData"}, + {'s', "PortalSuspended"}, + {'t', "ParameterDescription"}, + {'v', "NegotiateProtocolVersion"}, + {'c', "CopyDone"}, + {'d', "CopyData"}, + }; + /* store last kind for each backend */ static unsigned char kind_cache[MAX_NUM_BACKENDS]; /* number of repetitions of each kind */ - static int kind_count[MAX_NUM_BACKENDS]; + static int kind_count[MAX_NUM_BACKENDS]; - int kind_num = sizeof(message_label)/sizeof(BackendMessage); - char *label; - static char *last_label; - int i; + int kind_num = sizeof(message_label) / sizeof(BackendMessage); + char *label; + static char *last_label; + int i; /* do nothing if log_backend_messages is disabled */ if (pool_config->log_backend_messages == BGMSG_NONE) @@ -3944,7 +3952,7 @@ void log_backend_messages(unsigned char kind, int backend_id) (errmsg("%s message from backend %d", label, backend_id))); return; } - + /* just to make sure the setting is terse */ if (pool_config->log_backend_messages != BGMSG_TERSE) { @@ -3991,11 +3999,11 @@ void log_backend_messages(unsigned char kind, int backend_id) * All data read in this function is returned to stream. */ char -per_node_error_log(POOL_CONNECTION_POOL * backend, int node_id, char *query, char *prefix, bool unread) +per_node_error_log(POOL_CONNECTION_POOL *backend, int node_id, char *query, char *prefix, bool unread) { POOL_CONNECTION_POOL_SLOT *slot = backend->slots[node_id]; char *message; - char kind; + char kind; pool_read(CONNECTION(backend, node_id), &kind, sizeof(kind)); pool_unread(CONNECTION(backend, node_id), &kind, sizeof(kind)); @@ -4020,10 +4028,11 @@ per_node_error_log(POOL_CONNECTION_POOL * backend, int node_id, char *query, cha * message is not yet parsed on the primary/main node but parsed on other * node. Caller must provide the parse message data as "message". */ -static POOL_STATUS parse_before_bind(POOL_CONNECTION * frontend, - POOL_CONNECTION_POOL * backend, - POOL_SENT_MESSAGE * message, - POOL_SENT_MESSAGE * bind_message) +static POOL_STATUS +parse_before_bind(POOL_CONNECTION *frontend, + POOL_CONNECTION_POOL *backend, + POOL_SENT_MESSAGE *message, + POOL_SENT_MESSAGE *bind_message) { int i; int len = message->len; @@ -4061,8 +4070,8 @@ static POOL_STATUS parse_before_bind(POOL_CONNECTION * frontend, * Before sending the parse message to the primary, we need to * close the named statement. Otherwise we will get an error from * backend if the named statement already exists. This could - * happen if parse_before_bind is called with a bind message - * using the same named statement. If the named statement does not + * happen if parse_before_bind is called with a bind message using + * the same named statement. If the named statement does not * exist, it's fine. PostgreSQL just ignores a request trying to * close a non-existing statement. If the statement is unnamed * one, we do not need it because unnamed statement can be @@ -4105,8 +4114,10 @@ static POOL_STATUS parse_before_bind(POOL_CONNECTION * frontend, bind_message->query_context = new_qc; #ifdef NOT_USED + /* - * XXX pool_remove_sent_message() will pfree memory allocated by "contents". + * XXX pool_remove_sent_message() will pfree memory allocated by + * "contents". */ /* Remove old sent message */ @@ -4187,14 +4198,16 @@ static POOL_STATUS parse_before_bind(POOL_CONNECTION * frontend, * node. Caller must provide the PREPARED message information as "message" * argument. */ -static POOL_STATUS send_prepare(POOL_CONNECTION * frontend, - POOL_CONNECTION_POOL * backend, - POOL_SENT_MESSAGE * message) +static POOL_STATUS +send_prepare(POOL_CONNECTION *frontend, + POOL_CONNECTION_POOL *backend, + POOL_SENT_MESSAGE *message) { int node_id; bool backup[MAX_NUM_BACKENDS]; - POOL_QUERY_CONTEXT *qc, *new_qc; - char qbuf[1024]; + POOL_QUERY_CONTEXT *qc, + *new_qc; + char qbuf[1024]; POOL_SELECT_RESULT *res; elog(DEBUG1, "send_prepare called"); @@ -4222,14 +4235,14 @@ static POOL_STATUS send_prepare(POOL_CONNECTION * frontend, } /* - * we are in streaming replication mode and the PREPARE message has - * not been sent to primary yet. + * we are in streaming replication mode and the PREPARE message has not + * been sent to primary yet. */ /* - * Prepare modified query context This is a copy of original PREPARE - * query context except the query sending destination is changed to - * primary node. + * Prepare modified query context This is a copy of original PREPARE query + * context except the query sending destination is changed to primary + * node. */ new_qc = pool_query_context_shallow_copy(qc); memset(new_qc->where_to_send, 0, sizeof(new_qc->where_to_send)); @@ -4242,8 +4255,8 @@ static POOL_STATUS send_prepare(POOL_CONNECTION * frontend, { /* * Before sending the PREPARE message to the primary, we need to - * DEALLOCATE the named statement. Otherwise we will get an error - * from backend if an identical named statement already exists. + * DEALLOCATE the named statement. Otherwise we will get an error from + * backend if an identical named statement already exists. */ /* check to see if the named statement exists on primary node */ @@ -4256,6 +4269,7 @@ static POOL_STATUS send_prepare(POOL_CONNECTION * frontend, if (res && res->data[0] && strcmp(res->data[0], "0")) { free_select_result(res); + /* * The same named statement exists, We need to send DEALLOCATE * message @@ -4483,7 +4497,7 @@ flatten_set_variable_args(const char *name, List *args) * Wait till ready for query received. */ static void -pool_wait_till_ready_for_query(POOL_CONNECTION_POOL * backend) +pool_wait_till_ready_for_query(POOL_CONNECTION_POOL *backend) { char kind; int len; @@ -4531,8 +4545,8 @@ pool_wait_till_ready_for_query(POOL_CONNECTION_POOL * backend) * is read. */ static void -pool_discard_except_sync_and_ready_for_query(POOL_CONNECTION * frontend, - POOL_CONNECTION_POOL * backend) +pool_discard_except_sync_and_ready_for_query(POOL_CONNECTION *frontend, + POOL_CONNECTION_POOL *backend) { POOL_PENDING_MESSAGE *pmsg; int i; @@ -4641,7 +4655,7 @@ pool_discard_except_sync_and_ready_for_query(POOL_CONNECTION * frontend, * Preconditions: query is in progress. The command is succeeded. */ void -pool_at_command_success(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend) +pool_at_command_success(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend) { Node *node; char *query; @@ -4769,7 +4783,7 @@ pool_at_command_success(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backe * read message length (V3 only) */ int -pool_read_message_length(POOL_CONNECTION_POOL * cp) +pool_read_message_length(POOL_CONNECTION_POOL *cp) { int length, length0; @@ -4821,7 +4835,7 @@ pool_read_message_length(POOL_CONNECTION_POOL * cp) * The array is in the static storage, thus it will be destroyed by subsequent calls. */ int * -pool_read_message_length2(POOL_CONNECTION_POOL * cp) +pool_read_message_length2(POOL_CONNECTION_POOL *cp) { int length, length0; @@ -4877,7 +4891,7 @@ pool_read_message_length2(POOL_CONNECTION_POOL * cp) void pool_emit_log_for_message_length_diff(int *length_array, char *name) { - int length0, /* message length of main node id */ + int length0, /* message length of main node id */ length; int i; @@ -4908,7 +4922,7 @@ pool_emit_log_for_message_length_diff(int *length_array, char *name) * Read kind from all valid backend */ signed char -pool_read_kind(POOL_CONNECTION_POOL * cp) +pool_read_kind(POOL_CONNECTION_POOL *cp) { char kind0, kind; @@ -4966,7 +4980,7 @@ pool_read_kind(POOL_CONNECTION_POOL * cp) } int -pool_read_int(POOL_CONNECTION_POOL * cp) +pool_read_int(POOL_CONNECTION_POOL *cp) { int data0, data; @@ -5006,7 +5020,7 @@ pool_read_int(POOL_CONNECTION_POOL * cp) * In case of starting an internal transaction, this should be false. */ static void -si_get_snapshot(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend, Node * node, bool tstate_check) +si_get_snapshot(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, Node *node, bool tstate_check) { POOL_SESSION_CONTEXT *session_context; @@ -5015,13 +5029,12 @@ si_get_snapshot(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend, Node return; /* - * From now on it is possible that query is actually sent to backend. - * So we need to acquire snapshot while there's no committing backend - * in snapshot isolation mode except while processing reset queries. - * For this purpose, we send a query to know whether the transaction - * is READ ONLY or not. Sending actual user's query is not possible - * because it might cause rw-conflict, which in turn causes a - * deadlock. + * From now on it is possible that query is actually sent to backend. So + * we need to acquire snapshot while there's no committing backend in + * snapshot isolation mode except while processing reset queries. For this + * purpose, we send a query to know whether the transaction is READ ONLY + * or not. Sending actual user's query is not possible because it might + * cause rw-conflict, which in turn causes a deadlock. */ if (pool_config->backend_clustering_mode == CM_SNAPSHOT_ISOLATION && (!tstate_check || (tstate_check && TSTATE(backend, MAIN_NODE_ID) == 'T')) && @@ -5029,16 +5042,17 @@ si_get_snapshot(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend, Node !si_snapshot_prepared() && frontend && frontend->no_forward == 0) { - int i; + int i; si_acquire_snapshot(); for (i = 0; i < NUM_BACKENDS; i++) { - static char *si_query = "SELECT current_setting('transaction_read_only')"; + static char *si_query = "SELECT current_setting('transaction_read_only')"; POOL_SELECT_RESULT *res; - /* We cannot use VALID_BACKEND macro here because load balance + /* + * We cannot use VALID_BACKEND macro here because load balance * node has not been decided yet. */ if (!VALID_BACKEND_RAW(i)) @@ -5066,10 +5080,10 @@ si_get_snapshot(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend, Node * false to caller. */ static bool -check_transaction_state_and_abort(char *query, Node *node, POOL_CONNECTION * frontend, - POOL_CONNECTION_POOL * backend) +check_transaction_state_and_abort(char *query, Node *node, POOL_CONNECTION *frontend, + POOL_CONNECTION_POOL *backend) { - int len; + int len; if (TSTATE(backend, MAIN_NODE_ID) != 'E') return true; @@ -5087,14 +5101,14 @@ check_transaction_state_and_abort(char *query, Node *node, POOL_CONNECTION * fro /* send an error message to frontend */ pool_send_error_message( - frontend, - MAJOR(backend), - "25P02", - "current transaction is aborted, commands ignored until end of transaction block", - buf.data, - "", - __FILE__, - __LINE__); + frontend, + MAJOR(backend), + "25P02", + "current transaction is aborted, commands ignored until end of transaction block", + buf.data, + "", + __FILE__, + __LINE__); pfree(buf.data); @@ -5117,14 +5131,15 @@ check_transaction_state_and_abort(char *query, Node *node, POOL_CONNECTION * fro * As far as I know this is the most accurate and cheap way. */ static -bool multi_statement_query(char *queries) +bool +multi_statement_query(char *queries) { PsqlScanState sstate; promptStatus_t prompt; PsqlScanResult sr; PQExpBufferData lbuf; - int num_semicolons = 0; - bool done = false; + int num_semicolons = 0; + bool done = false; /* * callback functions for our flex lexer. need this to prevent crash when @@ -5134,9 +5149,9 @@ bool multi_statement_query(char *queries) NULL }; - initPQExpBuffer(&lbuf); /* initialize line buffer */ + initPQExpBuffer(&lbuf); /* initialize line buffer */ - sstate = psql_scan_create(&psqlscan_callbacks); /* create scan state */ + sstate = psql_scan_create(&psqlscan_callbacks); /* create scan state */ /* add the query string to the scan state */ psql_scan_setup(sstate, queries, strlen(queries), 0, true); @@ -5144,9 +5159,9 @@ bool multi_statement_query(char *queries) for (;;) { resetPQExpBuffer(&lbuf); - sr = psql_scan(sstate, &lbuf, &prompt); /* run scanner */ + sr = psql_scan(sstate, &lbuf, &prompt); /* run scanner */ - switch(sr) + switch (sr) { case PSCAN_SEMICOLON: /* found command-ending semicolon */ num_semicolons++; @@ -5154,7 +5169,8 @@ bool multi_statement_query(char *queries) case PSCAN_BACKSLASH: /* found backslash command */ break; case PSCAN_INCOMPLETE: /* end of line, SQL statement incomplete */ - case PSCAN_EOL: /* end of line, SQL possibly complete */ + case PSCAN_EOL: /* end of line, SQL possibly complete */ + /* * If we have already seen ";" and this time something is * transferred into buffer, we assume that the last query is @@ -5193,17 +5209,17 @@ bool multi_statement_query(char *queries) static void check_prepare(List *parse_tree_list, int len, char *contents) { - Node *node; - RawStmt *rstmt; - POOL_QUERY_CONTEXT *query_context; - ListCell *l; - POOL_SENT_MESSAGE *message; + Node *node; + RawStmt *rstmt; + POOL_QUERY_CONTEXT *query_context; + ListCell *l; + POOL_SENT_MESSAGE *message; /* sanity check */ if (list_length(parse_tree_list) <= 1) return; - foreach (l, parse_tree_list) + foreach(l, parse_tree_list) { if (l == list_head(parse_tree_list)) /* skip the first parse tree */ continue; @@ -5214,14 +5230,16 @@ check_prepare(List *parse_tree_list, int len, char *contents) if (!IsA(node, PrepareStmt)) /* PREPARE? */ continue; - query_context = pool_init_query_context(); /* initialize query context */ - query_context->is_multi_statement = true; /* this is a multi statement query */ + query_context = pool_init_query_context(); /* initialize query + * context */ + query_context->is_multi_statement = true; /* this is a multi + * statement query */ pool_start_query(query_context, contents, len, node); /* start query context */ pool_where_to_send(query_context, query_context->original_query, /* set query destination */ query_context->parse_tree); message = pool_create_sent_message('Q', len, contents, 0, /* create sent message */ ((PrepareStmt *) node)->name, query_context); - pool_add_sent_message(message); /* add it to the sent message list */ + pool_add_sent_message(message); /* add it to the sent message list */ } } @@ -5231,12 +5249,13 @@ check_prepare(List *parse_tree_list, int len, char *contents) * set. */ static -POOL_QUERY_CONTEXT *create_dummy_query_context(void) +POOL_QUERY_CONTEXT * +create_dummy_query_context(void) { POOL_QUERY_CONTEXT *query_context; - Node *node; + Node *node; MemoryContext old_context; - char *query = "UNKNOWN QUERY"; + char *query = "UNKNOWN QUERY"; query_context = pool_init_query_context(); old_context = MemoryContextSwitchTo(query_context->memory_context); |